package mqtt_client import ( "errors" "fmt" "git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client/router" "git.sxidc.com/go-tools/api_binding/utils" mqtt "github.com/eclipse/paho.mqtt.golang" "sync" "time" ) type MqttClientOptions struct { UserName string Password string Address string ClientID string KeepAliveSec int64 PingTimeoutSec int64 WriteTimeoutSec int64 } func (opt *MqttClientOptions) check() error { if utils.IsStringEmpty(opt.UserName) { return errors.New("必须传递用户名") } if utils.IsStringEmpty(opt.Password) { return errors.New("必须传递密码") } if utils.IsStringEmpty(opt.Address) { return errors.New("必须传递地址") } if utils.IsStringEmpty(opt.ClientID) { return errors.New("必须传递客户端ID") } return nil } type MqttClient struct { client mqtt.Client routersMutex *sync.Mutex routers []*router.Router } func NewMqttClient(opts *MqttClientOptions) (*MqttClient, error) { if opts == nil { return nil, errors.New("没有传递mqtt客户端选项") } err := opts.check() if err != nil { return nil, err } mqttClient := &MqttClient{ routersMutex: &sync.Mutex{}, routers: make([]*router.Router, 0), } mqttOptions := mqtt.NewClientOptions(). SetAutoReconnect(true). SetUsername(opts.UserName). SetPassword(opts.Password). AddBroker(opts.Address). SetClientID(opts.ClientID). SetKeepAlive(time.Duration(opts.KeepAliveSec)*time.Second). SetPingTimeout(time.Duration(opts.PingTimeoutSec)*time.Second). SetWriteTimeout(time.Duration(opts.WriteTimeoutSec)*time.Second). SetWill(opts.ClientID+"/will", "dead", 2, false). SetOnConnectHandler(func(client mqtt.Client) { err := mqttClient.onConnect() if err != nil { fmt.Println(err) return } }). SetConnectionLostHandler(func(client mqtt.Client, _ error) { err := mqttClient.onConnectLost() if err != nil { fmt.Println(err) return } }) mqttClient.client = mqtt.NewClient(mqttOptions) return mqttClient, nil } func DestroyMqttClient(c *MqttClient) { if c != nil { c.client = nil c.routersMutex.Lock() for _, r := range c.routers { router.DestroyRouter(r) } c.routers = nil c.routersMutex.Unlock() } c = nil } func (c *MqttClient) Connect() error { token := c.client.Connect() if token.Wait(); token.Error() != nil { return token.Error() } return nil } func (c *MqttClient) Disconnect() { c.client.Disconnect(250) } func (c *MqttClient) GetRouter(group string, handlers []router.Handler) *router.Router { r := router.NewRouter(group, handlers, func(item *router.Item) error { for { if c.client.IsConnected() { break } time.Sleep(1 * time.Second) } err := item.DoIfUnSubscribe(func() error { return c.subscribe(item) }) if err != nil { return err } return nil }) c.addRouter(r) return r } func (c *MqttClient) Response(item *router.Item, data []byte) error { token := c.client.Publish(item.Topic+"/reply", 2, false, data) if token.Wait(); token.Error() != nil { return token.Error() } return nil } func (c *MqttClient) onConnect() error { err := c.rangeRouters(func(r *router.Router) error { err := r.RangeItem(func(item *router.Item) error { err := item.DoIfUnSubscribe(func() error { return c.subscribe(item) }) if err != nil { return err } return nil }) if err != nil { return errors.New("SetOnConnectHandler订阅失败: " + err.Error()) } return nil }) if err != nil { return err } return nil } func (c *MqttClient) onConnectLost() error { err := c.rangeRouters(func(r *router.Router) error { err := r.RangeItem(func(item *router.Item) error { item.SetUnSubscribe() return nil }) if err != nil { return err } return nil }) if err != nil { return errors.New("SetOnConnectHandler订阅失败: " + err.Error()) } return nil } func (c *MqttClient) addRouter(router *router.Router) { c.routersMutex.Lock() defer c.routersMutex.Unlock() c.routers = append(c.routers, router) } func (c *MqttClient) rangeRouters(rangeFunc func(router *router.Router) error) error { c.routersMutex.Lock() defer c.routersMutex.Unlock() for _, r := range c.routers { err := rangeFunc(r) if err != nil { return err } } return nil } func (c *MqttClient) subscribe(item *router.Item) error { token := c.client.Subscribe(item.Topic, 2, func(client mqtt.Client, message mqtt.Message) { item.CallHandlers(message.Payload()) }) if token.Wait(); token.Error() != nil { return token.Error() } fmt.Println("[MQTT] Subscribe Topic: " + item.Topic) return nil }