mqtt.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. package mqtt
  2. import (
  3. "git.sxidc.com/go-framework/baize/framework/core/infrastructure/logger"
  4. "git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/common"
  5. "git.sxidc.com/go-tools/utils/strutils"
  6. mqtt "github.com/eclipse/paho.mqtt.golang"
  7. "github.com/pkg/errors"
  8. "sync"
  9. "time"
  10. )
  11. type MessageQueue struct {
  12. mqttClient mqtt.Client
  13. topicGroupMapHandlerMutex *sync.RWMutex
  14. topicGroupHandlerMap map[string]map[string]common.MessageHandler
  15. }
  16. func New(address string, userName string, password string) (*MessageQueue, error) {
  17. clientID := "baize-" + strutils.SimpleUUID()[0:8]
  18. mqttClient := mqtt.NewClient(mqtt.NewClientOptions().
  19. SetAutoReconnect(true).
  20. SetUsername(password).
  21. SetPassword(userName).
  22. AddBroker(address).
  23. SetClientID(clientID).
  24. SetOrderMatters(false).
  25. SetWill(clientID+"/will", "dead", 2, true))
  26. token := mqttClient.Connect()
  27. if !token.WaitTimeout(20 * time.Second) {
  28. return nil, errors.New("连接超时")
  29. }
  30. if token.Error() != nil {
  31. return nil, token.Error()
  32. }
  33. for !mqttClient.IsConnected() {
  34. logger.GetInstance().Info("等待连接MQTT代理...")
  35. time.Sleep(1 * time.Second)
  36. }
  37. return &MessageQueue{
  38. mqttClient: mqttClient,
  39. topicGroupMapHandlerMutex: &sync.RWMutex{},
  40. topicGroupHandlerMap: make(map[string]map[string]common.MessageHandler),
  41. }, nil
  42. }
  43. func Destroy(messageQueue *MessageQueue) {
  44. if messageQueue == nil {
  45. return
  46. }
  47. messageQueue.mqttClient.Disconnect(250)
  48. messageQueue.mqttClient = nil
  49. messageQueue = nil
  50. }
  51. func (messageQueue *MessageQueue) Subscribe(group string, topic string, handler common.MessageHandler) error {
  52. messageQueue.topicGroupMapHandlerMutex.Lock()
  53. defer messageQueue.topicGroupMapHandlerMutex.Unlock()
  54. token := messageQueue.mqttClient.Subscribe(topic, 1, func(client mqtt.Client, message mqtt.Message) {
  55. messageQueue.topicGroupMapHandlerMutex.RLock()
  56. defer messageQueue.topicGroupMapHandlerMutex.RUnlock()
  57. event, err := common.UnmarshalCloudEvent([]byte(message.Payload()))
  58. if err != nil {
  59. logger.GetInstance().Error(err)
  60. return
  61. }
  62. for _, groupHandler := range messageQueue.topicGroupHandlerMap[topic] {
  63. go groupHandler(messageQueue, message.Topic(), event)
  64. }
  65. })
  66. if !token.WaitTimeout(20 * time.Second) {
  67. return errors.New("订阅超时")
  68. }
  69. if token.Error() != nil {
  70. return token.Error()
  71. }
  72. groupHandlerMap, ok := messageQueue.topicGroupHandlerMap[topic]
  73. if !ok {
  74. groupHandlerMap = make(map[string]common.MessageHandler)
  75. }
  76. h, ok := groupHandlerMap[group]
  77. if !ok {
  78. h = handler
  79. }
  80. groupHandlerMap[group] = h
  81. messageQueue.topicGroupHandlerMap[topic] = groupHandlerMap
  82. return nil
  83. }
  84. func (messageQueue *MessageQueue) UnSubscribe(group string, topic string) error {
  85. messageQueue.topicGroupMapHandlerMutex.Lock()
  86. defer messageQueue.topicGroupMapHandlerMutex.Unlock()
  87. groupHandlerMap, ok := messageQueue.topicGroupHandlerMap[topic]
  88. if !ok {
  89. return nil
  90. }
  91. _, ok = groupHandlerMap[group]
  92. if !ok {
  93. return nil
  94. }
  95. delete(groupHandlerMap, group)
  96. if len(groupHandlerMap) == 0 {
  97. delete(messageQueue.topicGroupHandlerMap, topic)
  98. } else {
  99. messageQueue.topicGroupHandlerMap[topic] = groupHandlerMap
  100. }
  101. token := messageQueue.mqttClient.Unsubscribe(topic)
  102. if !token.WaitTimeout(20 * time.Second) {
  103. return errors.New("取消订阅超时")
  104. }
  105. if token.Error() != nil {
  106. return token.Error()
  107. }
  108. return nil
  109. }
  110. func (messageQueue *MessageQueue) Publish(topic string, event *common.CloudEvent) error {
  111. eventBytes, err := event.Marshal()
  112. if err != nil {
  113. return err
  114. }
  115. token := messageQueue.mqttClient.Publish(topic, 1, false, eventBytes)
  116. if !token.WaitTimeout(20 * time.Second) {
  117. return errors.New("发布订阅超时")
  118. }
  119. if token.Error() != nil {
  120. return token.Error()
  121. }
  122. return nil
  123. }