123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- package api_binding
- import (
- "encoding/json"
- "fmt"
- "git.sxidc.com/go-tools/api_binding/mqtt_binding"
- "git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client"
- "git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client/router"
- "git.sxidc.com/go-tools/api_binding/mqtt_binding/request"
- "git.sxidc.com/go-tools/api_binding/mqtt_binding/response"
- mqtt "github.com/eclipse/paho.mqtt.golang"
- "testing"
- "time"
- )
- type ResponseIdentifier struct {
- ResponseIdentifier string `json:"responseIdentifier" validate:"required"`
- }
- func (responseIdentifier *ResponseIdentifier) Identifier() string {
- return responseIdentifier.ResponseIdentifier
- }
- func (responseIdentifier *ResponseIdentifier) Copy() response.Identifier {
- return &ResponseIdentifier{}
- }
- var bindingResponseIdentifier = &request.CallerIdentifierRequest{CallerIdentifier: "binding"}
- var itemResponseIdentifier = &ResponseIdentifier{ResponseIdentifier: "item"}
- //var itemResponseIdentifier *ResponseIdentifier
- func TestMqttBinding(t *testing.T) {
- err := mqtt_binding.Init("test_prefix", &mqtt_client.MqttClientOptions{
- UserName: "admin",
- Password: "mtyzxhc",
- Address: "tcp://127.0.0.1:1883",
- ClientID: "test-binding",
- KeepAliveSec: 5,
- PingTimeoutSec: 5,
- })
- if err != nil {
- t.Fatal(err)
- }
- defer mqtt_binding.Destroy()
- testBinding := mqtt_binding.NewBinding("test", bindingResponseIdentifier, func(item *router.Item, data []byte) {
- fmt.Println("Global Middleware!!!")
- item.Next()
- })
- mqtt_binding.Bind(testBinding, &mqtt_binding.BindItem[struct {
- Time string `json:"time"`
- }, map[string]interface{}]{
- Topic: "/test-topic",
- ResponseIdentifier: &ResponseIdentifier{},
- ResponseFunc: response.SendMapResponse,
- BusinessFunc: func(c *mqtt_client.MqttClient, inputModel struct {
- Time string `json:"time"`
- }) (map[string]interface{}, error) {
- fmt.Printf("Received: %v\n", inputModel)
- return map[string]interface{}{
- "result": "pong",
- }, nil
- },
- }, func(item *router.Item, data []byte) {
- fmt.Println("Binding Middleware!!!")
- item.Next()
- })
- waitChan := make(chan []byte)
- mqttClient := mqtt.NewClient(mqtt.NewClientOptions().
- SetAutoReconnect(true).
- SetUsername("admin").
- SetPassword("mtyzxhc").
- AddBroker("tcp://127.0.0.1:1883").
- SetClientID("test-client").
- SetKeepAlive(5*time.Second).
- SetPingTimeout(5*time.Second).
- SetWill("test-client/will", "dead", 2, true).
- SetOnConnectHandler(func(client mqtt.Client) {
- replyTopic := "test_prefix/test/test-topic/reply"
- if itemResponseIdentifier != nil {
- replyTopic = "test_prefix/test/test-topic/" + itemResponseIdentifier.Identifier() + "/reply"
- } else if bindingResponseIdentifier != nil {
- replyTopic = "test_prefix/test/test-topic/" + bindingResponseIdentifier.Identifier() + "/reply"
- }
- fmt.Println("Reply Topic: " + replyTopic)
- token := client.Subscribe(replyTopic, 2, func(client mqtt.Client, message mqtt.Message) {
- waitChan <- message.Payload()
- })
- if token.Wait(); token.Error() != nil {
- fmt.Println(token.Error())
- return
- }
- sendMap := map[string]any{
- "time": time.Now().Format(time.DateTime),
- }
- if itemResponseIdentifier != nil {
- sendMap["responseIdentifier"] = itemResponseIdentifier.Identifier()
- } else if bindingResponseIdentifier != nil {
- sendMap["callerIdentifier"] = bindingResponseIdentifier.Identifier()
- }
- sendJson, err := json.Marshal(sendMap)
- if token.Wait(); token.Error() != nil {
- fmt.Println(err)
- return
- }
- token = client.Publish("test_prefix/test/test-topic", 2, false, sendJson)
- if token.Wait(); token.Error() != nil {
- fmt.Println(token.Error())
- return
- }
- }))
- token := mqttClient.Connect()
- if token.Wait(); token.Error() != nil {
- fmt.Println(token.Error())
- return
- }
- defer mqttClient.Disconnect(250)
- data := <-waitChan
- respMap := make(map[string]interface{})
- err = json.Unmarshal(data, &respMap)
- if err != nil {
- fmt.Println(err)
- return
- }
- pong, ok := respMap["result"]
- if !ok {
- fmt.Println("响应错误")
- return
- }
- if pong != "pong" {
- fmt.Println("响应错误")
- return
- }
- fmt.Println(respMap)
- }
|