mqtt_client.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. package mqtt_client
  2. import (
  3. "errors"
  4. "fmt"
  5. "git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client/router"
  6. "git.sxidc.com/go-tools/api_binding/utils"
  7. mqtt "github.com/eclipse/paho.mqtt.golang"
  8. "sync"
  9. "time"
  10. )
  11. type MqttClientOptions struct {
  12. UserName string
  13. Password string
  14. Address string
  15. ClientID string
  16. KeepAliveSec time.Duration
  17. PingTimeoutSec time.Duration
  18. }
  19. func (opt *MqttClientOptions) check() error {
  20. if utils.IsStringEmpty(opt.UserName) {
  21. return errors.New("必须传递用户名")
  22. }
  23. if utils.IsStringEmpty(opt.Password) {
  24. return errors.New("必须传递密码")
  25. }
  26. if utils.IsStringEmpty(opt.Address) {
  27. return errors.New("必须传递地址")
  28. }
  29. if utils.IsStringEmpty(opt.ClientID) {
  30. return errors.New("必须传递客户端ID")
  31. }
  32. return nil
  33. }
  34. type MqttClient struct {
  35. client mqtt.Client
  36. routersMutex *sync.Mutex
  37. routers []*router.Router
  38. }
  39. func NewMqttClient(opts *MqttClientOptions) (*MqttClient, error) {
  40. if opts == nil {
  41. return nil, errors.New("没有传递mqtt客户端选项")
  42. }
  43. err := opts.check()
  44. if err != nil {
  45. return nil, err
  46. }
  47. mqttClient := &MqttClient{
  48. routersMutex: &sync.Mutex{},
  49. routers: make([]*router.Router, 0),
  50. }
  51. mqttOptions := mqtt.NewClientOptions().
  52. SetAutoReconnect(true).
  53. SetUsername(opts.UserName).
  54. SetPassword(opts.Password).
  55. AddBroker(opts.Address).
  56. SetClientID(opts.ClientID).
  57. SetKeepAlive(opts.KeepAliveSec*time.Second).
  58. SetPingTimeout(opts.PingTimeoutSec*time.Second).
  59. SetWill(opts.ClientID+"/will", "dead", 2, false).
  60. SetOnConnectHandler(func(client mqtt.Client) {
  61. err := mqttClient.onConnect()
  62. if err != nil {
  63. fmt.Println(err)
  64. return
  65. }
  66. }).
  67. SetConnectionLostHandler(func(client mqtt.Client, _ error) {
  68. err := mqttClient.onConnectLost()
  69. if err != nil {
  70. fmt.Println(err)
  71. return
  72. }
  73. })
  74. mqttClient.client = mqtt.NewClient(mqttOptions)
  75. return mqttClient, nil
  76. }
  77. func DestroyMqttClient(c *MqttClient) {
  78. if c != nil {
  79. c.client = nil
  80. c.routersMutex.Lock()
  81. for _, r := range c.routers {
  82. router.DestroyRouter(r)
  83. }
  84. c.routers = nil
  85. c.routersMutex.Unlock()
  86. }
  87. c = nil
  88. }
  89. func (c *MqttClient) Connect() error {
  90. token := c.client.Connect()
  91. if token.Wait(); token.Error() != nil {
  92. return token.Error()
  93. }
  94. return nil
  95. }
  96. func (c *MqttClient) Disconnect() {
  97. c.client.Disconnect(10000)
  98. }
  99. func (c *MqttClient) GetRouter(group string, handlers []router.Handler) *router.Router {
  100. r := router.NewRouter(group, handlers, func(item *router.Item) error {
  101. for {
  102. if c.client.IsConnected() {
  103. break
  104. }
  105. time.Sleep(1 * time.Second)
  106. }
  107. err := item.DoIfUnSubscribe(func() error {
  108. token := c.client.Subscribe(item.Topic, 2, func(client mqtt.Client, message mqtt.Message) {
  109. item.CallHandlers(message.Payload())
  110. })
  111. if token.Wait(); token.Error() != nil {
  112. return token.Error()
  113. }
  114. return nil
  115. })
  116. if err != nil {
  117. return err
  118. }
  119. return nil
  120. })
  121. c.addRouter(r)
  122. return r
  123. }
  124. func (c *MqttClient) Response(item *router.Item, data []byte) error {
  125. token := c.client.Publish(item.Topic+"/reply", 2, false, data)
  126. if token.Wait(); token.Error() != nil {
  127. return token.Error()
  128. }
  129. return nil
  130. }
  131. func (c *MqttClient) onConnect() error {
  132. err := c.rangeRouters(func(r *router.Router) error {
  133. err := r.RangeItem(func(item *router.Item) error {
  134. err := item.DoIfUnSubscribe(func() error {
  135. token := c.client.Subscribe(item.Topic, 2, func(client mqtt.Client, message mqtt.Message) {
  136. item.CallHandlers(message.Payload())
  137. })
  138. if token.Wait(); token.Error() != nil {
  139. return token.Error()
  140. }
  141. fmt.Println("[MQTT] Subscribe Topic: " + item.Topic)
  142. return nil
  143. })
  144. if err != nil {
  145. return err
  146. }
  147. return nil
  148. })
  149. if err != nil {
  150. return errors.New("SetOnConnectHandler订阅失败: " + err.Error())
  151. }
  152. return nil
  153. })
  154. if err != nil {
  155. return err
  156. }
  157. return nil
  158. }
  159. func (c *MqttClient) onConnectLost() error {
  160. err := c.rangeRouters(func(r *router.Router) error {
  161. err := r.RangeItem(func(item *router.Item) error {
  162. item.SetUnSubscribe()
  163. return nil
  164. })
  165. if err != nil {
  166. return err
  167. }
  168. return nil
  169. })
  170. if err != nil {
  171. return errors.New("SetOnConnectHandler订阅失败: " + err.Error())
  172. }
  173. return nil
  174. }
  175. func (c *MqttClient) addRouter(router *router.Router) {
  176. c.routersMutex.Lock()
  177. defer c.routersMutex.Unlock()
  178. c.routers = append(c.routers, router)
  179. }
  180. func (c *MqttClient) rangeRouters(rangeFunc func(router *router.Router) error) error {
  181. c.routersMutex.Lock()
  182. defer c.routersMutex.Unlock()
  183. for _, r := range c.routers {
  184. err := rangeFunc(r)
  185. if err != nil {
  186. return err
  187. }
  188. }
  189. return nil
  190. }