mqtt.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. package mqtt
  2. import (
  3. "git.sxidc.com/go-framework/baize/framework/core/data_protocol"
  4. "git.sxidc.com/go-framework/baize/framework/core/infrastructure/logger"
  5. "git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/common"
  6. "git.sxidc.com/go-tools/utils/strutils"
  7. mqtt "github.com/eclipse/paho.mqtt.golang"
  8. "github.com/pkg/errors"
  9. "sync"
  10. "time"
  11. )
// MessageQueue is a message queue backed by a single MQTT client connection.
// It keeps a per-topic registry of group handlers that incoming messages are
// dispatched to.
type MessageQueue struct {
	// mqttClient is the underlying paho MQTT client; Destroy sets it to nil.
	mqttClient mqtt.Client
	// topicGroupMapHandlerMutex guards topicGroupHandlerMap.
	topicGroupMapHandlerMutex *sync.RWMutex
	// topicGroupHandlerMap maps topic -> group -> handler.
	topicGroupHandlerMap map[string]map[string]common.MessageHandler
}
  17. func New(address string, userName string, password string) (*MessageQueue, error) {
  18. clientID := "baize-" + strutils.SimpleUUID()[0:8]
  19. mqttClient := mqtt.NewClient(mqtt.NewClientOptions().
  20. SetAutoReconnect(true).
  21. SetUsername(password).
  22. SetPassword(userName).
  23. AddBroker(address).
  24. SetClientID(clientID).
  25. SetOrderMatters(false).
  26. SetWill(clientID+"/will", "dead", 2, true))
  27. token := mqttClient.Connect()
  28. if !token.WaitTimeout(20 * time.Second) {
  29. return nil, errors.New("连接超时")
  30. }
  31. if token.Error() != nil {
  32. return nil, token.Error()
  33. }
  34. for !mqttClient.IsConnected() {
  35. logger.GetInstance().Info("等待连接MQTT代理...")
  36. time.Sleep(1 * time.Second)
  37. }
  38. return &MessageQueue{
  39. mqttClient: mqttClient,
  40. topicGroupMapHandlerMutex: &sync.RWMutex{},
  41. topicGroupHandlerMap: make(map[string]map[string]common.MessageHandler),
  42. }, nil
  43. }
  44. func Destroy(messageQueue *MessageQueue) {
  45. if messageQueue == nil {
  46. return
  47. }
  48. messageQueue.mqttClient.Disconnect(250)
  49. messageQueue.mqttClient = nil
  50. messageQueue = nil
  51. }
  52. func (messageQueue *MessageQueue) Subscribe(group string, topic string, handler common.MessageHandler) error {
  53. messageQueue.topicGroupMapHandlerMutex.Lock()
  54. defer messageQueue.topicGroupMapHandlerMutex.Unlock()
  55. token := messageQueue.mqttClient.Subscribe(topic, 1, func(client mqtt.Client, message mqtt.Message) {
  56. messageQueue.topicGroupMapHandlerMutex.RLock()
  57. defer messageQueue.topicGroupMapHandlerMutex.RUnlock()
  58. event, err := data_protocol.UnmarshalJsonCloudEvent(message.Payload())
  59. if err != nil {
  60. logger.GetInstance().Error(err)
  61. return
  62. }
  63. for _, groupHandler := range messageQueue.topicGroupHandlerMap[topic] {
  64. go groupHandler(messageQueue, message.Topic(), event)
  65. }
  66. })
  67. if !token.WaitTimeout(20 * time.Second) {
  68. return errors.New("订阅超时")
  69. }
  70. if token.Error() != nil {
  71. return token.Error()
  72. }
  73. groupHandlerMap, ok := messageQueue.topicGroupHandlerMap[topic]
  74. if !ok {
  75. groupHandlerMap = make(map[string]common.MessageHandler)
  76. }
  77. h, ok := groupHandlerMap[group]
  78. if !ok {
  79. h = handler
  80. }
  81. groupHandlerMap[group] = h
  82. messageQueue.topicGroupHandlerMap[topic] = groupHandlerMap
  83. return nil
  84. }
  85. func (messageQueue *MessageQueue) UnSubscribe(group string, topic string) error {
  86. messageQueue.topicGroupMapHandlerMutex.Lock()
  87. defer messageQueue.topicGroupMapHandlerMutex.Unlock()
  88. groupHandlerMap, ok := messageQueue.topicGroupHandlerMap[topic]
  89. if !ok {
  90. return nil
  91. }
  92. _, ok = groupHandlerMap[group]
  93. if !ok {
  94. return nil
  95. }
  96. delete(groupHandlerMap, group)
  97. if len(groupHandlerMap) == 0 {
  98. delete(messageQueue.topicGroupHandlerMap, topic)
  99. } else {
  100. messageQueue.topicGroupHandlerMap[topic] = groupHandlerMap
  101. }
  102. token := messageQueue.mqttClient.Unsubscribe(topic)
  103. if !token.WaitTimeout(20 * time.Second) {
  104. return errors.New("取消订阅超时")
  105. }
  106. if token.Error() != nil {
  107. return token.Error()
  108. }
  109. return nil
  110. }
  111. func (messageQueue *MessageQueue) Publish(topic string, event *data_protocol.CloudEvent) error {
  112. eventBytes, err := event.MarshalJson()
  113. if err != nil {
  114. return err
  115. }
  116. token := messageQueue.mqttClient.Publish(topic, 1, false, eventBytes)
  117. if !token.WaitTimeout(20 * time.Second) {
  118. return errors.New("发布订阅超时")
  119. }
  120. if token.Error() != nil {
  121. return token.Error()
  122. }
  123. return nil
  124. }