Browse Source

修改qos

yjp 1 year ago
parent
commit
7f400198c9
2 changed files with 67 additions and 71 deletions
  1. 1 1
      mqtt_binding/mqtt_client/mqtt_client.go
  2. 66 70
      mqtt_binding_test.go

+ 1 - 1
mqtt_binding/mqtt_client/mqtt_client.go

@@ -146,7 +146,7 @@ func (c *MqttClient) GetRouter(group string, handlers []router.Handler) *router.
 }
 
 func (c *MqttClient) Response(item *router.Item, data []byte) error {
-	token := c.client.Publish(item.Topic+"/reply", 0, false, data)
+	token := c.client.Publish(item.Topic+"/reply", 2, false, data)
 	if token.Wait(); token.Error() != nil {
 		return token.Error()
 	}

+ 66 - 70
mqtt_binding_test.go

@@ -8,7 +8,6 @@ import (
 	"git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client/router"
 	"git.sxidc.com/go-tools/api_binding/mqtt_binding/response"
 	mqtt "github.com/eclipse/paho.mqtt.golang"
-	"sync"
 	"testing"
 	"time"
 )
@@ -52,73 +51,70 @@ func TestMqttBinding(t *testing.T) {
 		item.Next()
 	})
 
-	wg := sync.WaitGroup{}
-	wg.Add(1)
-
-	go func() {
-		mqttClient := mqtt.NewClient(mqtt.NewClientOptions().
-			SetAutoReconnect(true).
-			SetUsername("admin").
-			SetPassword("mtyzxhc").
-			AddBroker("tcp://127.0.0.1:1883").
-			SetClientID("test-client").
-			SetKeepAlive(5*time.Second).
-			SetPingTimeout(5*time.Second).
-			SetWill("test-client/will", "dead", 2, true).
-			SetOnConnectHandler(func(client mqtt.Client) {
-				token := client.Subscribe("test_prefix/test/test-topic/reply", 2, func(client mqtt.Client, message mqtt.Message) {
-					respMap := make(map[string]interface{})
-					err = json.Unmarshal(message.Payload(), &respMap)
-					if err != nil {
-						fmt.Println(err)
-						return
-					}
-
-					pong, ok := respMap["result"]
-					if !ok {
-						fmt.Println("响应错误")
-						return
-					}
-
-					if pong != "pong" {
-						fmt.Println("响应错误")
-						return
-					}
-				})
-				if token.Wait(); token.Error() != nil {
-					fmt.Println(token.Error())
-					return
-				}
-
-				sendMap := map[string]any{
-					"time": time.Now().Format(time.DateTime),
-				}
-
-				sendJson, err := json.Marshal(sendMap)
-				if token.Wait(); token.Error() != nil {
-					fmt.Println(err)
-					return
-				}
-
-				token = client.Publish("test_prefix/test/test-topic", 2, false, sendJson)
-				if token.Wait(); token.Error() != nil {
-					fmt.Println(token.Error())
-					return
-				}
-			}))
-
-		token := mqttClient.Connect()
-		if token.Wait(); token.Error() != nil {
-			fmt.Println(token.Error())
-			return
-		}
-
-		defer mqttClient.Disconnect(250)
-
-		time.Sleep(2 * time.Second)
-
-		wg.Done()
-	}()
-
-	wg.Wait()
+	waitChan := make(chan []byte)
+
+	mqttClient := mqtt.NewClient(mqtt.NewClientOptions().
+		SetAutoReconnect(true).
+		SetUsername("admin").
+		SetPassword("mtyzxhc").
+		AddBroker("tcp://127.0.0.1:1883").
+		SetClientID("test-client").
+		SetKeepAlive(5*time.Second).
+		SetPingTimeout(5*time.Second).
+		SetWill("test-client/will", "dead", 2, true).
+		SetOnConnectHandler(func(client mqtt.Client) {
+			token := client.Subscribe("test_prefix/test/test-topic/reply", 2, func(client mqtt.Client, message mqtt.Message) {
+				waitChan <- message.Payload()
+			})
+			if token.Wait(); token.Error() != nil {
+				fmt.Println(token.Error())
+				return
+			}
+
+			sendMap := map[string]any{
+				"time": time.Now().Format(time.DateTime),
+			}
+
+			sendJson, err := json.Marshal(sendMap)
+			if token.Wait(); token.Error() != nil {
+				fmt.Println(err)
+				return
+			}
+
+			token = client.Publish("test_prefix/test/test-topic", 2, false, sendJson)
+			if token.Wait(); token.Error() != nil {
+				fmt.Println(token.Error())
+				return
+			}
+		}))
+
+	token := mqttClient.Connect()
+	if token.Wait(); token.Error() != nil {
+		fmt.Println(token.Error())
+		return
+	}
+
+	defer mqttClient.Disconnect(250)
+
+	data := <-waitChan
+
+	respMap := make(map[string]interface{})
+	err = json.Unmarshal(data, &respMap)
+	if err != nil {
+		fmt.Println(err)
+		return
+	}
+
+	pong, ok := respMap["result"]
+	if !ok {
+		fmt.Println("响应错误")
+		return
+	}
+
+	if pong != "pong" {
+		fmt.Println("响应错误")
+		return
+	}
+
+	fmt.Println(respMap)
 }