Browse Source

修改bug

yjp 1 year ago
parent
commit
d1f0cd69a0
1 changed files with 48 additions and 10 deletions
  1. 48 10
      mqtt_binding/mqtt_client/mqtt_client.go

+ 48 - 10
mqtt_binding/mqtt_client/mqtt_client.go

@@ -11,13 +11,14 @@ import (
 )
 
 type MqttClientOptions struct {
-	UserName        string
-	Password        string
-	Address         string
-	ClientID        string
-	KeepAliveSec    int64
-	PingTimeoutSec  int64
-	WriteTimeoutSec int64
+	UserName              string
+	Password              string
+	Address               string
+	ClientID              string
+	KeepAliveSec          int64
+	PingTimeoutSec        int64
+	WriteTimeoutSec       int64
+	SubscribeRoutineCount int
 }
 
 func (opt *MqttClientOptions) check() error {
@@ -37,14 +38,26 @@ func (opt *MqttClientOptions) check() error {
 		return errors.New("必须传递客户端ID")
 	}
 
+	if opt.SubscribeRoutineCount == 0 {
+		opt.SubscribeRoutineCount = 10
+	}
+
 	return nil
 }
 
+type subscribePayload struct {
+	item    *router.Item
+	payload []byte
+}
+
 type MqttClient struct {
 	client mqtt.Client
 
 	routersMutex *sync.Mutex
 	routers      []*router.Router
+
+	subscribeChan     chan *subscribePayload
+	subscribeDoneChan chan any
 }
 
 func NewMqttClient(opts *MqttClientOptions) (*MqttClient, error) {
@@ -58,8 +71,10 @@ func NewMqttClient(opts *MqttClientOptions) (*MqttClient, error) {
 	}
 
 	mqttClient := &MqttClient{
-		routersMutex: &sync.Mutex{},
-		routers:      make([]*router.Router, 0),
+		routersMutex:      &sync.Mutex{},
+		routers:           make([]*router.Router, 0),
+		subscribeChan:     make(chan *subscribePayload, 1024),
+		subscribeDoneChan: make(chan any),
 	}
 
 	mqttOptions := mqtt.NewClientOptions().
@@ -89,11 +104,21 @@ func NewMqttClient(opts *MqttClientOptions) (*MqttClient, error) {
 
 	mqttClient.client = mqtt.NewClient(mqttOptions)
 
+	for i := 0; i < opts.SubscribeRoutineCount; i++ {
+		go mqttClient.subscribeRoutine()
+	}
+
 	return mqttClient, nil
 }
 
 func DestroyMqttClient(c *MqttClient) {
 	if c != nil {
+		close(c.subscribeDoneChan)
+		c.subscribeDoneChan = nil
+
+		close(c.subscribeChan)
+		c.subscribeChan = nil
+
 		c.client = nil
 
 		c.routersMutex.Lock()
@@ -221,7 +246,10 @@ func (c *MqttClient) rangeRouters(rangeFunc func(router *router.Router) error) e
 
 func (c *MqttClient) subscribe(item *router.Item) error {
 	token := c.client.Subscribe(item.Topic, 2, func(client mqtt.Client, message mqtt.Message) {
-		item.CallHandlers(message.Payload())
+		c.subscribeChan <- &subscribePayload{
+			item:    item,
+			payload: message.Payload(),
+		}
 	})
 	if token.Wait(); token.Error() != nil {
 		return token.Error()
@@ -231,3 +259,13 @@ func (c *MqttClient) subscribe(item *router.Item) error {
 
 	return nil
 }
+
+func (c *MqttClient) subscribeRoutine() {
+	for {
+		select {
+		case <-c.subscribeDoneChan:
+		case payload := <-c.subscribeChan:
+			payload.item.CallHandlers(payload.payload)
+		}
+	}
+}