package api_binding import ( "encoding/json" "fmt" "git.sxidc.com/go-tools/api_binding/mqtt_binding" "git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client" "git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client/router" "git.sxidc.com/go-tools/api_binding/mqtt_binding/request" "git.sxidc.com/go-tools/api_binding/mqtt_binding/response" mqtt "github.com/eclipse/paho.mqtt.golang" "testing" "time" ) type ResponseIdentifier struct { ResponseIdentifier string `json:"responseIdentifier" validate:"required"` } func (responseIdentifier *ResponseIdentifier) Identifier() string { return responseIdentifier.ResponseIdentifier } func (responseIdentifier *ResponseIdentifier) Copy() response.Identifier { return &ResponseIdentifier{} } var bindingResponseIdentifier = &request.CallerIdentifierRequest{CallerIdentifier: "binding"} var itemResponseIdentifier = &ResponseIdentifier{ResponseIdentifier: "item"} //var itemResponseIdentifier *ResponseIdentifier func TestMqttBinding(t *testing.T) { err := mqtt_binding.Init("test_prefix", &mqtt_client.MqttClientOptions{ UserName: "admin", Password: "mtyzxhc", Address: "tcp://127.0.0.1:1883", ClientID: "test-binding", KeepAliveSec: 5, PingTimeoutSec: 5, }) if err != nil { t.Fatal(err) } defer mqtt_binding.Destroy() testBinding := mqtt_binding.NewBinding("test", bindingResponseIdentifier, func(item *router.Item, data []byte) { fmt.Println("Global Middleware!!!") item.Next() }) mqtt_binding.Bind(testBinding, &mqtt_binding.BindItem[struct { Time string `json:"time"` }, map[string]interface{}]{ Topic: "/test-topic", ResponseIdentifier: &ResponseIdentifier{}, ResponseFunc: response.SendMapResponse, BusinessFunc: func(c *mqtt_client.MqttClient, inputModel struct { Time string `json:"time"` }) (map[string]interface{}, error) { fmt.Printf("Received: %v\n", inputModel) return map[string]interface{}{ "result": "pong", }, nil }, }, func(item *router.Item, data []byte) { fmt.Println("Binding Middleware!!!") item.Next() }) waitChan := make(chan []byte) mqttClient := mqtt.NewClient(mqtt.NewClientOptions(). SetAutoReconnect(true). SetUsername("admin"). SetPassword("mtyzxhc"). AddBroker("tcp://127.0.0.1:1883"). SetClientID("test-client"). SetKeepAlive(5*time.Second). SetPingTimeout(5*time.Second). SetWill("test-client/will", "dead", 2, true). SetOnConnectHandler(func(client mqtt.Client) { replyTopic := "test_prefix/test/test-topic/reply" if itemResponseIdentifier != nil { replyTopic = "test_prefix/test/test-topic/" + itemResponseIdentifier.Identifier() + "/reply" } else if bindingResponseIdentifier != nil { replyTopic = "test_prefix/test/test-topic/" + bindingResponseIdentifier.Identifier() + "/reply" } fmt.Println("Reply Topic: " + replyTopic) token := client.Subscribe(replyTopic, 2, func(client mqtt.Client, message mqtt.Message) { waitChan <- message.Payload() }) if token.Wait(); token.Error() != nil { fmt.Println(token.Error()) return } sendMap := map[string]any{ "time": time.Now().Format(time.DateTime), } if itemResponseIdentifier != nil { sendMap["responseIdentifier"] = itemResponseIdentifier.Identifier() } else if bindingResponseIdentifier != nil { sendMap["callerIdentifier"] = bindingResponseIdentifier.Identifier() } sendJson, err := json.Marshal(sendMap) if token.Wait(); token.Error() != nil { fmt.Println(err) return } token = client.Publish("test_prefix/test/test-topic", 2, false, sendJson) if token.Wait(); token.Error() != nil { fmt.Println(token.Error()) return } })) token := mqttClient.Connect() if token.Wait(); token.Error() != nil { fmt.Println(token.Error()) return } defer mqttClient.Disconnect(250) data := <-waitChan respMap := make(map[string]interface{}) err = json.Unmarshal(data, &respMap) if err != nil { fmt.Println(err) return } pong, ok := respMap["result"] if !ok { fmt.Println("响应错误") return } if pong != "pong" { fmt.Println("响应错误") return } fmt.Println(respMap) }