mqtt_client.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. package mqtt_client
  2. import (
  3. "errors"
  4. "fmt"
  5. "git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client/router"
  6. "git.sxidc.com/go-tools/api_binding/utils"
  7. mqtt "github.com/eclipse/paho.mqtt.golang"
  8. "sync"
  9. "time"
  10. )
  // MqttClientOptions carries the settings NewMqttClient uses to configure the
  // underlying paho MQTT client.
  //
  // The four string fields are mandatory: check rejects any that
  // utils.IsStringEmpty reports as empty. The three *Sec fields are whole
  // seconds and are handed to the paho option setters as-is, so a zero value is
  // passed through as a zero duration rather than being replaced by a default.
  type MqttClientOptions struct {
  	// UserName and Password are the credentials presented to the broker.
  	UserName, Password string

  	// Address is the broker address given to AddBroker; ClientID is the MQTT
  	// client identifier and also the prefix of the will topic ("<ClientID>/will").
  	Address, ClientID string

  	// KeepAliveSec, PingTimeoutSec and WriteTimeoutSec feed SetKeepAlive,
  	// SetPingTimeout and SetWriteTimeout respectively, in seconds.
  	KeepAliveSec, PingTimeoutSec, WriteTimeoutSec int64
  }
  20. func (opt *MqttClientOptions) check() error {
  21. if utils.IsStringEmpty(opt.UserName) {
  22. return errors.New("必须传递用户名")
  23. }
  24. if utils.IsStringEmpty(opt.Password) {
  25. return errors.New("必须传递密码")
  26. }
  27. if utils.IsStringEmpty(opt.Address) {
  28. return errors.New("必须传递地址")
  29. }
  30. if utils.IsStringEmpty(opt.ClientID) {
  31. return errors.New("必须传递客户端ID")
  32. }
  33. return nil
  34. }
  35. type MqttClient struct {
  36. client mqtt.Client
  37. routersMutex *sync.Mutex
  38. routers []*router.Router
  39. }
  40. func NewMqttClient(opts *MqttClientOptions) (*MqttClient, error) {
  41. if opts == nil {
  42. return nil, errors.New("没有传递mqtt客户端选项")
  43. }
  44. err := opts.check()
  45. if err != nil {
  46. return nil, err
  47. }
  48. mqttClient := &MqttClient{
  49. routersMutex: &sync.Mutex{},
  50. routers: make([]*router.Router, 0),
  51. }
  52. mqttOptions := mqtt.NewClientOptions().
  53. SetAutoReconnect(true).
  54. SetUsername(opts.UserName).
  55. SetPassword(opts.Password).
  56. AddBroker(opts.Address).
  57. SetClientID(opts.ClientID).
  58. SetKeepAlive(time.Duration(opts.KeepAliveSec)*time.Second).
  59. SetPingTimeout(time.Duration(opts.PingTimeoutSec)*time.Second).
  60. SetWriteTimeout(time.Duration(opts.WriteTimeoutSec)*time.Second).
  61. SetWill(opts.ClientID+"/will", "dead", 2, false).
  62. SetOnConnectHandler(func(client mqtt.Client) {
  63. err := mqttClient.onConnect()
  64. if err != nil {
  65. fmt.Println(err)
  66. return
  67. }
  68. }).
  69. SetConnectionLostHandler(func(client mqtt.Client, _ error) {
  70. err := mqttClient.onConnectLost()
  71. if err != nil {
  72. fmt.Println(err)
  73. return
  74. }
  75. })
  76. mqttClient.client = mqtt.NewClient(mqttOptions)
  77. return mqttClient, nil
  78. }
  79. func DestroyMqttClient(c *MqttClient) {
  80. if c != nil {
  81. c.client = nil
  82. c.routersMutex.Lock()
  83. for _, r := range c.routers {
  84. router.DestroyRouter(r)
  85. }
  86. c.routers = nil
  87. c.routersMutex.Unlock()
  88. }
  89. c = nil
  90. }
  91. func (c *MqttClient) Connect() error {
  92. token := c.client.Connect()
  93. if token.Wait(); token.Error() != nil {
  94. return token.Error()
  95. }
  96. return nil
  97. }
  98. func (c *MqttClient) Disconnect() {
  99. c.client.Disconnect(250)
  100. }
  101. func (c *MqttClient) GetRouter(group string, handlers []router.Handler) *router.Router {
  102. r := router.NewRouter(group, handlers, func(item *router.Item) error {
  103. for {
  104. if c.client.IsConnected() {
  105. break
  106. }
  107. time.Sleep(1 * time.Second)
  108. }
  109. err := item.DoIfUnSubscribe(func() error {
  110. return c.subscribe(item)
  111. })
  112. if err != nil {
  113. return err
  114. }
  115. return nil
  116. })
  117. c.addRouter(r)
  118. return r
  119. }
  120. func (c *MqttClient) Response(item *router.Item, data []byte) error {
  121. token := c.client.Publish(item.Topic+"/reply", 2, false, data)
  122. if token.Wait(); token.Error() != nil {
  123. return token.Error()
  124. }
  125. return nil
  126. }
  127. func (c *MqttClient) onConnect() error {
  128. err := c.rangeRouters(func(r *router.Router) error {
  129. err := r.RangeItem(func(item *router.Item) error {
  130. err := item.DoIfUnSubscribe(func() error {
  131. return c.subscribe(item)
  132. })
  133. if err != nil {
  134. return err
  135. }
  136. return nil
  137. })
  138. if err != nil {
  139. return errors.New("SetOnConnectHandler订阅失败: " + err.Error())
  140. }
  141. return nil
  142. })
  143. if err != nil {
  144. return err
  145. }
  146. return nil
  147. }
  148. func (c *MqttClient) onConnectLost() error {
  149. err := c.rangeRouters(func(r *router.Router) error {
  150. err := r.RangeItem(func(item *router.Item) error {
  151. item.SetUnSubscribe()
  152. return nil
  153. })
  154. if err != nil {
  155. return err
  156. }
  157. return nil
  158. })
  159. if err != nil {
  160. return errors.New("SetOnConnectHandler订阅失败: " + err.Error())
  161. }
  162. return nil
  163. }
  164. func (c *MqttClient) addRouter(router *router.Router) {
  165. c.routersMutex.Lock()
  166. defer c.routersMutex.Unlock()
  167. c.routers = append(c.routers, router)
  168. }
  169. func (c *MqttClient) rangeRouters(rangeFunc func(router *router.Router) error) error {
  170. c.routersMutex.Lock()
  171. defer c.routersMutex.Unlock()
  172. for _, r := range c.routers {
  173. err := rangeFunc(r)
  174. if err != nil {
  175. return err
  176. }
  177. }
  178. return nil
  179. }
  180. func (c *MqttClient) subscribe(item *router.Item) error {
  181. token := c.client.Subscribe(item.Topic, 2, func(client mqtt.Client, message mqtt.Message) {
  182. item.CallHandlers(message.Payload())
  183. })
  184. if token.Wait(); token.Error() != nil {
  185. return token.Error()
  186. }
  187. fmt.Println("[MQTT] Subscribe Topic: " + item.Topic)
  188. return nil
  189. }