// Package mqtt_client wraps the Eclipse Paho MQTT client with router-based
// topic subscriptions whose messages are dispatched by a pool of worker
// goroutines.
package mqtt_client

import (
	"errors"
	"fmt"
	"sync"
	"time"

	"git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client/router"
	"git.sxidc.com/go-tools/utils/strutils"
	mqtt "github.com/eclipse/paho.mqtt.golang"
)

const (
	// defaultSubscribeRoutineCount is the worker count used when
	// MqttClientOptions.SubscribeRoutineCount is not a positive number.
	defaultSubscribeRoutineCount = 10

	// defaultSubscribeBufferSize is the message queue capacity used when
	// MqttClientOptions.SubscribeBufferSize is not a positive number.
	defaultSubscribeBufferSize = 1024

	// defaultWriteTimeoutSec is the timeout, in seconds, used when
	// MqttClientOptions.WriteTimeoutSec is not a positive number.
	defaultWriteTimeoutSec = 60

	// qosExactlyOnce is the QoS level used for the will message, for
	// publishing replies and for every subscription.
	qosExactlyOnce = 2
)

// MqttClientOptions configures an MqttClient.
type MqttClientOptions struct {
	// UserName is the broker user name. Required.
	UserName string
	// Password is the broker password. Required.
	Password string
	// Address is the broker address handed to the Paho client. Required.
	Address string
	// ClientID is the MQTT client identifier; it is also used as the prefix
	// of the will topic ("<ClientID>/will"). Required.
	ClientID string
	// KeepAliveSec is the keep-alive interval in seconds.
	KeepAliveSec int64
	// PingTimeoutSec is the ping timeout in seconds.
	PingTimeoutSec int64
	// WriteTimeoutSec is the write timeout in seconds. It also bounds how
	// long Connect, Response and subscriptions wait for their tokens.
	// Values <= 0 fall back to 60.
	WriteTimeoutSec int64
	// SubscribeRoutineCount is the number of goroutines that run message
	// handlers. Values <= 0 fall back to 10.
	SubscribeRoutineCount int
	// SubscribeBufferSize is the capacity of the queue between the MQTT
	// callbacks and the handler goroutines. Values <= 0 fall back to 1024.
	SubscribeBufferSize int
}

// check reports an error when one of the mandatory string fields is empty,
// as judged by strutils.IsStringEmpty.
func (opts *MqttClientOptions) check() error {
	if strutils.IsStringEmpty(opts.UserName) {
		return errors.New("必须传递用户名")
	}

	if strutils.IsStringEmpty(opts.Password) {
		return errors.New("必须传递密码")
	}

	if strutils.IsStringEmpty(opts.Address) {
		return errors.New("必须传递地址")
	}

	if strutils.IsStringEmpty(opts.ClientID) {
		return errors.New("必须传递客户端ID")
	}

	return nil
}

// subscribePayload is one received message together with the router item
// whose handlers must process it.
type subscribePayload struct {
	item    *router.Item
	payload []byte
}

// MqttClient is an MQTT client that subscribes router items to their topics
// and dispatches incoming messages to a fixed set of worker goroutines.
//
// Create it with NewMqttClient and release it with DestroyMqttClient.
type MqttClient struct {
	client mqtt.Client
	opts   *MqttClientOptions

	// routersMutex guards routers.
	routersMutex *sync.Mutex
	routers      []*router.Router

	// subscribeChan carries messages from the MQTT callbacks to the workers.
	subscribeChan chan *subscribePayload
	// subscribeDoneChan is closed by DestroyMqttClient to stop the workers
	// and to unblock callbacks waiting to enqueue a message.
	subscribeDoneChan chan any

	// destroyOnce makes DestroyMqttClient safe to call more than once.
	destroyOnce sync.Once
}

