mqtt_client.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. package mqtt_client
  2. import (
  3. "errors"
  4. "fmt"
  5. "git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client/router"
  6. "git.sxidc.com/go-tools/api_binding/utils"
  7. mqtt "github.com/eclipse/paho.mqtt.golang"
  8. "sync"
  9. "time"
  10. )
  11. type MqttClientOptions struct {
  12. UserName string
  13. Password string
  14. Address string
  15. ClientID string
  16. KeepAliveSec int64
  17. PingTimeoutSec int64
  18. WriteTimeoutSec int64
  19. SubscribeRoutineCount int
  20. }
  21. func (opt *MqttClientOptions) check() error {
  22. if utils.IsStringEmpty(opt.UserName) {
  23. return errors.New("必须传递用户名")
  24. }
  25. if utils.IsStringEmpty(opt.Password) {
  26. return errors.New("必须传递密码")
  27. }
  28. if utils.IsStringEmpty(opt.Address) {
  29. return errors.New("必须传递地址")
  30. }
  31. if utils.IsStringEmpty(opt.ClientID) {
  32. return errors.New("必须传递客户端ID")
  33. }
  34. if opt.SubscribeRoutineCount == 0 {
  35. opt.SubscribeRoutineCount = 10
  36. }
  37. return nil
  38. }
  39. type subscribePayload struct {
  40. item *router.Item
  41. payload []byte
  42. }
  43. type MqttClient struct {
  44. client mqtt.Client
  45. routersMutex *sync.Mutex
  46. routers []*router.Router
  47. subscribeChan chan *subscribePayload
  48. subscribeDoneChan chan any
  49. }
  50. func NewMqttClient(opts *MqttClientOptions) (*MqttClient, error) {
  51. if opts == nil {
  52. return nil, errors.New("没有传递mqtt客户端选项")
  53. }
  54. err := opts.check()
  55. if err != nil {
  56. return nil, err
  57. }
  58. mqttClient := &MqttClient{
  59. routersMutex: &sync.Mutex{},
  60. routers: make([]*router.Router, 0),
  61. subscribeChan: make(chan *subscribePayload, 1024),
  62. subscribeDoneChan: make(chan any),
  63. }
  64. mqttOptions := mqtt.NewClientOptions().
  65. SetAutoReconnect(true).
  66. SetUsername(opts.UserName).
  67. SetPassword(opts.Password).
  68. AddBroker(opts.Address).
  69. SetClientID(opts.ClientID).
  70. SetKeepAlive(time.Duration(opts.KeepAliveSec)*time.Second).
  71. SetPingTimeout(time.Duration(opts.PingTimeoutSec)*time.Second).
  72. SetWriteTimeout(time.Duration(opts.WriteTimeoutSec)*time.Second).
  73. SetWill(opts.ClientID+"/will", "dead", 2, false).
  74. SetOnConnectHandler(func(client mqtt.Client) {
  75. err := mqttClient.onConnect()
  76. if err != nil {
  77. fmt.Println(err)
  78. return
  79. }
  80. }).
  81. SetConnectionLostHandler(func(client mqtt.Client, _ error) {
  82. err := mqttClient.onConnectLost()
  83. if err != nil {
  84. fmt.Println(err)
  85. return
  86. }
  87. })
  88. mqttClient.client = mqtt.NewClient(mqttOptions)
  89. for i := 0; i < opts.SubscribeRoutineCount; i++ {
  90. go mqttClient.subscribeRoutine()
  91. }
  92. return mqttClient, nil
  93. }
  94. func DestroyMqttClient(c *MqttClient) {
  95. if c != nil {
  96. close(c.subscribeDoneChan)
  97. c.subscribeDoneChan = nil
  98. close(c.subscribeChan)
  99. c.subscribeChan = nil
  100. c.client = nil
  101. c.routersMutex.Lock()
  102. for _, r := range c.routers {
  103. router.DestroyRouter(r)
  104. }
  105. c.routers = nil
  106. c.routersMutex.Unlock()
  107. }
  108. c = nil
  109. }
  110. func (c *MqttClient) Connect() error {
  111. token := c.client.Connect()
  112. if token.Wait(); token.Error() != nil {
  113. return token.Error()
  114. }
  115. return nil
  116. }
  117. func (c *MqttClient) Disconnect() {
  118. c.client.Disconnect(250)
  119. }
  120. func (c *MqttClient) GetRouter(group string, handlers []router.Handler) *router.Router {
  121. r := router.NewRouter(group, handlers, func(item *router.Item) error {
  122. for {
  123. if c.client.IsConnected() {
  124. break
  125. }
  126. time.Sleep(1 * time.Second)
  127. }
  128. err := item.DoIfUnSubscribe(func() error {
  129. return c.subscribe(item)
  130. })
  131. if err != nil {
  132. return err
  133. }
  134. return nil
  135. })
  136. c.addRouter(r)
  137. return r
  138. }
  139. func (c *MqttClient) Response(item *router.Item, data []byte) error {
  140. token := c.client.Publish(item.Topic+"/reply", 2, false, data)
  141. if token.Wait(); token.Error() != nil {
  142. return token.Error()
  143. }
  144. return nil
  145. }
  146. func (c *MqttClient) onConnect() error {
  147. err := c.rangeRouters(func(r *router.Router) error {
  148. err := r.RangeItem(func(item *router.Item) error {
  149. err := item.DoIfUnSubscribe(func() error {
  150. return c.subscribe(item)
  151. })
  152. if err != nil {
  153. return err
  154. }
  155. return nil
  156. })
  157. if err != nil {
  158. return errors.New("SetOnConnectHandler订阅失败: " + err.Error())
  159. }
  160. return nil
  161. })
  162. if err != nil {
  163. return err
  164. }
  165. return nil
  166. }
  167. func (c *MqttClient) onConnectLost() error {
  168. err := c.rangeRouters(func(r *router.Router) error {
  169. err := r.RangeItem(func(item *router.Item) error {
  170. item.SetUnSubscribe()
  171. return nil
  172. })
  173. if err != nil {
  174. return err
  175. }
  176. return nil
  177. })
  178. if err != nil {
  179. return errors.New("SetOnConnectHandler订阅失败: " + err.Error())
  180. }
  181. return nil
  182. }
  183. func (c *MqttClient) addRouter(router *router.Router) {
  184. c.routersMutex.Lock()
  185. defer c.routersMutex.Unlock()
  186. c.routers = append(c.routers, router)
  187. }
  188. func (c *MqttClient) rangeRouters(rangeFunc func(router *router.Router) error) error {
  189. c.routersMutex.Lock()
  190. defer c.routersMutex.Unlock()
  191. for _, r := range c.routers {
  192. err := rangeFunc(r)
  193. if err != nil {
  194. return err
  195. }
  196. }
  197. return nil
  198. }
  199. func (c *MqttClient) subscribe(item *router.Item) error {
  200. token := c.client.Subscribe(item.Topic, 2, func(client mqtt.Client, message mqtt.Message) {
  201. c.subscribeChan <- &subscribePayload{
  202. item: item,
  203. payload: message.Payload(),
  204. }
  205. })
  206. if token.Wait(); token.Error() != nil {
  207. return token.Error()
  208. }
  209. fmt.Println("[MQTT] Subscribe Topic: " + item.Topic)
  210. return nil
  211. }
  212. func (c *MqttClient) subscribeRoutine() {
  213. for {
  214. select {
  215. case <-c.subscribeDoneChan:
  216. case payload := <-c.subscribeChan:
  217. payload.item.CallHandlers(payload.payload)
  218. }
  219. }
  220. }