mqtt_client.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. package mqtt_client
  2. import (
  3. "errors"
  4. "fmt"
  5. "git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client/router"
  6. "git.sxidc.com/go-tools/utils/strutils"
  7. mqtt "github.com/eclipse/paho.mqtt.golang"
  8. "sync"
  9. "time"
  10. )
  11. type MqttClientOptions struct {
  12. UserName string
  13. Password string
  14. Address string
  15. ClientID string
  16. KeepAliveSec int64
  17. PingTimeoutSec int64
  18. WriteTimeoutSec int64
  19. SubscribeRoutineCount int
  20. SubscribeBufferSize int
  21. }
  22. func (opts *MqttClientOptions) check() error {
  23. if strutils.IsStringEmpty(opts.UserName) {
  24. return errors.New("必须传递用户名")
  25. }
  26. if strutils.IsStringEmpty(opts.Password) {
  27. return errors.New("必须传递密码")
  28. }
  29. if strutils.IsStringEmpty(opts.Address) {
  30. return errors.New("必须传递地址")
  31. }
  32. if strutils.IsStringEmpty(opts.ClientID) {
  33. return errors.New("必须传递客户端ID")
  34. }
  35. return nil
  36. }
  37. type subscribePayload struct {
  38. item *router.Item
  39. payload []byte
  40. }
  41. type MqttClient struct {
  42. client mqtt.Client
  43. opts *MqttClientOptions
  44. routersMutex *sync.Mutex
  45. routers []*router.Router
  46. subscribeChan chan *subscribePayload
  47. subscribeDoneChan chan any
  48. }
  49. func NewMqttClient(opts *MqttClientOptions) (*MqttClient, error) {
  50. if opts == nil {
  51. return nil, errors.New("没有传递mqtt客户端选项")
  52. }
  53. err := opts.check()
  54. if err != nil {
  55. return nil, err
  56. }
  57. if opts.SubscribeRoutineCount == 0 {
  58. opts.SubscribeRoutineCount = 10
  59. }
  60. if opts.SubscribeBufferSize == 0 {
  61. opts.SubscribeRoutineCount = 1024
  62. }
  63. if opts.WriteTimeoutSec == 0 {
  64. opts.WriteTimeoutSec = 60
  65. }
  66. mqttClient := &MqttClient{
  67. opts: opts,
  68. routersMutex: &sync.Mutex{},
  69. routers: make([]*router.Router, 0),
  70. subscribeChan: make(chan *subscribePayload, opts.SubscribeRoutineCount),
  71. subscribeDoneChan: make(chan any),
  72. }
  73. mqttOptions := mqtt.NewClientOptions().
  74. SetAutoReconnect(true).
  75. SetUsername(opts.UserName).
  76. SetPassword(opts.Password).
  77. AddBroker(opts.Address).
  78. SetClientID(opts.ClientID).
  79. SetKeepAlive(time.Duration(opts.KeepAliveSec)*time.Second).
  80. SetPingTimeout(time.Duration(opts.PingTimeoutSec)*time.Second).
  81. SetWriteTimeout(time.Duration(opts.WriteTimeoutSec)*time.Second).
  82. SetWill(opts.ClientID+"/will", "dead", 2, false).
  83. SetOnConnectHandler(func(client mqtt.Client) {
  84. err := mqttClient.onConnect()
  85. if err != nil {
  86. fmt.Println(err)
  87. return
  88. }
  89. }).
  90. SetConnectionLostHandler(func(client mqtt.Client, _ error) {
  91. err := mqttClient.onConnectLost()
  92. if err != nil {
  93. fmt.Println(err)
  94. return
  95. }
  96. })
  97. mqttClient.client = mqtt.NewClient(mqttOptions)
  98. for i := 0; i < opts.SubscribeRoutineCount; i++ {
  99. go mqttClient.subscribeRoutine()
  100. }
  101. return mqttClient, nil
  102. }
  103. func DestroyMqttClient(c *MqttClient) {
  104. if c != nil {
  105. close(c.subscribeDoneChan)
  106. c.subscribeDoneChan = nil
  107. close(c.subscribeChan)
  108. c.subscribeChan = nil
  109. c.client = nil
  110. c.routersMutex.Lock()
  111. for _, r := range c.routers {
  112. router.DestroyRouter(r)
  113. }
  114. c.routers = nil
  115. c.routersMutex.Unlock()
  116. }
  117. c = nil
  118. }
  119. func (c *MqttClient) Connect() error {
  120. token := c.client.Connect()
  121. if !token.WaitTimeout(time.Duration(c.opts.WriteTimeoutSec) * time.Second) {
  122. return errors.New("连接超时")
  123. }
  124. if token.Error() != nil {
  125. return token.Error()
  126. }
  127. return nil
  128. }
  129. func (c *MqttClient) Disconnect() {
  130. c.client.Disconnect(250)
  131. }
  132. func (c *MqttClient) GetRouter(group string, handlers []router.Handler) *router.Router {
  133. r := router.NewRouter(group, handlers, func(item *router.Item) error {
  134. for {
  135. if c.client.IsConnected() {
  136. break
  137. }
  138. time.Sleep(1 * time.Second)
  139. }
  140. err := item.DoIfUnSubscribe(func() error {
  141. return c.subscribe(item)
  142. })
  143. if err != nil {
  144. return err
  145. }
  146. return nil
  147. })
  148. c.addRouter(r)
  149. return r
  150. }
  151. func (c *MqttClient) Response(item *router.Item, responseIdentifier string, data []byte) error {
  152. replyTopic := item.Topic + "/reply"
  153. if strutils.IsStringNotEmpty(responseIdentifier) {
  154. replyTopic = fmt.Sprintf("%s/%s/reply", item.Topic, responseIdentifier)
  155. }
  156. token := c.client.Publish(replyTopic, 2, false, data)
  157. if !token.WaitTimeout(time.Duration(c.opts.WriteTimeoutSec) * time.Second) {
  158. return errors.New("发布超时")
  159. }
  160. if token.Error() != nil {
  161. return token.Error()
  162. }
  163. return nil
  164. }
  165. func (c *MqttClient) onConnect() error {
  166. err := c.rangeRouters(func(r *router.Router) error {
  167. err := r.RangeItem(func(item *router.Item) error {
  168. err := item.DoIfUnSubscribe(func() error {
  169. return c.subscribe(item)
  170. })
  171. if err != nil {
  172. return err
  173. }
  174. return nil
  175. })
  176. if err != nil {
  177. return errors.New("SetOnConnectHandler订阅失败: " + err.Error())
  178. }
  179. return nil
  180. })
  181. if err != nil {
  182. return err
  183. }
  184. return nil
  185. }
  186. func (c *MqttClient) onConnectLost() error {
  187. err := c.rangeRouters(func(r *router.Router) error {
  188. err := r.RangeItem(func(item *router.Item) error {
  189. item.SetUnSubscribe()
  190. return nil
  191. })
  192. if err != nil {
  193. return err
  194. }
  195. return nil
  196. })
  197. if err != nil {
  198. return errors.New("SetOnConnectHandler订阅失败: " + err.Error())
  199. }
  200. return nil
  201. }
  202. func (c *MqttClient) addRouter(router *router.Router) {
  203. c.routersMutex.Lock()
  204. defer c.routersMutex.Unlock()
  205. c.routers = append(c.routers, router)
  206. }
  207. func (c *MqttClient) rangeRouters(rangeFunc func(router *router.Router) error) error {
  208. c.routersMutex.Lock()
  209. defer c.routersMutex.Unlock()
  210. for _, r := range c.routers {
  211. err := rangeFunc(r)
  212. if err != nil {
  213. return err
  214. }
  215. }
  216. return nil
  217. }
  218. func (c *MqttClient) subscribe(item *router.Item) error {
  219. token := c.client.Subscribe(item.Topic, 2, func(client mqtt.Client, message mqtt.Message) {
  220. c.subscribeChan <- &subscribePayload{
  221. item: item,
  222. payload: message.Payload(),
  223. }
  224. })
  225. if !token.WaitTimeout(time.Duration(c.opts.WriteTimeoutSec) * time.Second) {
  226. return errors.New("订阅超时")
  227. }
  228. if token.Error() != nil {
  229. return token.Error()
  230. }
  231. fmt.Println("[MQTT] Subscribe Topic: " + item.Topic)
  232. return nil
  233. }
  234. func (c *MqttClient) subscribeRoutine() {
  235. for {
  236. select {
  237. case <-c.subscribeDoneChan:
  238. return
  239. case payload := <-c.subscribeChan:
  240. payload.item.CallHandlers(payload.payload)
  241. }
  242. }
  243. }