mqtt_client.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. package mqtt_client
  2. import (
  3. "errors"
  4. "fmt"
  5. "git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client/router"
  6. "git.sxidc.com/go-tools/api_binding/utils"
  7. mqtt "github.com/eclipse/paho.mqtt.golang"
  8. "sync"
  9. "time"
  10. )
  // MqttClientOptions groups the connection settings that NewMqttClient
  // forwards to the underlying paho client options.
  type MqttClientOptions struct {
  	// UserName is the broker login name. check requires it to be set.
  	UserName string

  	// Password is the broker login password. check requires it to be set.
  	Password string

  	// Address is the broker address handed to AddBroker. check requires it
  	// to be set.
  	Address string

  	// ClientID is the MQTT client identifier. It is also used as the prefix
  	// of the last-will topic ("<ClientID>/will"). check requires it to be set.
  	ClientID string

  	// KeepAliveSec is passed unchanged to SetKeepAlive. Despite the "Sec"
  	// suffix the field is a time.Duration, so callers supply a full duration
  	// value (for example 30*time.Second) rather than a bare number of seconds.
  	KeepAliveSec time.Duration

  	// PingTimeoutSec is passed unchanged to SetPingTimeout. Like KeepAliveSec
  	// it is a time.Duration, not a count of seconds.
  	PingTimeoutSec time.Duration
  }
  19. func (opt *MqttClientOptions) check() error {
  20. if utils.IsStringEmpty(opt.UserName) {
  21. return errors.New("必须传递用户名")
  22. }
  23. if utils.IsStringEmpty(opt.Password) {
  24. return errors.New("必须传递密码")
  25. }
  26. if utils.IsStringEmpty(opt.Address) {
  27. return errors.New("必须传递地址")
  28. }
  29. if utils.IsStringEmpty(opt.ClientID) {
  30. return errors.New("必须传递客户端ID")
  31. }
  32. return nil
  33. }
  34. type MqttClient struct {
  35. client mqtt.Client
  36. clientOptions *mqtt.ClientOptions
  37. routersMutex *sync.Mutex
  38. routers []router.Router
  39. }
  40. func NewMqttClient(opts *MqttClientOptions) (*MqttClient, error) {
  41. if opts == nil {
  42. return nil, errors.New("没有传递mqtt客户端选项")
  43. }
  44. err := opts.check()
  45. if err != nil {
  46. return nil, err
  47. }
  48. mqttOptions := mqtt.NewClientOptions().
  49. SetAutoReconnect(true).
  50. SetUsername(opts.UserName).
  51. SetPassword(opts.Password).
  52. AddBroker(opts.Address).
  53. SetClientID(opts.ClientID).
  54. SetKeepAlive(opts.KeepAliveSec).
  55. SetPingTimeout(opts.PingTimeoutSec).
  56. SetWill(opts.ClientID+"/will", "dead", 2, true)
  57. return &MqttClient{
  58. client: mqtt.NewClient(mqttOptions),
  59. clientOptions: mqttOptions,
  60. routersMutex: &sync.Mutex{},
  61. routers: make([]router.Router, 0),
  62. }, nil
  63. }
  64. func DestroyMqttClient(c *MqttClient) {
  65. if c != nil {
  66. c.client = nil
  67. c.routersMutex.Lock()
  68. for _, r := range c.routers {
  69. router.DestroyRouter(&r)
  70. }
  71. c.routers = nil
  72. c.routersMutex.Unlock()
  73. }
  74. c = nil
  75. }
  76. func (c *MqttClient) Connect() error {
  77. c.clientOptions.SetOnConnectHandler(func(client mqtt.Client) {
  78. c.routersMutex.Lock()
  79. defer c.routersMutex.Unlock()
  80. err := c.rangeRouters(func(r *router.Router) error {
  81. err := r.RangeItem(func(item router.Item) error {
  82. token := c.client.Subscribe(item.Topic, item.Qos, func(client mqtt.Client, message mqtt.Message) {
  83. item.CallHandlers(message.Payload())
  84. })
  85. if token.Wait(); token.Error() != nil {
  86. return token.Error()
  87. }
  88. return nil
  89. })
  90. if err != nil {
  91. return errors.New("SetOnConnectHandler订阅失败: " + err.Error())
  92. }
  93. return nil
  94. })
  95. if err != nil {
  96. fmt.Println(err)
  97. return
  98. }
  99. })
  100. token := c.client.Connect()
  101. if token.Wait(); token.Error() != nil {
  102. return token.Error()
  103. }
  104. return nil
  105. }
  106. func (c *MqttClient) Disconnect() {
  107. c.client.Disconnect(10000)
  108. }
  109. func (c *MqttClient) GetRouter(group string, handlers []router.Handler) *router.Router {
  110. r := router.NewRouter(group, handlers, func(item router.Item) error {
  111. for {
  112. if c.client.IsConnected() {
  113. break
  114. }
  115. time.Sleep(1 * time.Second)
  116. }
  117. token := c.client.Subscribe(item.Topic, item.Qos, func(client mqtt.Client, message mqtt.Message) {
  118. item.CallHandlers(message.Payload())
  119. })
  120. if token.Wait(); token.Error() != nil {
  121. return token.Error()
  122. }
  123. return nil
  124. })
  125. c.addRouter(r)
  126. return r
  127. }
  128. func (c *MqttClient) Response(item *router.Item, data []byte) error {
  129. token := c.client.Publish(item.Topic+"/reply", item.Qos, item.ResponseRetained, data)
  130. if token.Wait(); token.Error() != nil {
  131. return token.Error()
  132. }
  133. return nil
  134. }
  135. func (c *MqttClient) addRouter(router *router.Router) {
  136. c.routersMutex.Lock()
  137. defer c.routersMutex.Unlock()
  138. c.routers = append(c.routers, *router)
  139. }
  140. func (c *MqttClient) rangeRouters(rangeFunc func(router *router.Router) error) error {
  141. c.routersMutex.Lock()
  142. defer c.routersMutex.Unlock()
  143. for _, r := range c.routers {
  144. err := rangeFunc(&r)
  145. if err != nil {
  146. return err
  147. }
  148. }
  149. return nil
  150. }