mqtt_binding_test.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. package api_binding
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "git.sxidc.com/go-tools/api_binding/mqtt_binding"
  6. "git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client"
  7. "git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client/router"
  8. "git.sxidc.com/go-tools/api_binding/mqtt_binding/request"
  9. "git.sxidc.com/go-tools/api_binding/mqtt_binding/response"
  10. mqtt "github.com/eclipse/paho.mqtt.golang"
  11. "testing"
  12. "time"
  13. )
  // ResponseIdentifier is the item-level reply identifier used by this test.
  // Its only field maps to the "responseIdentifier" JSON key and carries a
  // `validate:"required"` tag.
  type ResponseIdentifier struct {
  	ResponseIdentifier string `json:"responseIdentifier" validate:"required"`
  }
  17. func (responseIdentifier *ResponseIdentifier) Identifier() string {
  18. return responseIdentifier.ResponseIdentifier
  19. }
  20. func (responseIdentifier *ResponseIdentifier) Copy() response.Identifier {
  21. return &ResponseIdentifier{}
  22. }
  23. var bindingResponseIdentifier = &request.CallerIdentifierRequest{CallerIdentifier: "binding"}
  24. var itemResponseIdentifier = &ResponseIdentifier{ResponseIdentifier: "item"}
  25. //var itemResponseIdentifier *ResponseIdentifier
  26. func TestMqttBinding(t *testing.T) {
  27. err := mqtt_binding.Init("test_prefix", &mqtt_client.MqttClientOptions{
  28. UserName: "admin",
  29. Password: "mtyzxhc",
  30. Address: "tcp://127.0.0.1:1883",
  31. ClientID: "test-binding",
  32. KeepAliveSec: 5,
  33. PingTimeoutSec: 5,
  34. })
  35. if err != nil {
  36. t.Fatal(err)
  37. }
  38. defer mqtt_binding.Destroy()
  39. testBinding := mqtt_binding.NewBinding("test", bindingResponseIdentifier, func(item *router.Item, data []byte) {
  40. fmt.Println("Global Middleware!!!")
  41. item.Next()
  42. })
  43. mqtt_binding.Bind(testBinding, &mqtt_binding.BindItem[struct {
  44. Time string `json:"time"`
  45. }, map[string]interface{}]{
  46. Topic: "/test-topic",
  47. ResponseIdentifier: &ResponseIdentifier{},
  48. ResponseFunc: response.SendMapResponse,
  49. BusinessFunc: func(c *mqtt_client.MqttClient, inputModel struct {
  50. Time string `json:"time"`
  51. }) (map[string]interface{}, error) {
  52. fmt.Printf("Received: %v\n", inputModel)
  53. return map[string]interface{}{
  54. "result": "pong",
  55. }, nil
  56. },
  57. }, func(item *router.Item, data []byte) {
  58. fmt.Println("Binding Middleware!!!")
  59. item.Next()
  60. })
  61. waitChan := make(chan []byte)
  62. mqttClient := mqtt.NewClient(mqtt.NewClientOptions().
  63. SetAutoReconnect(true).
  64. SetUsername("admin").
  65. SetPassword("mtyzxhc").
  66. AddBroker("tcp://127.0.0.1:1883").
  67. SetClientID("test-client").
  68. SetKeepAlive(5*time.Second).
  69. SetPingTimeout(5*time.Second).
  70. SetWill("test-client/will", "dead", 2, true).
  71. SetOnConnectHandler(func(client mqtt.Client) {
  72. replyTopic := "test_prefix/test/test-topic/reply"
  73. if itemResponseIdentifier != nil {
  74. replyTopic = "test_prefix/test/test-topic/" + itemResponseIdentifier.Identifier() + "/reply"
  75. } else if bindingResponseIdentifier != nil {
  76. replyTopic = "test_prefix/test/test-topic/" + bindingResponseIdentifier.Identifier() + "/reply"
  77. }
  78. fmt.Println("Reply Topic: " + replyTopic)
  79. token := client.Subscribe(replyTopic, 2, func(client mqtt.Client, message mqtt.Message) {
  80. waitChan <- message.Payload()
  81. })
  82. if token.Wait(); token.Error() != nil {
  83. fmt.Println(token.Error())
  84. return
  85. }
  86. sendMap := map[string]any{
  87. "time": time.Now().Format(time.DateTime),
  88. }
  89. if itemResponseIdentifier != nil {
  90. sendMap["responseIdentifier"] = itemResponseIdentifier.Identifier()
  91. } else if bindingResponseIdentifier != nil {
  92. sendMap["callerIdentifier"] = bindingResponseIdentifier.Identifier()
  93. }
  94. sendJson, err := json.Marshal(sendMap)
  95. if token.Wait(); token.Error() != nil {
  96. fmt.Println(err)
  97. return
  98. }
  99. token = client.Publish("test_prefix/test/test-topic", 2, false, sendJson)
  100. if token.Wait(); token.Error() != nil {
  101. fmt.Println(token.Error())
  102. return
  103. }
  104. }))
  105. token := mqttClient.Connect()
  106. if token.Wait(); token.Error() != nil {
  107. fmt.Println(token.Error())
  108. return
  109. }
  110. defer mqttClient.Disconnect(250)
  111. data := <-waitChan
  112. respMap := make(map[string]interface{})
  113. err = json.Unmarshal(data, &respMap)
  114. if err != nil {
  115. fmt.Println(err)
  116. return
  117. }
  118. pong, ok := respMap["result"]
  119. if !ok {
  120. fmt.Println("响应错误")
  121. return
  122. }
  123. if pong != "pong" {
  124. fmt.Println("响应错误")
  125. return
  126. }
  127. fmt.Println(respMap)
  128. }