mqtt.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. package mqtt
  2. import (
  3. "git.sxidc.com/go-framework/baize/framework/core/infrastructure/logger"
  4. "git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/common"
  5. "git.sxidc.com/go-tools/utils/strutils"
  6. mqtt "github.com/eclipse/paho.mqtt.golang"
  7. "github.com/pkg/errors"
  8. "sync"
  9. "time"
  10. )
  11. type MessageQueue struct {
  12. mqttClient mqtt.Client
  13. topicGroupMapHandlerMutex *sync.RWMutex
  14. topicGroupHandlerMap map[string]map[string]common.MessageHandler
  15. }
  16. func New(address string, userName string, password string) (*MessageQueue, error) {
  17. clientID := "baize-" + strutils.SimpleUUID()[0:8]
  18. mqttClient := mqtt.NewClient(mqtt.NewClientOptions().
  19. SetAutoReconnect(true).
  20. SetUsername(password).
  21. SetPassword(userName).
  22. AddBroker(address).
  23. SetClientID(clientID).
  24. SetOrderMatters(false).
  25. SetWill(clientID+"/will", "dead", 2, true))
  26. token := mqttClient.Connect()
  27. if !token.WaitTimeout(20 * time.Second) {
  28. return nil, errors.New("连接超时")
  29. }
  30. if token.Error() != nil {
  31. return nil, token.Error()
  32. }
  33. for !mqttClient.IsConnected() {
  34. logger.GetInstance().Info("等待连接MQTT代理...")
  35. time.Sleep(1 * time.Second)
  36. }
  37. return &MessageQueue{
  38. mqttClient: mqttClient,
  39. topicGroupMapHandlerMutex: &sync.RWMutex{},
  40. topicGroupHandlerMap: make(map[string]map[string]common.MessageHandler),
  41. }, nil
  42. }
  43. func Destroy(messageQueue *MessageQueue) {
  44. if messageQueue == nil {
  45. return
  46. }
  47. messageQueue.mqttClient.Disconnect(250)
  48. messageQueue.mqttClient = nil
  49. messageQueue = nil
  50. }
  51. func (messageQueue *MessageQueue) Subscribe(group string, topic string, handler common.MessageHandler) error {
  52. messageQueue.topicGroupMapHandlerMutex.Lock()
  53. defer messageQueue.topicGroupMapHandlerMutex.Unlock()
  54. token := messageQueue.mqttClient.Subscribe(topic, 1, func(client mqtt.Client, message mqtt.Message) {
  55. messageQueue.topicGroupMapHandlerMutex.RLock()
  56. defer messageQueue.topicGroupMapHandlerMutex.RUnlock()
  57. for _, groupHandler := range messageQueue.topicGroupHandlerMap[topic] {
  58. go groupHandler(messageQueue, message.Topic(), string(message.Payload()))
  59. }
  60. })
  61. if !token.WaitTimeout(20 * time.Second) {
  62. return errors.New("订阅超时")
  63. }
  64. if token.Error() != nil {
  65. return token.Error()
  66. }
  67. groupHandlerMap, ok := messageQueue.topicGroupHandlerMap[topic]
  68. if !ok {
  69. groupHandlerMap = make(map[string]common.MessageHandler)
  70. }
  71. h, ok := groupHandlerMap[group]
  72. if !ok {
  73. h = handler
  74. }
  75. groupHandlerMap[group] = h
  76. messageQueue.topicGroupHandlerMap[topic] = groupHandlerMap
  77. return nil
  78. }
  79. func (messageQueue *MessageQueue) UnSubscribe(group string, topic string) error {
  80. messageQueue.topicGroupMapHandlerMutex.Lock()
  81. defer messageQueue.topicGroupMapHandlerMutex.Unlock()
  82. groupHandlerMap, ok := messageQueue.topicGroupHandlerMap[topic]
  83. if !ok {
  84. return nil
  85. }
  86. _, ok = groupHandlerMap[group]
  87. if !ok {
  88. return nil
  89. }
  90. delete(groupHandlerMap, group)
  91. if len(groupHandlerMap) == 0 {
  92. delete(messageQueue.topicGroupHandlerMap, topic)
  93. } else {
  94. messageQueue.topicGroupHandlerMap[topic] = groupHandlerMap
  95. }
  96. token := messageQueue.mqttClient.Unsubscribe(topic)
  97. if !token.WaitTimeout(20 * time.Second) {
  98. return errors.New("取消订阅超时")
  99. }
  100. if token.Error() != nil {
  101. return token.Error()
  102. }
  103. return nil
  104. }
  105. func (messageQueue *MessageQueue) Publish(topic string, data string) error {
  106. token := messageQueue.mqttClient.Publish(topic, 1, false, data)
  107. if !token.WaitTimeout(20 * time.Second) {
  108. return errors.New("发布订阅超时")
  109. }
  110. if token.Error() != nil {
  111. return token.Error()
  112. }
  113. return nil
  114. }