mqtt_client.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. package mqtt_client
  2. import (
  3. "errors"
  4. "fmt"
  5. "git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client/router"
  6. "git.sxidc.com/go-tools/api_binding/utils"
  7. mqtt "github.com/eclipse/paho.mqtt.golang"
  8. "sync"
  9. "time"
  10. )
  11. type MqttClientOptions struct {
  12. UserName string
  13. Password string
  14. Address string
  15. ClientID string
  16. KeepAliveSec time.Duration
  17. PingTimeoutSec time.Duration
  18. }
  19. func (opt *MqttClientOptions) check() error {
  20. if utils.IsStringEmpty(opt.UserName) {
  21. return errors.New("必须传递用户名")
  22. }
  23. if utils.IsStringEmpty(opt.Password) {
  24. return errors.New("必须传递密码")
  25. }
  26. if utils.IsStringEmpty(opt.Address) {
  27. return errors.New("必须传递地址")
  28. }
  29. if utils.IsStringEmpty(opt.ClientID) {
  30. return errors.New("必须传递客户端ID")
  31. }
  32. return nil
  33. }
  34. type MqttClient struct {
  35. client mqtt.Client
  36. clientOptions *mqtt.ClientOptions
  37. routersMutex *sync.Mutex
  38. routers []router.Router
  39. }
  40. func NewMqttClient(opts *MqttClientOptions) (*MqttClient, error) {
  41. if opts == nil {
  42. return nil, errors.New("没有传递mqtt客户端选项")
  43. }
  44. err := opts.check()
  45. if err != nil {
  46. return nil, err
  47. }
  48. mqttOptions := mqtt.NewClientOptions().
  49. SetAutoReconnect(true).
  50. SetUsername(opts.UserName).
  51. SetPassword(opts.Password).
  52. AddBroker(opts.Address).
  53. SetClientID(opts.ClientID).
  54. SetKeepAlive(opts.KeepAliveSec).
  55. SetPingTimeout(opts.PingTimeoutSec).
  56. SetWill(opts.ClientID+"/will", "dead", 2, true)
  57. return &MqttClient{
  58. client: mqtt.NewClient(mqttOptions),
  59. clientOptions: mqttOptions,
  60. routersMutex: &sync.Mutex{},
  61. routers: make([]router.Router, 0),
  62. }, nil
  63. }
  64. func DestroyMqttClient(c *MqttClient) {
  65. if c != nil {
  66. c.client = nil
  67. c.routersMutex.Lock()
  68. for _, r := range c.routers {
  69. router.DestroyRouter(&r)
  70. }
  71. c.routers = nil
  72. c.routersMutex.Unlock()
  73. }
  74. c = nil
  75. }
  76. func (c *MqttClient) Connect() error {
  77. c.clientOptions.SetOnConnectHandler(func(client mqtt.Client) {
  78. c.routersMutex.Lock()
  79. defer c.routersMutex.Unlock()
  80. err := c.rangeRouters(func(r *router.Router) error {
  81. err := r.RangeItem(func(item router.Item) error {
  82. topic := r.Group + item.Topic
  83. token := c.client.Subscribe(topic, item.Qos, func(client mqtt.Client, message mqtt.Message) {
  84. item.CallHandlers(message.Payload())
  85. })
  86. if token.Wait(); token.Error() != nil {
  87. return token.Error()
  88. }
  89. return nil
  90. })
  91. if err != nil {
  92. return errors.New("SetOnConnectHandler订阅失败: " + err.Error())
  93. }
  94. return nil
  95. })
  96. if err != nil {
  97. fmt.Println(err)
  98. return
  99. }
  100. })
  101. token := c.client.Connect()
  102. if token.Wait(); token.Error() != nil {
  103. return token.Error()
  104. }
  105. return nil
  106. }
  107. func (c *MqttClient) Disconnect() {
  108. c.client.Disconnect(10000)
  109. }
  110. func (c *MqttClient) GetRouter(group string, handlers []router.Handler) *router.Router {
  111. r := router.NewRouter(group, handlers, func(item router.Item) error {
  112. for {
  113. if c.client.IsConnected() {
  114. break
  115. }
  116. time.Sleep(1 * time.Second)
  117. }
  118. topic := group + item.Topic
  119. token := c.client.Subscribe(topic, item.Qos, func(client mqtt.Client, message mqtt.Message) {
  120. item.CallHandlers(message.Payload())
  121. })
  122. if token.Wait(); token.Error() != nil {
  123. return token.Error()
  124. }
  125. return nil
  126. })
  127. c.addRouter(r)
  128. return r
  129. }
  130. func (c *MqttClient) Response(item *router.Item, data []byte) error {
  131. token := c.client.Publish(item.Topic+"/reply", item.Qos, item.ResponseRetained, data)
  132. if token.Wait(); token.Error() != nil {
  133. return token.Error()
  134. }
  135. return nil
  136. }
  137. func (c *MqttClient) addRouter(router *router.Router) {
  138. c.routersMutex.Lock()
  139. defer c.routersMutex.Unlock()
  140. c.routers = append(c.routers, *router)
  141. }
  142. func (c *MqttClient) rangeRouters(rangeFunc func(router *router.Router) error) error {
  143. c.routersMutex.Lock()
  144. defer c.routersMutex.Unlock()
  145. for _, r := range c.routers {
  146. err := rangeFunc(&r)
  147. if err != nil {
  148. return err
  149. }
  150. }
  151. return nil
  152. }