mqtt_client.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532
  1. package mqtt_client
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "git.sxidc.com/go-tools/utils/strutils"
  6. "git.sxidc.com/service-supports/fslog"
  7. mqtt "github.com/eclipse/paho.mqtt.golang"
  8. "log"
  9. "sync"
  10. "time"
  11. )
  12. var (
  13. ErrMessageIgnore = errors.New("mqtt消息忽略")
  14. )
  15. type MessageHandler func(client *Client, token *SubscribeToken, topic string, data []byte) error
  16. type ClientOptions struct {
  17. UserName string
  18. Password string
  19. Address string
  20. ClientID string
  21. KeepAliveSec int64
  22. PingTimeoutSec int64
  23. WriteTimeoutSec int64
  24. }
  25. func (opts *ClientOptions) check() error {
  26. if strutils.IsStringEmpty(opts.UserName) {
  27. return errors.New("必须传递用户名")
  28. }
  29. if strutils.IsStringEmpty(opts.Password) {
  30. return errors.New("必须传递密码")
  31. }
  32. if strutils.IsStringEmpty(opts.Address) {
  33. return errors.New("必须传递地址")
  34. }
  35. if strutils.IsStringEmpty(opts.ClientID) {
  36. return errors.New("必须传递客户端ID")
  37. }
  38. return nil
  39. }
  40. type SubscribeToken struct {
  41. messageHandler MessageHandler
  42. successHandleCount int
  43. handleCount int
  44. }
  45. func (token *SubscribeToken) SuccessHandleCount() int {
  46. return token.successHandleCount
  47. }
  48. func (token *SubscribeToken) HandleCount() int {
  49. return token.handleCount
  50. }
  51. type subscribeTopic struct {
  52. topic string
  53. tokens []*SubscribeToken
  54. }
  55. type Client struct {
  56. client mqtt.Client
  57. opts *ClientOptions
  58. topicsMutex *sync.Mutex
  59. topics []*subscribeTopic
  60. publishAndReceiveTopicMapMutex sync.Mutex
  61. publishAndReceiveTopicMap map[string]chan any
  62. }
  63. func New(opts *ClientOptions) (*Client, error) {
  64. if opts == nil {
  65. return nil, errors.New("必须传递参数")
  66. }
  67. err := opts.check()
  68. if err != nil {
  69. return nil, err
  70. }
  71. if opts.WriteTimeoutSec == 0 {
  72. opts.WriteTimeoutSec = 60
  73. }
  74. client := &Client{
  75. opts: opts,
  76. topicsMutex: &sync.Mutex{},
  77. topics: make([]*subscribeTopic, 0),
  78. publishAndReceiveTopicMap: make(map[string]chan any),
  79. }
  80. mqttClient := mqtt.NewClient(mqtt.NewClientOptions().
  81. SetAutoReconnect(true).
  82. SetUsername(opts.UserName).
  83. SetPassword(opts.Password).
  84. AddBroker(opts.Address).
  85. SetClientID(opts.ClientID).
  86. SetKeepAlive(time.Duration(opts.KeepAliveSec)*time.Second).
  87. SetPingTimeout(time.Duration(opts.PingTimeoutSec)*time.Second).
  88. SetWriteTimeout(time.Duration(opts.WriteTimeoutSec)*time.Second).
  89. SetOrderMatters(false).
  90. SetWill(opts.ClientID+"/will", "dead", 2, true).
  91. SetOnConnectHandler(func(mqttClient mqtt.Client) {
  92. err := client.subscribeAll()
  93. if err != nil {
  94. fslog.Error("SetOnConnectHandler: " + err.Error())
  95. return
  96. }
  97. }))
  98. token := mqttClient.Connect()
  99. if !token.WaitTimeout(time.Duration(opts.WriteTimeoutSec) * time.Second) {
  100. return nil, errors.New("连接超时")
  101. }
  102. if token.Error() != nil {
  103. return nil, token.Error()
  104. }
  105. client.client = mqttClient
  106. return client, nil
  107. }
  108. func Destroy(client *Client) {
  109. client.client.Disconnect(250)
  110. client.topicsMutex.Lock()
  111. for _, subscribedTopic := range client.topics {
  112. token := client.client.Unsubscribe(subscribedTopic.topic)
  113. if !token.WaitTimeout(time.Duration(client.opts.WriteTimeoutSec) * time.Second) {
  114. continue
  115. }
  116. if token.Error() != nil {
  117. fslog.Error(token.Error())
  118. continue
  119. }
  120. }
  121. client.topics = nil
  122. client.topicsMutex.Unlock()
  123. client = nil
  124. }
  125. func (client *Client) Publish(topic string, qos byte, retained bool, payload any) error {
  126. if strutils.IsStringEmpty(topic) {
  127. return errors.New("没有传递发布主题")
  128. }
  129. if payload == nil {
  130. return errors.New("发布的payload不能为nil")
  131. }
  132. client.publish(topic, qos, retained, payload)
  133. return nil
  134. }
  135. func (client *Client) Subscribe(topic string, handlerFunc MessageHandler) (*SubscribeToken, error) {
  136. if strutils.IsStringEmpty(topic) {
  137. return nil, errors.New("没有传递订阅主题")
  138. }
  139. if handlerFunc == nil {
  140. return nil, errors.New("必须传递处理函数")
  141. }
  142. return client.subscribe(topic, handlerFunc)
  143. }
  144. func (client *Client) Unsubscribe(topic string, token *SubscribeToken) error {
  145. if strutils.IsStringEmpty(topic) {
  146. return errors.New("没有传递取消订阅主题")
  147. }
  148. return client.unsubscribe(topic, token)
  149. }
  150. type PublishAndReceiveReplyParams struct {
  151. Topic string
  152. ReplyTopic string
  153. PublishData []byte
  154. RepublishDurationSec int64
  155. TryTimes int
  156. StopBefore bool
  157. }
  158. func (params *PublishAndReceiveReplyParams) Check() error {
  159. if strutils.IsStringEmpty(params.Topic) {
  160. return errors.New("没有传递订阅主题")
  161. }
  162. if strutils.IsStringEmpty(params.ReplyTopic) {
  163. return errors.New("没有传递响应订阅主题")
  164. }
  165. if params.PublishData == nil {
  166. return errors.New("发布的数据不能为nil")
  167. }
  168. return nil
  169. }
  170. type msgResponse struct {
  171. Success bool `json:"success"`
  172. Msg string `json:"msg"`
  173. }
  174. func (client *Client) PublishAndReceiveReplyMsgResponse(params *PublishAndReceiveReplyParams) error {
  175. err := params.Check()
  176. if err != nil {
  177. return err
  178. }
  179. err = client.publishAndReceiveReply(params, func(payload []byte) error {
  180. resp := new(msgResponse)
  181. err := json.Unmarshal(payload, resp)
  182. if err != nil {
  183. return err
  184. }
  185. if !resp.Success {
  186. return errors.New(resp.Msg)
  187. }
  188. return nil
  189. })
  190. if err != nil {
  191. return err
  192. }
  193. return nil
  194. }
  195. func (client *Client) PublishAndReceiveReply(params *PublishAndReceiveReplyParams, payloadDealFunc func(payload []byte) error) error {
  196. err := params.Check()
  197. if err != nil {
  198. return err
  199. }
  200. return client.publishAndReceiveReply(params, payloadDealFunc)
  201. }
  202. func (client *Client) publish(topic string, qos byte, retained bool, payload any) {
  203. client.waitConnected()
  204. token := client.client.Publish(topic, qos, retained, payload)
  205. if !token.WaitTimeout(time.Duration(client.opts.WriteTimeoutSec) * time.Second) {
  206. log.Println("发布超时")
  207. return
  208. }
  209. if token.Error() != nil {
  210. log.Println(token.Error())
  211. return
  212. }
  213. }
  214. func (client *Client) subscribe(topic string, handlerFunc MessageHandler) (*SubscribeToken, error) {
  215. client.waitConnected()
  216. return client.addSubscribedTopic(topic, handlerFunc, func(subscribedTopic *subscribeTopic) error {
  217. return client.doSubscribe(subscribedTopic)
  218. })
  219. }
  220. func (client *Client) unsubscribe(topic string, token *SubscribeToken) error {
  221. client.waitConnected()
  222. err := client.removeSubscribedTopic(topic, token, func(subscribedTopic *subscribeTopic) error {
  223. token := client.client.Unsubscribe(subscribedTopic.topic)
  224. if !token.WaitTimeout(time.Duration(client.opts.WriteTimeoutSec) * time.Second) {
  225. return errors.New("取消订阅超时")
  226. }
  227. if token.Error() != nil {
  228. return token.Error()
  229. }
  230. return nil
  231. })
  232. if err != nil {
  233. return err
  234. }
  235. return nil
  236. }
  237. func (client *Client) publishAndReceiveReply(params *PublishAndReceiveReplyParams, payloadDealFunc func(payload []byte) error) error {
  238. doneChan := make(chan any)
  239. if params.StopBefore {
  240. client.publishAndReceiveTopicMapMutex.Lock()
  241. existDoneChan, ok := client.publishAndReceiveTopicMap[params.Topic]
  242. if ok {
  243. close(existDoneChan)
  244. delete(client.publishAndReceiveTopicMap, params.Topic)
  245. } else {
  246. client.publishAndReceiveTopicMap[params.Topic] = doneChan
  247. }
  248. client.publishAndReceiveTopicMapMutex.Unlock()
  249. }
  250. // 订阅响应主题
  251. token, err := client.subscribe(params.ReplyTopic, func(c *Client, token *SubscribeToken, topic string, data []byte) error {
  252. defer func() {
  253. client.publishAndReceiveTopicMapMutex.Lock()
  254. existDoneChan, ok := client.publishAndReceiveTopicMap[params.Topic]
  255. if ok {
  256. close(existDoneChan)
  257. delete(client.publishAndReceiveTopicMap, params.Topic)
  258. }
  259. client.publishAndReceiveTopicMapMutex.Unlock()
  260. }()
  261. if token.SuccessHandleCount() >= 1 {
  262. return ErrMessageIgnore
  263. }
  264. if payloadDealFunc != nil {
  265. err := payloadDealFunc(data)
  266. if err != nil {
  267. return err
  268. }
  269. }
  270. return nil
  271. })
  272. if err != nil {
  273. return err
  274. }
  275. go func() {
  276. client.publish(params.Topic, 2, false, params.PublishData)
  277. currentTryTime := 1
  278. timer := time.NewTimer(time.Duration(params.RepublishDurationSec) * time.Second)
  279. defer timer.Stop()
  280. for {
  281. select {
  282. case <-doneChan:
  283. err := client.unsubscribe(params.ReplyTopic, token)
  284. if err != nil {
  285. fslog.Error(err)
  286. return
  287. }
  288. client.publishAndReceiveTopicMapMutex.Lock()
  289. delete(client.publishAndReceiveTopicMap, params.Topic)
  290. client.publishAndReceiveTopicMapMutex.Unlock()
  291. return
  292. case <-timer.C:
  293. client.publish(params.Topic, 2, false, params.PublishData)
  294. if params.TryTimes != 0 {
  295. currentTryTime++
  296. }
  297. if params.TryTimes != 0 && currentTryTime > params.TryTimes {
  298. client.publishAndReceiveTopicMapMutex.Lock()
  299. existDoneChan, ok := client.publishAndReceiveTopicMap[params.Topic]
  300. if ok {
  301. close(existDoneChan)
  302. delete(client.publishAndReceiveTopicMap, params.Topic)
  303. }
  304. client.publishAndReceiveTopicMapMutex.Unlock()
  305. return
  306. }
  307. resetDuration := time.Duration(params.RepublishDurationSec*int64(currentTryTime)) * time.Second
  308. timer.Reset(resetDuration)
  309. }
  310. }
  311. }()
  312. return nil
  313. }
  314. func (client *Client) waitConnected() {
  315. for {
  316. if client.client.IsConnected() {
  317. break
  318. }
  319. time.Sleep(1 * time.Second)
  320. }
  321. }
  322. func (client *Client) subscribeAll() error {
  323. err := client.rangeSubscribedTopics(func(subscribedTopic *subscribeTopic) error {
  324. return client.doSubscribe(subscribedTopic)
  325. })
  326. if err != nil {
  327. return err
  328. }
  329. return nil
  330. }
  331. func (client *Client) doSubscribe(subscribedTopic *subscribeTopic) error {
  332. token := client.client.Subscribe(subscribedTopic.topic, 2, func(mqttClient mqtt.Client, message mqtt.Message) {
  333. wg := sync.WaitGroup{}
  334. wg.Add(len(subscribedTopic.tokens))
  335. for _, token := range subscribedTopic.tokens {
  336. go func(token *SubscribeToken, message mqtt.Message) {
  337. defer func() {
  338. token.handleCount++
  339. wg.Done()
  340. }()
  341. err := token.messageHandler(client, token, subscribedTopic.topic, message.Payload())
  342. if err != nil && !errors.Is(err, ErrMessageIgnore) {
  343. fslog.Error(err)
  344. return
  345. }
  346. if err != nil && errors.Is(err, ErrMessageIgnore) {
  347. return
  348. }
  349. token.successHandleCount++
  350. }(token, message)
  351. }
  352. wg.Wait()
  353. })
  354. if !token.WaitTimeout(time.Duration(client.opts.WriteTimeoutSec) * time.Second) {
  355. return errors.New("订阅超时")
  356. }
  357. if token.Error() != nil {
  358. return token.Error()
  359. }
  360. return nil
  361. }
  362. func (client *Client) addSubscribedTopic(topic string, handler MessageHandler, addNew func(subscribedTopic *subscribeTopic) error) (*SubscribeToken, error) {
  363. client.topicsMutex.Lock()
  364. defer client.topicsMutex.Unlock()
  365. newToken := &SubscribeToken{messageHandler: handler}
  366. for _, savedTopic := range client.topics {
  367. if savedTopic.topic == topic {
  368. savedTopic.tokens = append(savedTopic.tokens, newToken)
  369. return newToken, nil
  370. }
  371. }
  372. subscribedTopic := &subscribeTopic{
  373. topic: topic,
  374. tokens: []*SubscribeToken{newToken},
  375. }
  376. err := addNew(subscribedTopic)
  377. if err != nil {
  378. return nil, err
  379. }
  380. client.topics = append(client.topics, subscribedTopic)
  381. return newToken, nil
  382. }
  383. func (client *Client) removeSubscribedTopic(topic string, token *SubscribeToken, noTokens func(subscribedTopic *subscribeTopic) error) error {
  384. client.topicsMutex.Lock()
  385. defer client.topicsMutex.Unlock()
  386. findSubscribeTopicIndex := -1
  387. for index, subscribedTopic := range client.topics {
  388. if subscribedTopic.topic == topic {
  389. findSubscribeTopicIndex = index
  390. }
  391. }
  392. if findSubscribeTopicIndex == -1 {
  393. return nil
  394. }
  395. subscribedTopic := client.topics[findSubscribeTopicIndex]
  396. if subscribedTopic.tokens != nil && len(subscribedTopic.tokens) == 1 {
  397. err := noTokens(subscribedTopic)
  398. if err != nil {
  399. return err
  400. }
  401. client.topics = append(client.topics[:findSubscribeTopicIndex], client.topics[findSubscribeTopicIndex+1:]...)
  402. return nil
  403. }
  404. findTokenIndex := -1
  405. for index, savedToken := range subscribedTopic.tokens {
  406. if savedToken == token {
  407. findTokenIndex = index
  408. }
  409. }
  410. if findTokenIndex == -1 {
  411. return nil
  412. }
  413. subscribedTopic.tokens = append(subscribedTopic.tokens[:findTokenIndex], subscribedTopic.tokens[findTokenIndex+1:]...)
  414. return nil
  415. }
  416. func (client *Client) rangeSubscribedTopics(rangeFunc func(subscribedTopic *subscribeTopic) error) error {
  417. client.topicsMutex.Lock()
  418. defer client.topicsMutex.Unlock()
  419. for _, subscribedTopic := range client.topics {
  420. tempTopic := subscribedTopic
  421. err := rangeFunc(tempTopic)
  422. if err != nil {
  423. return err
  424. }
  425. }
  426. return nil
  427. }