mqtt_client.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. package mqtt_client
  2. import (
  3. "errors"
  4. "fmt"
  5. "git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client/router"
  6. "git.sxidc.com/go-tools/api_binding/utils"
  7. mqtt "github.com/eclipse/paho.mqtt.golang"
  8. "sync"
  9. "time"
  10. )
  11. type MqttClientOptions struct {
  12. UserName string
  13. Password string
  14. Address string
  15. ClientID string
  16. KeepAliveSec int64
  17. PingTimeoutSec int64
  18. WriteTimeoutSec int64
  19. SubscribeRoutineCount int
  20. SubscribeBufferSize int
  21. }
  22. func (opts *MqttClientOptions) check() error {
  23. if utils.IsStringEmpty(opts.UserName) {
  24. return errors.New("必须传递用户名")
  25. }
  26. if utils.IsStringEmpty(opts.Password) {
  27. return errors.New("必须传递密码")
  28. }
  29. if utils.IsStringEmpty(opts.Address) {
  30. return errors.New("必须传递地址")
  31. }
  32. if utils.IsStringEmpty(opts.ClientID) {
  33. return errors.New("必须传递客户端ID")
  34. }
  35. return nil
  36. }
  37. type subscribePayload struct {
  38. item *router.Item
  39. payload []byte
  40. }
  41. type MqttClient struct {
  42. client mqtt.Client
  43. routersMutex *sync.Mutex
  44. routers []*router.Router
  45. subscribeChan chan *subscribePayload
  46. subscribeDoneChan chan any
  47. }
  48. func NewMqttClient(opts *MqttClientOptions) (*MqttClient, error) {
  49. if opts == nil {
  50. return nil, errors.New("没有传递mqtt客户端选项")
  51. }
  52. err := opts.check()
  53. if err != nil {
  54. return nil, err
  55. }
  56. if opts.SubscribeRoutineCount == 0 {
  57. opts.SubscribeRoutineCount = 10
  58. }
  59. if opts.SubscribeBufferSize == 0 {
  60. opts.SubscribeRoutineCount = 1024
  61. }
  62. mqttClient := &MqttClient{
  63. routersMutex: &sync.Mutex{},
  64. routers: make([]*router.Router, 0),
  65. subscribeChan: make(chan *subscribePayload, opts.SubscribeRoutineCount),
  66. subscribeDoneChan: make(chan any),
  67. }
  68. mqttOptions := mqtt.NewClientOptions().
  69. SetAutoReconnect(true).
  70. SetUsername(opts.UserName).
  71. SetPassword(opts.Password).
  72. AddBroker(opts.Address).
  73. SetClientID(opts.ClientID).
  74. SetKeepAlive(time.Duration(opts.KeepAliveSec)*time.Second).
  75. SetPingTimeout(time.Duration(opts.PingTimeoutSec)*time.Second).
  76. SetWriteTimeout(time.Duration(opts.WriteTimeoutSec)*time.Second).
  77. SetWill(opts.ClientID+"/will", "dead", 2, false).
  78. SetOnConnectHandler(func(client mqtt.Client) {
  79. err := mqttClient.onConnect()
  80. if err != nil {
  81. fmt.Println(err)
  82. return
  83. }
  84. }).
  85. SetConnectionLostHandler(func(client mqtt.Client, _ error) {
  86. err := mqttClient.onConnectLost()
  87. if err != nil {
  88. fmt.Println(err)
  89. return
  90. }
  91. })
  92. mqttClient.client = mqtt.NewClient(mqttOptions)
  93. for i := 0; i < opts.SubscribeRoutineCount; i++ {
  94. go mqttClient.subscribeRoutine()
  95. }
  96. return mqttClient, nil
  97. }
  98. func DestroyMqttClient(c *MqttClient) {
  99. if c != nil {
  100. close(c.subscribeDoneChan)
  101. c.subscribeDoneChan = nil
  102. close(c.subscribeChan)
  103. c.subscribeChan = nil
  104. c.client = nil
  105. c.routersMutex.Lock()
  106. for _, r := range c.routers {
  107. router.DestroyRouter(r)
  108. }
  109. c.routers = nil
  110. c.routersMutex.Unlock()
  111. }
  112. c = nil
  113. }
  114. func (c *MqttClient) Connect() error {
  115. token := c.client.Connect()
  116. if token.Wait(); token.Error() != nil {
  117. return token.Error()
  118. }
  119. return nil
  120. }
  121. func (c *MqttClient) Disconnect() {
  122. c.client.Disconnect(250)
  123. }
  124. func (c *MqttClient) GetRouter(group string, handlers []router.Handler) *router.Router {
  125. r := router.NewRouter(group, handlers, func(item *router.Item) error {
  126. for {
  127. if c.client.IsConnected() {
  128. break
  129. }
  130. time.Sleep(1 * time.Second)
  131. }
  132. err := item.DoIfUnSubscribe(func() error {
  133. return c.subscribe(item)
  134. })
  135. if err != nil {
  136. return err
  137. }
  138. return nil
  139. })
  140. c.addRouter(r)
  141. return r
  142. }
  143. func (c *MqttClient) Response(item *router.Item, data []byte) error {
  144. token := c.client.Publish(item.Topic+"/reply", 2, false, data)
  145. if token.Wait(); token.Error() != nil {
  146. return token.Error()
  147. }
  148. return nil
  149. }
  150. func (c *MqttClient) onConnect() error {
  151. err := c.rangeRouters(func(r *router.Router) error {
  152. err := r.RangeItem(func(item *router.Item) error {
  153. err := item.DoIfUnSubscribe(func() error {
  154. return c.subscribe(item)
  155. })
  156. if err != nil {
  157. return err
  158. }
  159. return nil
  160. })
  161. if err != nil {
  162. return errors.New("SetOnConnectHandler订阅失败: " + err.Error())
  163. }
  164. return nil
  165. })
  166. if err != nil {
  167. return err
  168. }
  169. return nil
  170. }
  171. func (c *MqttClient) onConnectLost() error {
  172. err := c.rangeRouters(func(r *router.Router) error {
  173. err := r.RangeItem(func(item *router.Item) error {
  174. item.SetUnSubscribe()
  175. return nil
  176. })
  177. if err != nil {
  178. return err
  179. }
  180. return nil
  181. })
  182. if err != nil {
  183. return errors.New("SetOnConnectHandler订阅失败: " + err.Error())
  184. }
  185. return nil
  186. }
  187. func (c *MqttClient) addRouter(router *router.Router) {
  188. c.routersMutex.Lock()
  189. defer c.routersMutex.Unlock()
  190. c.routers = append(c.routers, router)
  191. }
  192. func (c *MqttClient) rangeRouters(rangeFunc func(router *router.Router) error) error {
  193. c.routersMutex.Lock()
  194. defer c.routersMutex.Unlock()
  195. for _, r := range c.routers {
  196. err := rangeFunc(r)
  197. if err != nil {
  198. return err
  199. }
  200. }
  201. return nil
  202. }
  203. func (c *MqttClient) subscribe(item *router.Item) error {
  204. token := c.client.Subscribe(item.Topic, 2, func(client mqtt.Client, message mqtt.Message) {
  205. c.subscribeChan <- &subscribePayload{
  206. item: item,
  207. payload: message.Payload(),
  208. }
  209. })
  210. if token.Wait(); token.Error() != nil {
  211. return token.Error()
  212. }
  213. fmt.Println("[MQTT] Subscribe Topic: " + item.Topic)
  214. return nil
  215. }
  216. func (c *MqttClient) subscribeRoutine() {
  217. for {
  218. select {
  219. case <-c.subscribeDoneChan:
  220. return
  221. case payload := <-c.subscribeChan:
  222. payload.item.CallHandlers(payload.payload)
  223. }
  224. }
  225. }