|
@@ -0,0 +1,188 @@
|
|
|
+package mqtt_client
|
|
|
+
|
|
|
+import (
|
|
|
+ "errors"
|
|
|
+ "fmt"
|
|
|
+ "git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client/router"
|
|
|
+ "git.sxidc.com/go-tools/api_binding/utils"
|
|
|
+ mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
|
+ "sync"
|
|
|
+ "time"
|
|
|
+)
|
|
|
+
|
|
|
+type MqttClientOptions struct {
|
|
|
+ UserName string
|
|
|
+ Password string
|
|
|
+ Address string
|
|
|
+ ClientID string
|
|
|
+ KeepAliveSec time.Duration
|
|
|
+ PingTimeoutSec time.Duration
|
|
|
+}
|
|
|
+
|
|
|
+func (opt *MqttClientOptions) check() error {
|
|
|
+ if utils.IsStringEmpty(opt.UserName) {
|
|
|
+ return errors.New("必须传递用户名")
|
|
|
+ }
|
|
|
+
|
|
|
+ if utils.IsStringEmpty(opt.Password) {
|
|
|
+ return errors.New("必须传递密码")
|
|
|
+ }
|
|
|
+
|
|
|
+ if utils.IsStringEmpty(opt.Address) {
|
|
|
+ return errors.New("必须传递地址")
|
|
|
+ }
|
|
|
+
|
|
|
+ if utils.IsStringEmpty(opt.ClientID) {
|
|
|
+ return errors.New("必须传递客户端ID")
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+type MqttClient struct {
|
|
|
+ client mqtt.Client
|
|
|
+ clientOptions *mqtt.ClientOptions
|
|
|
+
|
|
|
+ routersMutex *sync.Mutex
|
|
|
+ routers []router.Router
|
|
|
+}
|
|
|
+
|
|
|
+func NewMqttClient(opts *MqttClientOptions) (*MqttClient, error) {
|
|
|
+ if opts == nil {
|
|
|
+ return nil, errors.New("没有传递mqtt客户端选项")
|
|
|
+ }
|
|
|
+
|
|
|
+ err := opts.check()
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ mqttOptions := mqtt.NewClientOptions().
|
|
|
+ SetAutoReconnect(true).
|
|
|
+ SetUsername(opts.UserName).
|
|
|
+ SetPassword(opts.Password).
|
|
|
+ AddBroker(opts.Address).
|
|
|
+ SetClientID(opts.ClientID).
|
|
|
+ SetKeepAlive(opts.KeepAliveSec).
|
|
|
+ SetPingTimeout(opts.PingTimeoutSec).
|
|
|
+ SetWill(opts.ClientID+"/will", "dead", 2, true)
|
|
|
+
|
|
|
+ return &MqttClient{
|
|
|
+ client: mqtt.NewClient(mqttOptions),
|
|
|
+ clientOptions: mqttOptions,
|
|
|
+ routersMutex: &sync.Mutex{},
|
|
|
+ routers: make([]router.Router, 0),
|
|
|
+ }, nil
|
|
|
+}
|
|
|
+
|
|
|
+func DestroyMqttClient(c *MqttClient) {
|
|
|
+ if c != nil {
|
|
|
+ c.client = nil
|
|
|
+
|
|
|
+ c.routersMutex.Lock()
|
|
|
+ for _, r := range c.routers {
|
|
|
+ router.DestroyRouter(&r)
|
|
|
+ }
|
|
|
+ c.routers = nil
|
|
|
+ c.routersMutex.Unlock()
|
|
|
+ }
|
|
|
+
|
|
|
+ c = nil
|
|
|
+}
|
|
|
+
|
|
|
+func (c *MqttClient) Connect() error {
|
|
|
+ c.clientOptions.SetOnConnectHandler(func(client mqtt.Client) {
|
|
|
+ c.routersMutex.Lock()
|
|
|
+ defer c.routersMutex.Unlock()
|
|
|
+
|
|
|
+ err := c.rangeRouters(func(r *router.Router) error {
|
|
|
+ err := r.RangeItem(func(item router.Item) error {
|
|
|
+ topic := r.Group + item.Topic
|
|
|
+ token := c.client.Subscribe(topic, item.Qos, func(client mqtt.Client, message mqtt.Message) {
|
|
|
+ item.CallHandlers(message.Payload())
|
|
|
+ })
|
|
|
+ if token.Wait(); token.Error() != nil {
|
|
|
+ return token.Error()
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+ if err != nil {
|
|
|
+ return errors.New("SetOnConnectHandler订阅失败: " + err.Error())
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+ if err != nil {
|
|
|
+ fmt.Println(err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ })
|
|
|
+
|
|
|
+ token := c.client.Connect()
|
|
|
+ if token.Wait(); token.Error() != nil {
|
|
|
+ return token.Error()
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (c *MqttClient) Disconnect() {
|
|
|
+ c.client.Disconnect(10000)
|
|
|
+}
|
|
|
+
|
|
|
+func (c *MqttClient) GetRouter(group string, handlers []router.Handler) *router.Router {
|
|
|
+ r := router.NewRouter(group, handlers, func(item router.Item) error {
|
|
|
+ for {
|
|
|
+ if c.client.IsConnected() {
|
|
|
+ break
|
|
|
+ }
|
|
|
+
|
|
|
+ time.Sleep(1 * time.Second)
|
|
|
+ }
|
|
|
+
|
|
|
+ topic := group + item.Topic
|
|
|
+ token := c.client.Subscribe(topic, item.Qos, func(client mqtt.Client, message mqtt.Message) {
|
|
|
+ item.CallHandlers(message.Payload())
|
|
|
+ })
|
|
|
+ if token.Wait(); token.Error() != nil {
|
|
|
+ return token.Error()
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+
|
|
|
+ c.addRouter(r)
|
|
|
+
|
|
|
+ return r
|
|
|
+}
|
|
|
+
|
|
|
+func (c *MqttClient) Response(item *router.Item, data []byte) error {
|
|
|
+ token := c.client.Publish(item.Topic+"/reply", item.Qos, item.ResponseRetained, data)
|
|
|
+ if token.Wait(); token.Error() != nil {
|
|
|
+ return token.Error()
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (c *MqttClient) addRouter(router *router.Router) {
|
|
|
+ c.routersMutex.Lock()
|
|
|
+ defer c.routersMutex.Unlock()
|
|
|
+
|
|
|
+ c.routers = append(c.routers, *router)
|
|
|
+}
|
|
|
+
|
|
|
+func (c *MqttClient) rangeRouters(rangeFunc func(router *router.Router) error) error {
|
|
|
+ c.routersMutex.Lock()
|
|
|
+ defer c.routersMutex.Unlock()
|
|
|
+
|
|
|
+ for _, r := range c.routers {
|
|
|
+ err := rangeFunc(&r)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|