router.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. package mqtt_api
  2. import (
  3. "git.sxidc.com/go-framework/baize/framework/core/infrastructure/logger"
  4. "sync"
  5. )
  6. type Handler func(c *Context)
  7. type subscribeItem struct {
  8. handlers []Handler
  9. subscribed bool
  10. }
  11. type Router struct {
  12. Group string
  13. mqttClient *MqttClient
  14. contextsMutex *sync.Mutex
  15. contexts []*Context
  16. globalHandlers []Handler
  17. subscribeItemsMutex *sync.Mutex
  18. subscribeItems map[string]*subscribeItem
  19. }
  20. func NewRouter(group string, globalHandlers []Handler) *Router {
  21. return &Router{
  22. Group: group,
  23. mqttClient: nil,
  24. contextsMutex: &sync.Mutex{},
  25. contexts: make([]*Context, 0),
  26. globalHandlers: globalHandlers,
  27. subscribeItemsMutex: &sync.Mutex{},
  28. subscribeItems: make(map[string]*subscribeItem),
  29. }
  30. }
  31. func DestroyRouter(router *Router) {
  32. if router == nil {
  33. return
  34. }
  35. router.contextsMutex.Lock()
  36. for _, context := range router.contexts {
  37. destroyContext(context)
  38. }
  39. router.contexts = nil
  40. router.contextsMutex.Unlock()
  41. router = nil
  42. }
  43. func (router *Router) Start(mqttOptions *MqttClientOptions) error {
  44. mqttClient, err := newMqttClient(mqttOptions, func(client *MqttClient) {
  45. router.subscribeTopics(client)
  46. }, func(client *MqttClient) {
  47. router.unsubscribeTopics(client)
  48. })
  49. if err != nil {
  50. return err
  51. }
  52. err = mqttClient.connect()
  53. if err != nil {
  54. return err
  55. }
  56. router.mqttClient = mqttClient
  57. return nil
  58. }
  59. func (router *Router) Finish() {
  60. if router.mqttClient == nil {
  61. return
  62. }
  63. router.unsubscribeTopics(router.mqttClient)
  64. router.mqttClient.disconnect()
  65. destroyMqttClient(router.mqttClient)
  66. router.mqttClient = nil
  67. }
  68. func (router *Router) AddGlobalHandlers(handlers ...Handler) {
  69. router.globalHandlers = append(router.globalHandlers, handlers...)
  70. }
  71. func (router *Router) AddTopic(topic string, handlers ...Handler) error {
  72. allHandlers := append(router.globalHandlers, handlers...)
  73. err := router.addAndSubscribeTopicHandlers(router.Group+topic, allHandlers...)
  74. if err != nil {
  75. return err
  76. }
  77. return nil
  78. }
  79. func (router *Router) subscribeTopics(client *MqttClient) {
  80. if !client.mqttClient.IsConnected() {
  81. return
  82. }
  83. router.rangeTopicHandlers(func(topic string, handlers []Handler) {
  84. err := router.subscribeMqttClient(client, topic, handlers)
  85. if err != nil {
  86. logger.GetInstance().Error(err)
  87. return
  88. }
  89. })
  90. }
  91. func (router *Router) unsubscribeTopics(client *MqttClient) {
  92. if !client.mqttClient.IsConnected() {
  93. return
  94. }
  95. router.rangeTopicHandlers(func(topic string, handlers []Handler) {
  96. err := client.unsubscribe(topic)
  97. if err != nil {
  98. logger.GetInstance().Error(err)
  99. return
  100. }
  101. })
  102. }
  103. func (router *Router) subscribeMqttClient(client *MqttClient, topic string, handlers []Handler) error {
  104. err := client.subscribe(topic, func(topic string, data []byte) {
  105. c, err := newContext(router.mqttClient, topic, data, handlers)
  106. if err != nil {
  107. logger.GetInstance().Error(err)
  108. return
  109. }
  110. c.callHandlers()
  111. c.Next()
  112. })
  113. if err != nil {
  114. return err
  115. }
  116. return nil
  117. }
  118. func (router *Router) addAndSubscribeTopicHandlers(topic string, handlers ...Handler) error {
  119. router.subscribeItemsMutex.Lock()
  120. defer router.subscribeItemsMutex.Unlock()
  121. router.subscribeItems[topic] = &subscribeItem{
  122. handlers: handlers,
  123. subscribed: false,
  124. }
  125. if router.mqttClient == nil {
  126. return nil
  127. }
  128. if !router.mqttClient.mqttClient.IsConnected() {
  129. return nil
  130. }
  131. err := router.subscribeMqttClient(router.mqttClient, topic, handlers)
  132. if err != nil {
  133. return err
  134. }
  135. router.subscribeItems[topic].subscribed = true
  136. return nil
  137. }
  138. func (router *Router) removeTopicHandlers(topic string) {
  139. router.subscribeItemsMutex.Lock()
  140. defer router.subscribeItemsMutex.Unlock()
  141. if router.subscribeItems[topic] == nil {
  142. return
  143. }
  144. delete(router.subscribeItems, topic)
  145. }
  146. func (router *Router) rangeTopicHandlers(callback func(topic string, handlers []Handler)) {
  147. router.subscribeItemsMutex.Lock()
  148. defer router.subscribeItemsMutex.Unlock()
  149. for topic, item := range router.subscribeItems {
  150. if !item.subscribed {
  151. callback(topic, item.handlers)
  152. }
  153. }
  154. }