123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531 |
- package mqtt_client
- import (
- "encoding/json"
- "errors"
- "git.sxidc.com/go-tools/utils/strutils"
- mqtt "github.com/eclipse/paho.mqtt.golang"
- "log"
- "sync"
- "time"
- )
// ErrMessageIgnore is the sentinel a MessageHandler returns to signal that it
// deliberately skipped a message. The dispatcher does not log it and does not
// count the message as successfully handled.
var ErrMessageIgnore = errors.New("mqtt消息忽略")
- type MessageHandler func(client *Client, token *SubscribeToken, topic string, data []byte) error
// ClientOptions carries the connection settings consumed by New.
type ClientOptions struct {
	// UserName, Password, Address and ClientID are mandatory; New rejects
	// options in which any of them is empty.
	UserName string
	Password string
	Address  string
	ClientID string

	// KeepAliveSec is the keep-alive interval, in seconds.
	KeepAliveSec int64
	// PingTimeoutSec is the ping timeout, in seconds.
	PingTimeoutSec int64
	// WriteTimeoutSec is the write timeout, in seconds. It is also the upper
	// bound the client waits for connect, publish, subscribe and unsubscribe
	// operations to complete. New uses 60 when it is left at zero.
	WriteTimeoutSec int64
}
- func (opts *ClientOptions) check() error {
- if strutils.IsStringEmpty(opts.UserName) {
- return errors.New("必须传递用户名")
- }
- if strutils.IsStringEmpty(opts.Password) {
- return errors.New("必须传递密码")
- }
- if strutils.IsStringEmpty(opts.Address) {
- return errors.New("必须传递地址")
- }
- if strutils.IsStringEmpty(opts.ClientID) {
- return errors.New("必须传递客户端ID")
- }
- return nil
- }
- type SubscribeToken struct {
- messageHandler MessageHandler
- successHandleCount int
- handleCount int
- }
- func (token *SubscribeToken) SuccessHandleCount() int {
- return token.successHandleCount
- }
- func (token *SubscribeToken) HandleCount() int {
- return token.handleCount
- }
- type subscribeTopic struct {
- topic string
- tokens []*SubscribeToken
- }
- type Client struct {
- client mqtt.Client
- opts *ClientOptions
- topicsMutex *sync.Mutex
- topics []*subscribeTopic
- publishAndReceiveTopicMapMutex sync.Mutex
- publishAndReceiveTopicMap map[string]chan any
- }
- func New(opts *ClientOptions) (*Client, error) {
- if opts == nil {
- return nil, errors.New("必须传递参数")
- }
- err := opts.check()
- if err != nil {
- return nil, err
- }
- if opts.WriteTimeoutSec == 0 {
- opts.WriteTimeoutSec = 60
- }
- client := &Client{
- opts: opts,
- topicsMutex: &sync.Mutex{},
- topics: make([]*subscribeTopic, 0),
- publishAndReceiveTopicMap: make(map[string]chan any),
- }
- mqttClient := mqtt.NewClient(mqtt.NewClientOptions().
- SetAutoReconnect(true).
- SetUsername(opts.UserName).
- SetPassword(opts.Password).
- AddBroker(opts.Address).
- SetClientID(opts.ClientID).
- SetKeepAlive(time.Duration(opts.KeepAliveSec)*time.Second).
- SetPingTimeout(time.Duration(opts.PingTimeoutSec)*time.Second).
- SetWriteTimeout(time.Duration(opts.WriteTimeoutSec)*time.Second).
- SetOrderMatters(false).
- SetWill(opts.ClientID+"/will", "dead", 2, true).
- SetOnConnectHandler(func(mqttClient mqtt.Client) {
- err := client.subscribeAll()
- if err != nil {
- log.Println("SetOnConnectHandler:", err.Error())
- return
- }
- }))
- token := mqttClient.Connect()
- if !token.WaitTimeout(time.Duration(opts.WriteTimeoutSec) * time.Second) {
- return nil, errors.New("连接超时")
- }
- if token.Error() != nil {
- return nil, token.Error()
- }
- client.client = mqttClient
- return client, nil
- }
- func Destroy(client *Client) {
- client.client.Disconnect(250)
- client.topicsMutex.Lock()
- for _, subscribedTopic := range client.topics {
- token := client.client.Unsubscribe(subscribedTopic.topic)
- if !token.WaitTimeout(time.Duration(client.opts.WriteTimeoutSec) * time.Second) {
- continue
- }
- if token.Error() != nil {
- log.Println("Destroy: ", token.Error())
- continue
- }
- }
- client.topics = nil
- client.topicsMutex.Unlock()
- client = nil
- }
- func (client *Client) Publish(topic string, qos byte, retained bool, payload any) error {
- if strutils.IsStringEmpty(topic) {
- return errors.New("没有传递发布主题")
- }
- if payload == nil {
- return errors.New("发布的payload不能为nil")
- }
- client.publish(topic, qos, retained, payload)
- return nil
- }
- func (client *Client) Subscribe(topic string, handlerFunc MessageHandler) (*SubscribeToken, error) {
- if strutils.IsStringEmpty(topic) {
- return nil, errors.New("没有传递订阅主题")
- }
- if handlerFunc == nil {
- return nil, errors.New("必须传递处理函数")
- }
- return client.subscribe(topic, handlerFunc)
- }
- func (client *Client) Unsubscribe(topic string, token *SubscribeToken) error {
- if strutils.IsStringEmpty(topic) {
- return errors.New("没有传递取消订阅主题")
- }
- return client.unsubscribe(topic, token)
- }
// PublishAndReceiveReplyParams describes one publish-and-wait-for-reply
// request.
type PublishAndReceiveReplyParams struct {
	// Topic is the topic the request is published to. Required.
	Topic string
	// ReplyTopic is the topic the reply is expected on. Required.
	ReplyTopic string
	// PublishData is the payload to publish. Must not be nil.
	PublishData []byte
	// RepublishDurationSec is the base interval, in seconds, between
	// republish attempts while no reply has arrived. When TryTimes is
	// non-zero the interval is multiplied by the current attempt number.
	RepublishDurationSec int64
	// TryTimes bounds the number of republish attempts; zero means no bound.
	TryTimes int
	// StopBefore asks the client to stop an earlier request that is still
	// registered for the same Topic.
	StopBefore bool
}
- func (params *PublishAndReceiveReplyParams) Check() error {
- if strutils.IsStringEmpty(params.Topic) {
- return errors.New("没有传递订阅主题")
- }
- if strutils.IsStringEmpty(params.ReplyTopic) {
- return errors.New("没有传递响应订阅主题")
- }
- if params.PublishData == nil {
- return errors.New("发布的数据不能为nil")
- }
- return nil
- }
// msgResponse is the JSON reply shape decoded by
// PublishAndReceiveReplyMsgResponse.
type msgResponse struct {
	// Success reports whether the peer accepted the request.
	Success bool `json:"success"`
	// Msg carries the peer's message; it becomes the error text when Success
	// is false.
	Msg string `json:"msg"`
}
- func (client *Client) PublishAndReceiveReplyMsgResponse(params *PublishAndReceiveReplyParams) error {
- err := params.Check()
- if err != nil {
- return err
- }
- err = client.publishAndReceiveReply(params, func(payload []byte) error {
- resp := new(msgResponse)
- err := json.Unmarshal(payload, resp)
- if err != nil {
- return err
- }
- if !resp.Success {
- return errors.New(resp.Msg)
- }
- return nil
- })
- if err != nil {
- return err
- }
- return nil
- }
- func (client *Client) PublishAndReceiveReply(params *PublishAndReceiveReplyParams, payloadDealFunc func(payload []byte) error) error {
- err := params.Check()
- if err != nil {
- return err
- }
- return client.publishAndReceiveReply(params, payloadDealFunc)
- }
- func (client *Client) publish(topic string, qos byte, retained bool, payload any) {
- client.waitConnected()
- token := client.client.Publish(topic, qos, retained, payload)
- if !token.WaitTimeout(time.Duration(client.opts.WriteTimeoutSec) * time.Second) {
- log.Println("发布超时")
- return
- }
- if token.Error() != nil {
- log.Println("publish:", token.Error())
- return
- }
- }
- func (client *Client) subscribe(topic string, handlerFunc MessageHandler) (*SubscribeToken, error) {
- client.waitConnected()
- return client.addSubscribedTopic(topic, handlerFunc, func(subscribedTopic *subscribeTopic) error {
- return client.doSubscribe(subscribedTopic)
- })
- }
- func (client *Client) unsubscribe(topic string, token *SubscribeToken) error {
- client.waitConnected()
- err := client.removeSubscribedTopic(topic, token, func(subscribedTopic *subscribeTopic) error {
- token := client.client.Unsubscribe(subscribedTopic.topic)
- if !token.WaitTimeout(time.Duration(client.opts.WriteTimeoutSec) * time.Second) {
- return errors.New("取消订阅超时")
- }
- if token.Error() != nil {
- return token.Error()
- }
- return nil
- })
- if err != nil {
- return err
- }
- return nil
- }
- func (client *Client) publishAndReceiveReply(params *PublishAndReceiveReplyParams, payloadDealFunc func(payload []byte) error) error {
- doneChan := make(chan any)
- if params.StopBefore {
- client.publishAndReceiveTopicMapMutex.Lock()
- existDoneChan, ok := client.publishAndReceiveTopicMap[params.Topic]
- if ok {
- close(existDoneChan)
- delete(client.publishAndReceiveTopicMap, params.Topic)
- } else {
- client.publishAndReceiveTopicMap[params.Topic] = doneChan
- }
- client.publishAndReceiveTopicMapMutex.Unlock()
- }
- // 订阅响应主题
- token, err := client.subscribe(params.ReplyTopic, func(c *Client, token *SubscribeToken, topic string, data []byte) error {
- defer func() {
- client.publishAndReceiveTopicMapMutex.Lock()
- existDoneChan, ok := client.publishAndReceiveTopicMap[params.Topic]
- if ok {
- close(existDoneChan)
- delete(client.publishAndReceiveTopicMap, params.Topic)
- }
- client.publishAndReceiveTopicMapMutex.Unlock()
- }()
- if token.SuccessHandleCount() >= 1 {
- return ErrMessageIgnore
- }
- if payloadDealFunc != nil {
- err := payloadDealFunc(data)
- if err != nil {
- return err
- }
- }
- return nil
- })
- if err != nil {
- return err
- }
- go func() {
- client.publish(params.Topic, 2, false, params.PublishData)
- currentTryTime := 1
- timer := time.NewTimer(time.Duration(params.RepublishDurationSec) * time.Second)
- defer timer.Stop()
- for {
- select {
- case <-doneChan:
- err := client.unsubscribe(params.ReplyTopic, token)
- if err != nil {
- log.Println("publishAndReceiveReply done:", err.Error())
- return
- }
- client.publishAndReceiveTopicMapMutex.Lock()
- delete(client.publishAndReceiveTopicMap, params.Topic)
- client.publishAndReceiveTopicMapMutex.Unlock()
- return
- case <-timer.C:
- client.publish(params.Topic, 2, false, params.PublishData)
- if params.TryTimes != 0 {
- currentTryTime++
- }
- if params.TryTimes != 0 && currentTryTime > params.TryTimes {
- client.publishAndReceiveTopicMapMutex.Lock()
- existDoneChan, ok := client.publishAndReceiveTopicMap[params.Topic]
- if ok {
- close(existDoneChan)
- delete(client.publishAndReceiveTopicMap, params.Topic)
- }
- client.publishAndReceiveTopicMapMutex.Unlock()
- return
- }
- resetDuration := time.Duration(params.RepublishDurationSec*int64(currentTryTime)) * time.Second
- timer.Reset(resetDuration)
- }
- }
- }()
- return nil
- }
- func (client *Client) waitConnected() {
- for {
- if client.client.IsConnected() {
- break
- }
- time.Sleep(1 * time.Second)
- }
- }
- func (client *Client) subscribeAll() error {
- err := client.rangeSubscribedTopics(func(subscribedTopic *subscribeTopic) error {
- return client.doSubscribe(subscribedTopic)
- })
- if err != nil {
- return err
- }
- return nil
- }
- func (client *Client) doSubscribe(subscribedTopic *subscribeTopic) error {
- token := client.client.Subscribe(subscribedTopic.topic, 2, func(mqttClient mqtt.Client, message mqtt.Message) {
- wg := sync.WaitGroup{}
- wg.Add(len(subscribedTopic.tokens))
- for _, token := range subscribedTopic.tokens {
- go func(token *SubscribeToken, message mqtt.Message) {
- defer func() {
- token.handleCount++
- wg.Done()
- }()
- err := token.messageHandler(client, token, subscribedTopic.topic, message.Payload())
- if err != nil && !errors.Is(err, ErrMessageIgnore) {
- log.Println("doSubscribe token.messageHandler:", err.Error())
- return
- }
- if err != nil && errors.Is(err, ErrMessageIgnore) {
- return
- }
- token.successHandleCount++
- }(token, message)
- }
- wg.Wait()
- })
- if !token.WaitTimeout(time.Duration(client.opts.WriteTimeoutSec) * time.Second) {
- return errors.New("订阅超时")
- }
- if token.Error() != nil {
- return token.Error()
- }
- return nil
- }
- func (client *Client) addSubscribedTopic(topic string, handler MessageHandler, addNew func(subscribedTopic *subscribeTopic) error) (*SubscribeToken, error) {
- client.topicsMutex.Lock()
- defer client.topicsMutex.Unlock()
- newToken := &SubscribeToken{messageHandler: handler}
- for _, savedTopic := range client.topics {
- if savedTopic.topic == topic {
- savedTopic.tokens = append(savedTopic.tokens, newToken)
- return newToken, nil
- }
- }
- subscribedTopic := &subscribeTopic{
- topic: topic,
- tokens: []*SubscribeToken{newToken},
- }
- err := addNew(subscribedTopic)
- if err != nil {
- return nil, err
- }
- client.topics = append(client.topics, subscribedTopic)
- return newToken, nil
- }
- func (client *Client) removeSubscribedTopic(topic string, token *SubscribeToken, noTokens func(subscribedTopic *subscribeTopic) error) error {
- client.topicsMutex.Lock()
- defer client.topicsMutex.Unlock()
- findSubscribeTopicIndex := -1
- for index, subscribedTopic := range client.topics {
- if subscribedTopic.topic == topic {
- findSubscribeTopicIndex = index
- }
- }
- if findSubscribeTopicIndex == -1 {
- return nil
- }
- subscribedTopic := client.topics[findSubscribeTopicIndex]
- if subscribedTopic.tokens != nil && len(subscribedTopic.tokens) == 1 {
- err := noTokens(subscribedTopic)
- if err != nil {
- return err
- }
- client.topics = append(client.topics[:findSubscribeTopicIndex], client.topics[findSubscribeTopicIndex+1:]...)
- return nil
- }
- findTokenIndex := -1
- for index, savedToken := range subscribedTopic.tokens {
- if savedToken == token {
- findTokenIndex = index
- }
- }
- if findTokenIndex == -1 {
- return nil
- }
- subscribedTopic.tokens = append(subscribedTopic.tokens[:findTokenIndex], subscribedTopic.tokens[findTokenIndex+1:]...)
- return nil
- }
- func (client *Client) rangeSubscribedTopics(rangeFunc func(subscribedTopic *subscribeTopic) error) error {
- client.topicsMutex.Lock()
- defer client.topicsMutex.Unlock()
- for _, subscribedTopic := range client.topics {
- tempTopic := subscribedTopic
- err := rangeFunc(tempTopic)
- if err != nil {
- return err
- }
- }
- return nil
- }
|