package mqtt import ( "git.sxidc.com/go-framework/baize/framework/core/infrastructure/logger" "git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/common" "git.sxidc.com/go-tools/utils/strutils" mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/pkg/errors" "sync" "time" ) type MessageQueue struct { mqttClient mqtt.Client topicGroupMapHandlerMutex *sync.RWMutex topicGroupHandlerMap map[string]map[string]common.MessageHandler } func New(address string, userName string, password string) (*MessageQueue, error) { clientID := "baize-" + strutils.SimpleUUID()[0:8] mqttClient := mqtt.NewClient(mqtt.NewClientOptions(). SetAutoReconnect(true). SetUsername(password). SetPassword(userName). AddBroker(address). SetClientID(clientID). SetOrderMatters(false). SetWill(clientID+"/will", "dead", 2, true)) token := mqttClient.Connect() if !token.WaitTimeout(20 * time.Second) { return nil, errors.New("连接超时") } if token.Error() != nil { return nil, token.Error() } for !mqttClient.IsConnected() { logger.GetInstance().Info("等待连接MQTT代理...") time.Sleep(1 * time.Second) } return &MessageQueue{ mqttClient: mqttClient, topicGroupMapHandlerMutex: &sync.RWMutex{}, topicGroupHandlerMap: make(map[string]map[string]common.MessageHandler), }, nil } func Destroy(messageQueue *MessageQueue) { if messageQueue == nil { return } messageQueue.mqttClient.Disconnect(250) messageQueue.mqttClient = nil messageQueue = nil } func (messageQueue *MessageQueue) Subscribe(group string, topic string, handler common.MessageHandler) error { messageQueue.topicGroupMapHandlerMutex.Lock() defer messageQueue.topicGroupMapHandlerMutex.Unlock() token := messageQueue.mqttClient.Subscribe(topic, 1, func(client mqtt.Client, message mqtt.Message) { messageQueue.topicGroupMapHandlerMutex.RLock() defer messageQueue.topicGroupMapHandlerMutex.RUnlock() for _, groupHandler := range messageQueue.topicGroupHandlerMap[topic] { go groupHandler(messageQueue, message.Topic(), string(message.Payload())) } }) if !token.WaitTimeout(20 * time.Second) { return errors.New("订阅超时") } if token.Error() != nil { return token.Error() } groupHandlerMap, ok := messageQueue.topicGroupHandlerMap[topic] if !ok { groupHandlerMap = make(map[string]common.MessageHandler) } h, ok := groupHandlerMap[group] if !ok { h = handler } groupHandlerMap[group] = h messageQueue.topicGroupHandlerMap[topic] = groupHandlerMap return nil } func (messageQueue *MessageQueue) UnSubscribe(group string, topic string) error { messageQueue.topicGroupMapHandlerMutex.Lock() defer messageQueue.topicGroupMapHandlerMutex.Unlock() groupHandlerMap, ok := messageQueue.topicGroupHandlerMap[topic] if !ok { return nil } _, ok = groupHandlerMap[group] if !ok { return nil } delete(groupHandlerMap, group) if len(groupHandlerMap) == 0 { delete(messageQueue.topicGroupHandlerMap, topic) } else { messageQueue.topicGroupHandlerMap[topic] = groupHandlerMap } token := messageQueue.mqttClient.Unsubscribe(topic) if !token.WaitTimeout(20 * time.Second) { return errors.New("取消订阅超时") } if token.Error() != nil { return token.Error() } return nil } func (messageQueue *MessageQueue) Publish(topic string, data string) error { token := messageQueue.mqttClient.Publish(topic, 1, false, data) if !token.WaitTimeout(20 * time.Second) { return errors.New("发布订阅超时") } if token.Error() != nil { return token.Error() } return nil }