123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160 |
- package mqtt
- import (
- "git.sxidc.com/go-framework/baize/framework/core/infrastructure/logger"
- "git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/common"
- "git.sxidc.com/go-tools/utils/strutils"
- mqtt "github.com/eclipse/paho.mqtt.golang"
- "github.com/pkg/errors"
- "sync"
- "time"
- )
- type MessageQueue struct {
- mqttClient mqtt.Client
- topicGroupMapHandlerMutex *sync.RWMutex
- topicGroupHandlerMap map[string]map[string]common.MessageHandler
- }
- func New(address string, userName string, password string) (*MessageQueue, error) {
- clientID := "baize-" + strutils.SimpleUUID()[0:8]
- mqttClient := mqtt.NewClient(mqtt.NewClientOptions().
- SetAutoReconnect(true).
- SetUsername(password).
- SetPassword(userName).
- AddBroker(address).
- SetClientID(clientID).
- SetOrderMatters(false).
- SetWill(clientID+"/will", "dead", 2, true))
- token := mqttClient.Connect()
- if !token.WaitTimeout(20 * time.Second) {
- return nil, errors.New("连接超时")
- }
- if token.Error() != nil {
- return nil, token.Error()
- }
- for !mqttClient.IsConnected() {
- logger.GetInstance().Info("等待连接MQTT代理...")
- time.Sleep(1 * time.Second)
- }
- return &MessageQueue{
- mqttClient: mqttClient,
- topicGroupMapHandlerMutex: &sync.RWMutex{},
- topicGroupHandlerMap: make(map[string]map[string]common.MessageHandler),
- }, nil
- }
- func Destroy(messageQueue *MessageQueue) {
- if messageQueue == nil {
- return
- }
- messageQueue.mqttClient.Disconnect(250)
- messageQueue.mqttClient = nil
- messageQueue = nil
- }
- func (messageQueue *MessageQueue) Subscribe(group string, topic string, handler common.MessageHandler) error {
- messageQueue.topicGroupMapHandlerMutex.Lock()
- defer messageQueue.topicGroupMapHandlerMutex.Unlock()
- token := messageQueue.mqttClient.Subscribe(topic, 1, func(client mqtt.Client, message mqtt.Message) {
- messageQueue.topicGroupMapHandlerMutex.RLock()
- defer messageQueue.topicGroupMapHandlerMutex.RUnlock()
- event, err := common.UnmarshalCloudEvent([]byte(message.Payload()))
- if err != nil {
- logger.GetInstance().Error(err)
- return
- }
- for _, groupHandler := range messageQueue.topicGroupHandlerMap[topic] {
- go groupHandler(messageQueue, message.Topic(), event)
- }
- })
- if !token.WaitTimeout(20 * time.Second) {
- return errors.New("订阅超时")
- }
- if token.Error() != nil {
- return token.Error()
- }
- groupHandlerMap, ok := messageQueue.topicGroupHandlerMap[topic]
- if !ok {
- groupHandlerMap = make(map[string]common.MessageHandler)
- }
- h, ok := groupHandlerMap[group]
- if !ok {
- h = handler
- }
- groupHandlerMap[group] = h
- messageQueue.topicGroupHandlerMap[topic] = groupHandlerMap
- return nil
- }
- func (messageQueue *MessageQueue) UnSubscribe(group string, topic string) error {
- messageQueue.topicGroupMapHandlerMutex.Lock()
- defer messageQueue.topicGroupMapHandlerMutex.Unlock()
- groupHandlerMap, ok := messageQueue.topicGroupHandlerMap[topic]
- if !ok {
- return nil
- }
- _, ok = groupHandlerMap[group]
- if !ok {
- return nil
- }
- delete(groupHandlerMap, group)
- if len(groupHandlerMap) == 0 {
- delete(messageQueue.topicGroupHandlerMap, topic)
- } else {
- messageQueue.topicGroupHandlerMap[topic] = groupHandlerMap
- }
- token := messageQueue.mqttClient.Unsubscribe(topic)
- if !token.WaitTimeout(20 * time.Second) {
- return errors.New("取消订阅超时")
- }
- if token.Error() != nil {
- return token.Error()
- }
- return nil
- }
- func (messageQueue *MessageQueue) Publish(topic string, event *common.CloudEvent) error {
- eventBytes, err := event.Marshal()
- if err != nil {
- return err
- }
- token := messageQueue.mqttClient.Publish(topic, 1, false, eventBytes)
- if !token.WaitTimeout(20 * time.Second) {
- return errors.New("发布订阅超时")
- }
- if token.Error() != nil {
- return token.Error()
- }
- return nil
- }
|