// NewMqttClient validates opts, applies defaults for the unset tuning
// fields, builds the underlying Paho client and starts the worker goroutines.
// It does not connect; call Connect for that.
//
// opts is copied, so the caller's struct is never modified. An error is
// returned when opts is nil or a mandatory field is empty.
func NewMqttClient(opts *MqttClientOptions) (*MqttClient, error) {
	if opts == nil {
		return nil, errors.New("没有传递mqtt客户端选项")
	}

	if err := opts.check(); err != nil {
		return nil, err
	}

	// Work on a private copy so that applying defaults does not leak into
	// the caller's options.
	resolved := *opts

	if resolved.SubscribeRoutineCount <= 0 {
		resolved.SubscribeRoutineCount = defaultSubscribeRoutineCount
	}

	// The original code assigned the default to SubscribeRoutineCount here,
	// which left the buffer size at zero and spawned 1024 workers.
	if resolved.SubscribeBufferSize <= 0 {
		resolved.SubscribeBufferSize = defaultSubscribeBufferSize
	}

	if resolved.WriteTimeoutSec <= 0 {
		resolved.WriteTimeoutSec = defaultWriteTimeoutSec
	}

	mqttClient := &MqttClient{
		opts:              &resolved,
		routersMutex:      &sync.Mutex{},
		routers:           make([]*router.Router, 0),
		subscribeChan:     make(chan *subscribePayload, resolved.SubscribeBufferSize),
		subscribeDoneChan: make(chan any),
	}

	mqttOptions := mqtt.NewClientOptions().
		SetAutoReconnect(true).
		SetUsername(resolved.UserName).
		SetPassword(resolved.Password).
		AddBroker(resolved.Address).
		SetClientID(resolved.ClientID).
		SetKeepAlive(time.Duration(resolved.KeepAliveSec)*time.Second).
		SetPingTimeout(time.Duration(resolved.PingTimeoutSec)*time.Second).
		SetWriteTimeout(time.Duration(resolved.WriteTimeoutSec)*time.Second).
		SetWill(resolved.ClientID+"/will", "dead", qosExactlyOnce, false).
		SetOnConnectHandler(func(mqtt.Client) {
			// Handler errors cannot be returned to anyone, so they are printed.
			if err := mqttClient.onConnect(); err != nil {
				fmt.Println(err)
			}
		}).
		SetConnectionLostHandler(func(mqtt.Client, error) {
			if err := mqttClient.onConnectLost(); err != nil {
				fmt.Println(err)
			}
		})

	mqttClient.client = mqtt.NewClient(mqttOptions)

	for i := 0; i < resolved.SubscribeRoutineCount; i++ {
		go mqttClient.subscribeRoutine()
	}

	return mqttClient, nil
}

// DestroyMqttClient stops the worker goroutines and destroys every router
// registered on c. It is a no-op for a nil client and for repeated calls.
//
// It does not disconnect from the broker; call Disconnect first if needed.
func DestroyMqttClient(c *MqttClient) {
	if c == nil {
		return
	}

	c.destroyOnce.Do(func() {
		// Closing the done channel is the only shutdown signal. The message
		// channel is deliberately left open and no field is set to nil:
		// MQTT callbacks and workers may still be running, and closing or
		// clearing what they use would race with them and could panic.
		close(c.subscribeDoneChan)

		c.routersMutex.Lock()
		defer c.routersMutex.Unlock()

		for _, r := range c.routers {
			router.DestroyRouter(r)
		}

		c.routers = nil
	})
}

// Connect connects to the broker and waits up to WriteTimeoutSec seconds for
// the attempt to finish. It returns an error on timeout or when the
// connection attempt fails.
func (c *MqttClient) Connect() error {
	return c.waitToken(c.client.Connect(), "连接超时")
}

// Disconnect closes the broker connection, passing 250 as the quiesce time
// (milliseconds) to the underlying client.
func (c *MqttClient) Disconnect() {
	c.client.Disconnect(250)
}

// GetRouter creates a router for group with the given handlers, registers it
// on the client and returns it.
//
// The callback handed to the router blocks, polling once per second, until
// the client is connected, and then subscribes the item unless it is already
// marked as subscribed.
func (c *MqttClient) GetRouter(group string, handlers []router.Handler) *router.Router {
	r := router.NewRouter(group, handlers, func(item *router.Item) error {
		for !c.client.IsConnected() {
			time.Sleep(1 * time.Second)
		}

		err := item.DoIfUnSubscribe(func() error {
			return c.subscribe(item)
		})
		if err != nil {
			return err
		}

		return nil
	})

	c.addRouter(r)

	return r
}

