package api_binding

import (
	"encoding/json"
	"fmt"
	"git.sxidc.com/go-tools/api_binding/mqtt_binding"
	"git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client"
	"git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client/router"
	"git.sxidc.com/go-tools/api_binding/mqtt_binding/response"
	mqtt "github.com/eclipse/paho.mqtt.golang"
	"sync"
	"testing"
	"time"
)

// TestMqttBinding is an integration test for a request/reply round trip
// through mqtt_binding. It needs an MQTT broker listening on
// tcp://127.0.0.1:1883 that accepts the credentials used below.
//
// The test registers a handler for "/test-topic" on the "test" binding, then
// uses an independent paho client to publish a request and checks that a JSON
// reply containing {"result": "pong"} arrives on the reply topic.
func TestMqttBinding(t *testing.T) {
	const (
		// Topics used by the probing client. Presumably mqtt_binding derives
		// them from the "test_prefix" prefix, the "test" binding name and the
		// "/test-topic" bind topic -- confirm against mqtt_binding.
		requestTopic = "test_prefix/test/test-topic"
		replyTopic   = "test_prefix/test/test-topic/reply"

		// Upper bound on how long the test waits for the reply.
		replyTimeout = 2 * time.Second
	)

	err := mqtt_binding.Init("test_prefix", &mqtt_client.MqttClientOptions{
		UserName:       "admin",
		Password:       "mtyzxhc",
		Address:        "tcp://127.0.0.1:1883",
		ClientID:       "test-binding",
		KeepAliveSec:   5,
		PingTimeoutSec: 5,
	})
	if err != nil {
		t.Fatal(err)
	}
	defer mqtt_binding.Destroy()

	testBinding := mqtt_binding.NewBinding("test", func(item *router.Item, data []byte) {
		fmt.Println("Global Middleware!!!")
		item.Next()
	})

	mqtt_binding.Bind(testBinding, &mqtt_binding.BindItem[struct {
		Time string `json:"time"`
	}, map[string]any]{
		Topic:        "/test-topic",
		ResponseFunc: response.SendMapResponse,
		BusinessFunc: func(c *mqtt_client.MqttClient, inputModel struct {
			Time string `json:"time"`
		}) (map[string]any, error) {
			fmt.Printf("Received: %v\n", inputModel)
			return map[string]any{
				"result": "pong",
			}, nil
		},
		OptionalBindingFunc: nil,
	}, func(item *router.Item, data []byte) {
		fmt.Println("Binding Middleware!!!")
		item.Next()
	})

	// outcome carries the first result produced by the client callbacks:
	// nil for a valid reply, non-nil for any failure. It is buffered and
	// written with a non-blocking send so callbacks never block, even if the
	// connect handler runs more than once (auto-reconnect is enabled).
	outcome := make(chan error, 1)
	report := func(res error) {
		select {
		case outcome <- res:
		default:
		}
	}

	// onReply validates the reply payload. It uses only local variables so it
	// does not write to the test function's err from another goroutine.
	onReply := func(_ mqtt.Client, message mqtt.Message) {
		respMap := make(map[string]any)
		if err := json.Unmarshal(message.Payload(), &respMap); err != nil {
			report(err)
			return
		}

		if pong, ok := respMap["result"]; !ok || pong != "pong" {
			report(fmt.Errorf("响应错误"))
			return
		}

		report(nil)
	}

	// onConnect subscribes to the reply topic first and only then publishes
	// the request, so the reply cannot be missed.
	onConnect := func(client mqtt.Client) {
		token := client.Subscribe(replyTopic, 2, onReply)
		if token.Wait(); token.Error() != nil {
			report(token.Error())
			return
		}

		sendJson, err := json.Marshal(map[string]any{
			"time": time.Now().Format(time.DateTime),
		})
		if err != nil {
			report(err)
			return
		}

		token = client.Publish(requestTopic, 2, false, sendJson)
		if token.Wait(); token.Error() != nil {
			report(token.Error())
		}
	}

	var wg sync.WaitGroup
	wg.Add(1)

	go func() {
		// Registered first so it runs last: the client is disconnected before
		// the test is released, and every return path releases the test.
		defer wg.Done()

		mqttClient := mqtt.NewClient(mqtt.NewClientOptions().
			SetAutoReconnect(true).
			SetUsername("admin").
			SetPassword("mtyzxhc").
			AddBroker("tcp://127.0.0.1:1883").
			SetClientID("test-client").
			SetKeepAlive(5 * time.Second).
			SetPingTimeout(5 * time.Second).
			SetWill("test-client/will", "dead", 2, true).
			SetOnConnectHandler(onConnect))

		token := mqttClient.Connect()
		if token.Wait(); token.Error() != nil {
			t.Error(token.Error())
			return
		}
		defer mqttClient.Disconnect(250)

		// t.Error is called only here, before wg.Done, so nothing is logged
		// after the test has completed.
		select {
		case res := <-outcome:
			if res != nil {
				t.Error(res)
			}
		case <-time.After(replyTimeout):
			t.Errorf("no reply received on %q within %v", replyTopic, replyTimeout)
		}
	}()

	wg.Wait()
}