Prechádzať zdrojové kódy

完成mqtt消息队列开发

yjp 3 mesiacov pred
rodič
commit
d4afec3c67

+ 149 - 0
framework/core/infrastructure/message_queue/mqtt/mqtt.go

@@ -0,0 +1,149 @@
+package mqtt
+
+import (
+	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/logger"
+	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/common"
+	"git.sxidc.com/go-tools/utils/strutils"
+	mqtt "github.com/eclipse/paho.mqtt.golang"
+	"github.com/pkg/errors"
+	"sync"
+	"time"
+)
+
+type MessageQueue struct {
+	mqttClient mqtt.Client
+
+	topicGroupMapHandlerMutex *sync.RWMutex
+	topicGroupHandlerMap      map[string]map[string]common.MessageHandler
+}
+
+func New(address string, userName string, password string) (*MessageQueue, error) {
+	clientID := "baize-" + strutils.SimpleUUID()[0:8]
+
+	mqttClient := mqtt.NewClient(mqtt.NewClientOptions().
+		SetAutoReconnect(true).
+		SetUsername(password).
+		SetPassword(userName).
+		AddBroker(address).
+		SetClientID(clientID).
+		SetOrderMatters(false).
+		SetWill(clientID+"/will", "dead", 2, true))
+
+	token := mqttClient.Connect()
+
+	if !token.WaitTimeout(20 * time.Second) {
+		return nil, errors.New("连接超时")
+	}
+
+	if token.Error() != nil {
+		return nil, token.Error()
+	}
+
+	for !mqttClient.IsConnected() {
+		logger.GetInstance().Info("等待连接MQTT代理...")
+		time.Sleep(1 * time.Second)
+	}
+
+	return &MessageQueue{
+		mqttClient:                mqttClient,
+		topicGroupMapHandlerMutex: &sync.RWMutex{},
+		topicGroupHandlerMap:      make(map[string]map[string]common.MessageHandler),
+	}, nil
+}
+
+func Destroy(messageQueue *MessageQueue) {
+	if messageQueue == nil {
+		return
+	}
+
+	messageQueue.mqttClient.Disconnect(250)
+	messageQueue.mqttClient = nil
+
+	messageQueue = nil
+}
+
+func (messageQueue *MessageQueue) Subscribe(group string, topic string, handler common.MessageHandler) error {
+	messageQueue.topicGroupMapHandlerMutex.Lock()
+	defer messageQueue.topicGroupMapHandlerMutex.Unlock()
+
+	token := messageQueue.mqttClient.Subscribe(topic, 1, func(client mqtt.Client, message mqtt.Message) {
+		messageQueue.topicGroupMapHandlerMutex.RLock()
+		defer messageQueue.topicGroupMapHandlerMutex.RUnlock()
+
+		for _, groupHandler := range messageQueue.topicGroupHandlerMap[topic] {
+			go groupHandler(messageQueue, message.Topic(), string(message.Payload()))
+		}
+	})
+
+	if !token.WaitTimeout(20 * time.Second) {
+		return errors.New("订阅超时")
+	}
+
+	if token.Error() != nil {
+		return token.Error()
+	}
+
+	groupHandlerMap, ok := messageQueue.topicGroupHandlerMap[topic]
+	if !ok {
+		groupHandlerMap = make(map[string]common.MessageHandler)
+	}
+
+	h, ok := groupHandlerMap[group]
+	if !ok {
+		h = handler
+	}
+
+	groupHandlerMap[group] = h
+	messageQueue.topicGroupHandlerMap[topic] = groupHandlerMap
+
+	return nil
+}
+
+func (messageQueue *MessageQueue) UnSubscribe(group string, topic string) error {
+	messageQueue.topicGroupMapHandlerMutex.Lock()
+	defer messageQueue.topicGroupMapHandlerMutex.Unlock()
+
+	groupHandlerMap, ok := messageQueue.topicGroupHandlerMap[topic]
+	if !ok {
+		return nil
+	}
+
+	_, ok = groupHandlerMap[group]
+	if !ok {
+		return nil
+	}
+
+	delete(groupHandlerMap, group)
+
+	if len(groupHandlerMap) == 0 {
+		delete(messageQueue.topicGroupHandlerMap, topic)
+	} else {
+		messageQueue.topicGroupHandlerMap[topic] = groupHandlerMap
+	}
+
+	token := messageQueue.mqttClient.Unsubscribe(topic)
+
+	if !token.WaitTimeout(20 * time.Second) {
+		return errors.New("取消订阅超时")
+	}
+
+	if token.Error() != nil {
+		return token.Error()
+	}
+
+	return nil
+}
+
+func (messageQueue *MessageQueue) Publish(topic string, data string) error {
+	token := messageQueue.mqttClient.Publish(topic, 1, false, data)
+
+	if !token.WaitTimeout(20 * time.Second) {
+		return errors.New("发布订阅超时")
+	}
+
+	if token.Error() != nil {
+		return token.Error()
+	}
+
+	return nil
+}

+ 1 - 1
framework/core/infrastructure/message_queue/redis/redis.go

@@ -123,7 +123,7 @@ func (messageQueue *MessageQueue) Subscribe(group string, topic string, handler
 			return nil
 		}
 
-		handler(messageQueue, topic, data)
+		handler(messageQueue, message.Stream, data)
 		return nil
 	})