// Response publishes data as the reply to item. The reply topic is
// "<item.Topic>/reply", or "<item.Topic>/<responseIdentifier>/reply" when
// responseIdentifier is not empty. It waits up to WriteTimeoutSec seconds
// for the publish to complete.
func (c *MqttClient) Response(item *router.Item, responseIdentifier string, data []byte) error {
	replyTopic := item.Topic + "/reply"
	if strutils.IsStringNotEmpty(responseIdentifier) {
		replyTopic = fmt.Sprintf("%s/%s/reply", item.Topic, responseIdentifier)
	}

	token := c.client.Publish(replyTopic, qosExactlyOnce, false, data)

	return c.waitToken(token, "发布超时")
}

// waitToken waits up to WriteTimeoutSec seconds for token to complete. It
// returns an error carrying timeoutMessage on timeout, otherwise the token's
// own error, which is nil on success.
func (c *MqttClient) waitToken(token mqtt.Token, timeoutMessage string) error {
	if !token.WaitTimeout(time.Duration(c.opts.WriteTimeoutSec) * time.Second) {
		return errors.New(timeoutMessage)
	}

	return token.Error()
}

// onConnect subscribes every item of every registered router that is not
// currently marked as subscribed. It runs from the on-connect handler.
func (c *MqttClient) onConnect() error {
	return c.rangeRouters(func(r *router.Router) error {
		err := r.RangeItem(func(item *router.Item) error {
			err := item.DoIfUnSubscribe(func() error {
				return c.subscribe(item)
			})
			if err != nil {
				return err
			}

			return nil
		})
		if err != nil {
			return fmt.Errorf("SetOnConnectHandler订阅失败: %w", err)
		}

		return nil
	})
}

// onConnectLost marks every item of every registered router as unsubscribed
// so that the next onConnect subscribes them again. It runs from the
// connection-lost handler.
func (c *MqttClient) onConnectLost() error {
	err := c.rangeRouters(func(r *router.Router) error {
		err := r.RangeItem(func(item *router.Item) error {
			item.SetUnSubscribe()
			return nil
		})
		if err != nil {
			return err
		}

		return nil
	})
	if err != nil {
		// The original message named the on-connect handler here, a
		// copy-paste slip that pointed at the wrong handler.
		return fmt.Errorf("SetConnectionLostHandler重置订阅状态失败: %w", err)
	}

	return nil
}

// addRouter appends r to the registered routers.
func (c *MqttClient) addRouter(r *router.Router) {
	c.routersMutex.Lock()
	defer c.routersMutex.Unlock()

	c.routers = append(c.routers, r)
}

// rangeRouters calls rangeFunc for each registered router in registration
// order and stops at the first error, which it returns.
//
// routersMutex is held for the whole iteration, so rangeFunc must not call
// addRouter or rangeRouters.
func (c *MqttClient) rangeRouters(rangeFunc func(r *router.Router) error) error {
	c.routersMutex.Lock()
	defer c.routersMutex.Unlock()

	for _, r := range c.routers {
		if err := rangeFunc(r); err != nil {
			return err
		}
	}

	return nil
}

// subscribe subscribes item.Topic on the broker and waits up to
// WriteTimeoutSec seconds for the subscription to complete. Received
// messages are queued for the worker goroutines.
func (c *MqttClient) subscribe(item *router.Item) error {
	token := c.client.Subscribe(item.Topic, qosExactlyOnce, func(_ mqtt.Client, message mqtt.Message) {
		// Also watch the done channel so a full queue cannot block this
		// callback forever once the workers have been stopped.
		select {
		case c.subscribeChan <- &subscribePayload{item: item, payload: message.Payload()}:
		case <-c.subscribeDoneChan:
		}
	})

	if err := c.waitToken(token, "订阅超时"); err != nil {
		return err
	}

	fmt.Println("[MQTT] Subscribe Topic: " + item.Topic)

	return nil
}

// subscribeRoutine is the worker loop: it runs the handlers of each queued
// message until the done channel is closed.
func (c *MqttClient) subscribeRoutine() {
	for {
		select {
		case <-c.subscribeDoneChan:
			return
		case payload := <-c.subscribeChan:
			payload.item.CallHandlers(payload.payload)
		}
	}
}