yjp 1 year ago
parent
commit
45be3754cd
1 changed files with 21 additions and 3 deletions
  1. 21 3
      mqtt_binding/mqtt_client/mqtt_client.go

+ 21 - 3
mqtt_binding/mqtt_client/mqtt_client.go

@@ -49,6 +49,7 @@ type subscribePayload struct {
 
 type MqttClient struct {
 	client mqtt.Client
+	opts   *MqttClientOptions
 
 	routersMutex *sync.Mutex
 	routers      []*router.Router
@@ -75,7 +76,12 @@ func NewMqttClient(opts *MqttClientOptions) (*MqttClient, error) {
 		opts.SubscribeRoutineCount = 1024
 	}
 
+	if opts.WriteTimeoutSec == 0 {
+		opts.WriteTimeoutSec = 60
+	}
+
 	mqttClient := &MqttClient{
+		opts:              opts,
 		routersMutex:      &sync.Mutex{},
 		routers:           make([]*router.Router, 0),
 		subscribeChan:     make(chan *subscribePayload, opts.SubscribeRoutineCount),
@@ -139,7 +145,11 @@ func DestroyMqttClient(c *MqttClient) {
 
 func (c *MqttClient) Connect() error {
 	token := c.client.Connect()
-	if token.Wait(); token.Error() != nil {
+	if !token.WaitTimeout(time.Duration(c.opts.WriteTimeoutSec) * time.Second) {
+		return errors.New("连接超时")
+	}
+
+	if token.Error() != nil {
 		return token.Error()
 	}
 
@@ -177,7 +187,11 @@ func (c *MqttClient) GetRouter(group string, handlers []router.Handler) *router.
 
 func (c *MqttClient) Response(item *router.Item, data []byte) error {
 	token := c.client.Publish(item.Topic+"/reply", 2, false, data)
-	if token.Wait(); token.Error() != nil {
+	if !token.WaitTimeout(time.Duration(c.opts.WriteTimeoutSec) * time.Second) {
+		return errors.New("发布超时")
+	}
+
+	if token.Error() != nil {
 		return token.Error()
 	}
 
@@ -256,7 +270,11 @@ func (c *MqttClient) subscribe(item *router.Item) error {
 			payload: message.Payload(),
 		}
 	})
-	if token.Wait(); token.Error() != nil {
+	if !token.WaitTimeout(time.Duration(c.opts.WriteTimeoutSec) * time.Second) {
+		return errors.New("订阅超时")
+	}
+
+	if token.Error() != nil {
 		return token.Error()
 	}