|
@@ -12,7 +12,7 @@ import (
|
|
)
|
|
)
|
|
|
|
|
|
var (
|
|
var (
|
|
- ErrMqttMessageIgnore = errors.New("mqtt消息忽略")
|
|
+ ErrMessageIgnore = errors.New("mqtt消息忽略")
|
|
)
|
|
)
|
|
|
|
|
|
type MessageHandler func(client *Client, token *SubscribeToken, topic string, data []byte) error
|
|
type MessageHandler func(client *Client, token *SubscribeToken, topic string, data []byte) error
|
|
@@ -272,7 +272,7 @@ func (client *Client) subscribe(topic string, handlerFunc MessageHandler) (*Subs
|
|
client.waitConnected()
|
|
client.waitConnected()
|
|
|
|
|
|
return client.addSubscribedTopic(topic, handlerFunc, func(subscribedTopic *subscribeTopic) error {
|
|
return client.addSubscribedTopic(topic, handlerFunc, func(subscribedTopic *subscribeTopic) error {
|
|
- return client.subscribeMqtt(subscribedTopic)
|
|
+ return client.doSubscribe(subscribedTopic)
|
|
})
|
|
})
|
|
}
|
|
}
|
|
|
|
|
|
@@ -326,7 +326,7 @@ func (client *Client) publishAndReceiveReply(params *PublishAndReceiveReplyParam
|
|
}()
|
|
}()
|
|
|
|
|
|
if token.SuccessHandleCount() >= 1 {
|
|
if token.SuccessHandleCount() >= 1 {
|
|
- return ErrMqttMessageIgnore
|
|
+ return ErrMessageIgnore
|
|
}
|
|
}
|
|
|
|
|
|
if payloadDealFunc != nil {
|
|
if payloadDealFunc != nil {
|
|
@@ -402,7 +402,7 @@ func (client *Client) waitConnected() {
|
|
|
|
|
|
func (client *Client) subscribeAll() error {
|
|
func (client *Client) subscribeAll() error {
|
|
err := client.rangeSubscribedTopics(func(subscribedTopic *subscribeTopic) error {
|
|
err := client.rangeSubscribedTopics(func(subscribedTopic *subscribeTopic) error {
|
|
- return client.subscribeMqtt(subscribedTopic)
|
|
+ return client.doSubscribe(subscribedTopic)
|
|
})
|
|
})
|
|
if err != nil {
|
|
if err != nil {
|
|
return err
|
|
return err
|
|
@@ -411,7 +411,7 @@ func (client *Client) subscribeAll() error {
|
|
return nil
|
|
return nil
|
|
}
|
|
}
|
|
|
|
|
|
-func (client *Client) subscribeMqtt(subscribedTopic *subscribeTopic) error {
|
|
+func (client *Client) doSubscribe(subscribedTopic *subscribeTopic) error {
|
|
token := client.client.Subscribe(subscribedTopic.topic, 2, func(mqttClient mqtt.Client, message mqtt.Message) {
|
|
token := client.client.Subscribe(subscribedTopic.topic, 2, func(mqttClient mqtt.Client, message mqtt.Message) {
|
|
wg := sync.WaitGroup{}
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(len(subscribedTopic.tokens))
|
|
wg.Add(len(subscribedTopic.tokens))
|
|
@@ -423,12 +423,12 @@ func (client *Client) subscribeMqtt(subscribedTopic *subscribeTopic) error {
|
|
}()
|
|
}()
|
|
|
|
|
|
err := token.messageHandler(client, token, subscribedTopic.topic, message.Payload())
|
|
err := token.messageHandler(client, token, subscribedTopic.topic, message.Payload())
|
|
- if err != nil && !errors.Is(err, ErrMqttMessageIgnore) {
|
|
+ if err != nil && !errors.Is(err, ErrMessageIgnore) {
|
|
fslog.Error(err)
|
|
fslog.Error(err)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|
|
- if err != nil && errors.Is(err, ErrMqttMessageIgnore) {
|
|
+ if err != nil && errors.Is(err, ErrMessageIgnore) {
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|