mqtt_client.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531
  1. package mqtt_client
  2. import (
  3. "encoding/json"
  4. "git.sxidc.com/go-tools/utils/strutils"
  5. mqtt "github.com/eclipse/paho.mqtt.golang"
  6. "github.com/pkg/errors"
  7. "log"
  8. "sync"
  9. "time"
  10. )
  // ErrMessageIgnore is the sentinel a MessageHandler returns to signal that it
  // deliberately skipped a message. doSubscribe does not log such a result and
  // does not count it as a successful handling.
  var ErrMessageIgnore = errors.New("mqtt消息忽略")
  14. type MessageHandler func(client *Client, token *SubscribeToken, topic string, data []byte) error
  // ClientOptions carries the broker connection settings consumed by New.
  type ClientOptions struct {
  	// Broker credentials, broker address and client identifier. check rejects
  	// an empty value for any of the four.
  	UserName, Password, Address, ClientID string

  	// Keep-alive interval, ping timeout and write timeout, all in seconds.
  	// New replaces a zero WriteTimeoutSec with 60.
  	KeepAliveSec, PingTimeoutSec, WriteTimeoutSec int64
  }
  24. func (opts *ClientOptions) check() error {
  25. if strutils.IsStringEmpty(opts.UserName) {
  26. return errors.New("必须传递用户名")
  27. }
  28. if strutils.IsStringEmpty(opts.Password) {
  29. return errors.New("必须传递密码")
  30. }
  31. if strutils.IsStringEmpty(opts.Address) {
  32. return errors.New("必须传递地址")
  33. }
  34. if strutils.IsStringEmpty(opts.ClientID) {
  35. return errors.New("必须传递客户端ID")
  36. }
  37. return nil
  38. }
  39. type SubscribeToken struct {
  40. messageHandler MessageHandler
  41. successHandleCount int
  42. handleCount int
  43. }
  44. func (token *SubscribeToken) SuccessHandleCount() int {
  45. return token.successHandleCount
  46. }
  47. func (token *SubscribeToken) HandleCount() int {
  48. return token.handleCount
  49. }
  50. type subscribeTopic struct {
  51. topic string
  52. tokens []*SubscribeToken
  53. }
  54. type Client struct {
  55. client mqtt.Client
  56. opts *ClientOptions
  57. topicsMutex *sync.Mutex
  58. topics []*subscribeTopic
  59. publishAndReceiveTopicMapMutex sync.Mutex
  60. publishAndReceiveTopicMap map[string]chan any
  61. }
  62. func New(opts *ClientOptions) (*Client, error) {
  63. if opts == nil {
  64. return nil, errors.New("必须传递参数")
  65. }
  66. err := opts.check()
  67. if err != nil {
  68. return nil, err
  69. }
  70. if opts.WriteTimeoutSec == 0 {
  71. opts.WriteTimeoutSec = 60
  72. }
  73. client := &Client{
  74. opts: opts,
  75. topicsMutex: &sync.Mutex{},
  76. topics: make([]*subscribeTopic, 0),
  77. publishAndReceiveTopicMap: make(map[string]chan any),
  78. }
  79. mqttClient := mqtt.NewClient(mqtt.NewClientOptions().
  80. SetAutoReconnect(true).
  81. SetUsername(opts.UserName).
  82. SetPassword(opts.Password).
  83. AddBroker(opts.Address).
  84. SetClientID(opts.ClientID).
  85. SetKeepAlive(time.Duration(opts.KeepAliveSec)*time.Second).
  86. SetPingTimeout(time.Duration(opts.PingTimeoutSec)*time.Second).
  87. SetWriteTimeout(time.Duration(opts.WriteTimeoutSec)*time.Second).
  88. SetOrderMatters(false).
  89. SetWill(opts.ClientID+"/will", "dead", 2, true).
  90. SetOnConnectHandler(func(mqttClient mqtt.Client) {
  91. err := client.subscribeAll()
  92. if err != nil {
  93. log.Println("SetOnConnectHandler:", err.Error())
  94. return
  95. }
  96. }))
  97. token := mqttClient.Connect()
  98. if !token.WaitTimeout(time.Duration(opts.WriteTimeoutSec) * time.Second) {
  99. return nil, errors.New("连接超时")
  100. }
  101. if token.Error() != nil {
  102. return nil, token.Error()
  103. }
  104. client.client = mqttClient
  105. return client, nil
  106. }
  107. func Destroy(client *Client) {
  108. client.client.Disconnect(250)
  109. client.topicsMutex.Lock()
  110. for _, subscribedTopic := range client.topics {
  111. token := client.client.Unsubscribe(subscribedTopic.topic)
  112. if !token.WaitTimeout(time.Duration(client.opts.WriteTimeoutSec) * time.Second) {
  113. continue
  114. }
  115. if token.Error() != nil {
  116. log.Println("Destroy: ", token.Error())
  117. continue
  118. }
  119. }
  120. client.topics = nil
  121. client.topicsMutex.Unlock()
  122. client = nil
  123. }
  124. func (client *Client) Publish(topic string, qos byte, retained bool, payload any) error {
  125. if strutils.IsStringEmpty(topic) {
  126. return errors.New("没有传递发布主题")
  127. }
  128. if payload == nil {
  129. return errors.New("发布的payload不能为nil")
  130. }
  131. client.publish(topic, qos, retained, payload)
  132. return nil
  133. }
  134. func (client *Client) Subscribe(topic string, handlerFunc MessageHandler) (*SubscribeToken, error) {
  135. if strutils.IsStringEmpty(topic) {
  136. return nil, errors.New("没有传递订阅主题")
  137. }
  138. if handlerFunc == nil {
  139. return nil, errors.New("必须传递处理函数")
  140. }
  141. return client.subscribe(topic, handlerFunc)
  142. }
  143. func (client *Client) Unsubscribe(topic string, token *SubscribeToken) error {
  144. if strutils.IsStringEmpty(topic) {
  145. return errors.New("没有传递取消订阅主题")
  146. }
  147. return client.unsubscribe(topic, token)
  148. }
  // PublishAndReceiveReplyParams describes one publish-and-wait-for-reply
  // request.
  type PublishAndReceiveReplyParams struct {
  	// Topic is published to; ReplyTopic is subscribed to for the answer.
  	Topic, ReplyTopic string
  	// PublishData is the payload sent on Topic; Check rejects nil.
  	PublishData []byte
  	// RepublishDurationSec is the base delay, in seconds, before the message
  	// is published again; each later delay is this value multiplied by the
  	// current try count.
  	RepublishDurationSec int64
  	// TryTimes bounds the number of republish attempts; 0 means no bound.
  	TryTimes int
  	// StopBefore asks for a request already registered under the same Topic
  	// to be stopped before this one starts.
  	StopBefore bool
  }
  157. func (params *PublishAndReceiveReplyParams) Check() error {
  158. if strutils.IsStringEmpty(params.Topic) {
  159. return errors.New("没有传递订阅主题")
  160. }
  161. if strutils.IsStringEmpty(params.ReplyTopic) {
  162. return errors.New("没有传递响应订阅主题")
  163. }
  164. if params.PublishData == nil {
  165. return errors.New("发布的数据不能为nil")
  166. }
  167. return nil
  168. }
  // msgResponse is the JSON reply envelope decoded by
  // PublishAndReceiveReplyMsgResponse.
  type msgResponse struct {
  	// Success reports whether the peer accepted the request.
  	Success bool `json:"success"`
  	// Msg is turned into the returned error when Success is false.
  	Msg string `json:"msg"`
  }
  173. func (client *Client) PublishAndReceiveReplyMsgResponse(params *PublishAndReceiveReplyParams) error {
  174. err := params.Check()
  175. if err != nil {
  176. return err
  177. }
  178. err = client.publishAndReceiveReply(params, func(payload []byte) error {
  179. resp := new(msgResponse)
  180. err := json.Unmarshal(payload, resp)
  181. if err != nil {
  182. return err
  183. }
  184. if !resp.Success {
  185. return errors.New(resp.Msg)
  186. }
  187. return nil
  188. })
  189. if err != nil {
  190. return err
  191. }
  192. return nil
  193. }
  194. func (client *Client) PublishAndReceiveReply(params *PublishAndReceiveReplyParams, payloadDealFunc func(payload []byte) error) error {
  195. err := params.Check()
  196. if err != nil {
  197. return err
  198. }
  199. return client.publishAndReceiveReply(params, payloadDealFunc)
  200. }
  201. func (client *Client) publish(topic string, qos byte, retained bool, payload any) {
  202. client.waitConnected()
  203. token := client.client.Publish(topic, qos, retained, payload)
  204. if !token.WaitTimeout(time.Duration(client.opts.WriteTimeoutSec) * time.Second) {
  205. log.Println("发布超时")
  206. return
  207. }
  208. if token.Error() != nil {
  209. log.Println("publish:", token.Error())
  210. return
  211. }
  212. }
  213. func (client *Client) subscribe(topic string, handlerFunc MessageHandler) (*SubscribeToken, error) {
  214. client.waitConnected()
  215. return client.addSubscribedTopic(topic, handlerFunc, func(subscribedTopic *subscribeTopic) error {
  216. return client.doSubscribe(subscribedTopic)
  217. })
  218. }
  219. func (client *Client) unsubscribe(topic string, token *SubscribeToken) error {
  220. client.waitConnected()
  221. err := client.removeSubscribedTopic(topic, token, func(subscribedTopic *subscribeTopic) error {
  222. token := client.client.Unsubscribe(subscribedTopic.topic)
  223. if !token.WaitTimeout(time.Duration(client.opts.WriteTimeoutSec) * time.Second) {
  224. return errors.New("取消订阅超时")
  225. }
  226. if token.Error() != nil {
  227. return token.Error()
  228. }
  229. return nil
  230. })
  231. if err != nil {
  232. return err
  233. }
  234. return nil
  235. }
  236. func (client *Client) publishAndReceiveReply(params *PublishAndReceiveReplyParams, payloadDealFunc func(payload []byte) error) error {
  237. doneChan := make(chan any)
  238. if params.StopBefore {
  239. client.publishAndReceiveTopicMapMutex.Lock()
  240. existDoneChan, ok := client.publishAndReceiveTopicMap[params.Topic]
  241. if ok {
  242. close(existDoneChan)
  243. delete(client.publishAndReceiveTopicMap, params.Topic)
  244. } else {
  245. client.publishAndReceiveTopicMap[params.Topic] = doneChan
  246. }
  247. client.publishAndReceiveTopicMapMutex.Unlock()
  248. }
  249. // 订阅响应主题
  250. token, err := client.subscribe(params.ReplyTopic, func(c *Client, token *SubscribeToken, topic string, data []byte) error {
  251. defer func() {
  252. client.publishAndReceiveTopicMapMutex.Lock()
  253. existDoneChan, ok := client.publishAndReceiveTopicMap[params.Topic]
  254. if ok {
  255. close(existDoneChan)
  256. delete(client.publishAndReceiveTopicMap, params.Topic)
  257. }
  258. client.publishAndReceiveTopicMapMutex.Unlock()
  259. }()
  260. if token.SuccessHandleCount() >= 1 {
  261. return ErrMessageIgnore
  262. }
  263. if payloadDealFunc != nil {
  264. err := payloadDealFunc(data)
  265. if err != nil {
  266. return err
  267. }
  268. }
  269. return nil
  270. })
  271. if err != nil {
  272. return err
  273. }
  274. go func() {
  275. client.publish(params.Topic, 2, false, params.PublishData)
  276. currentTryTime := 1
  277. timer := time.NewTimer(time.Duration(params.RepublishDurationSec) * time.Second)
  278. defer timer.Stop()
  279. for {
  280. select {
  281. case <-doneChan:
  282. err := client.unsubscribe(params.ReplyTopic, token)
  283. if err != nil {
  284. log.Println("publishAndReceiveReply done:", err.Error())
  285. return
  286. }
  287. client.publishAndReceiveTopicMapMutex.Lock()
  288. delete(client.publishAndReceiveTopicMap, params.Topic)
  289. client.publishAndReceiveTopicMapMutex.Unlock()
  290. return
  291. case <-timer.C:
  292. client.publish(params.Topic, 2, false, params.PublishData)
  293. if params.TryTimes != 0 {
  294. currentTryTime++
  295. }
  296. if params.TryTimes != 0 && currentTryTime > params.TryTimes {
  297. client.publishAndReceiveTopicMapMutex.Lock()
  298. existDoneChan, ok := client.publishAndReceiveTopicMap[params.Topic]
  299. if ok {
  300. close(existDoneChan)
  301. delete(client.publishAndReceiveTopicMap, params.Topic)
  302. }
  303. client.publishAndReceiveTopicMapMutex.Unlock()
  304. return
  305. }
  306. resetDuration := time.Duration(params.RepublishDurationSec*int64(currentTryTime)) * time.Second
  307. timer.Reset(resetDuration)
  308. }
  309. }
  310. }()
  311. return nil
  312. }
  313. func (client *Client) waitConnected() {
  314. for {
  315. if client.client.IsConnected() {
  316. break
  317. }
  318. time.Sleep(1 * time.Second)
  319. }
  320. }
  321. func (client *Client) subscribeAll() error {
  322. err := client.rangeSubscribedTopics(func(subscribedTopic *subscribeTopic) error {
  323. return client.doSubscribe(subscribedTopic)
  324. })
  325. if err != nil {
  326. return err
  327. }
  328. return nil
  329. }
  330. func (client *Client) doSubscribe(subscribedTopic *subscribeTopic) error {
  331. token := client.client.Subscribe(subscribedTopic.topic, 2, func(mqttClient mqtt.Client, message mqtt.Message) {
  332. wg := sync.WaitGroup{}
  333. wg.Add(len(subscribedTopic.tokens))
  334. for _, token := range subscribedTopic.tokens {
  335. go func(token *SubscribeToken, message mqtt.Message) {
  336. defer func() {
  337. token.handleCount++
  338. wg.Done()
  339. }()
  340. err := token.messageHandler(client, token, subscribedTopic.topic, message.Payload())
  341. if err != nil && !errors.Is(err, ErrMessageIgnore) {
  342. log.Println("doSubscribe token.messageHandler:", err.Error())
  343. return
  344. }
  345. if err != nil && errors.Is(err, ErrMessageIgnore) {
  346. return
  347. }
  348. token.successHandleCount++
  349. }(token, message)
  350. }
  351. wg.Wait()
  352. })
  353. if !token.WaitTimeout(time.Duration(client.opts.WriteTimeoutSec) * time.Second) {
  354. return errors.New("订阅超时")
  355. }
  356. if token.Error() != nil {
  357. return token.Error()
  358. }
  359. return nil
  360. }
  361. func (client *Client) addSubscribedTopic(topic string, handler MessageHandler, addNew func(subscribedTopic *subscribeTopic) error) (*SubscribeToken, error) {
  362. client.topicsMutex.Lock()
  363. defer client.topicsMutex.Unlock()
  364. newToken := &SubscribeToken{messageHandler: handler}
  365. for _, savedTopic := range client.topics {
  366. if savedTopic.topic == topic {
  367. savedTopic.tokens = append(savedTopic.tokens, newToken)
  368. return newToken, nil
  369. }
  370. }
  371. subscribedTopic := &subscribeTopic{
  372. topic: topic,
  373. tokens: []*SubscribeToken{newToken},
  374. }
  375. err := addNew(subscribedTopic)
  376. if err != nil {
  377. return nil, err
  378. }
  379. client.topics = append(client.topics, subscribedTopic)
  380. return newToken, nil
  381. }
  382. func (client *Client) removeSubscribedTopic(topic string, token *SubscribeToken, noTokens func(subscribedTopic *subscribeTopic) error) error {
  383. client.topicsMutex.Lock()
  384. defer client.topicsMutex.Unlock()
  385. findSubscribeTopicIndex := -1
  386. for index, subscribedTopic := range client.topics {
  387. if subscribedTopic.topic == topic {
  388. findSubscribeTopicIndex = index
  389. }
  390. }
  391. if findSubscribeTopicIndex == -1 {
  392. return nil
  393. }
  394. subscribedTopic := client.topics[findSubscribeTopicIndex]
  395. if subscribedTopic.tokens != nil && len(subscribedTopic.tokens) == 1 {
  396. err := noTokens(subscribedTopic)
  397. if err != nil {
  398. return err
  399. }
  400. client.topics = append(client.topics[:findSubscribeTopicIndex], client.topics[findSubscribeTopicIndex+1:]...)
  401. return nil
  402. }
  403. findTokenIndex := -1
  404. for index, savedToken := range subscribedTopic.tokens {
  405. if savedToken == token {
  406. findTokenIndex = index
  407. }
  408. }
  409. if findTokenIndex == -1 {
  410. return nil
  411. }
  412. subscribedTopic.tokens = append(subscribedTopic.tokens[:findTokenIndex], subscribedTopic.tokens[findTokenIndex+1:]...)
  413. return nil
  414. }
  415. func (client *Client) rangeSubscribedTopics(rangeFunc func(subscribedTopic *subscribeTopic) error) error {
  416. client.topicsMutex.Lock()
  417. defer client.topicsMutex.Unlock()
  418. for _, subscribedTopic := range client.topics {
  419. tempTopic := subscribedTopic
  420. err := rangeFunc(tempTopic)
  421. if err != nil {
  422. return err
  423. }
  424. }
  425. return nil
  426. }