mqtt_binding_test.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. package api_binding
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "git.sxidc.com/go-tools/api_binding/mqtt_binding"
  6. "git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client"
  7. "git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client/router"
  8. "git.sxidc.com/go-tools/api_binding/mqtt_binding/response"
  9. mqtt "github.com/eclipse/paho.mqtt.golang"
  10. "testing"
  11. "time"
  12. )
  13. type CallerIdentifier struct {
  14. CallerIdentifier string `json:"callerIdentifier" validate:"required"`
  15. }
  16. func (callerIdentifier *CallerIdentifier) Identifier() string {
  17. return callerIdentifier.CallerIdentifier
  18. }
  19. var bindingCallerIdentifier = &CallerIdentifier{CallerIdentifier: "binding"}
  20. var itemCallerIdentifier = &CallerIdentifier{CallerIdentifier: "item"}
  21. //var bindingCallerIdentifier = nil
  22. //var itemCallerIdentifier = nil
  23. func TestMqttBinding(t *testing.T) {
  24. err := mqtt_binding.Init("test_prefix", &mqtt_client.MqttClientOptions{
  25. UserName: "admin",
  26. Password: "mtyzxhc",
  27. Address: "tcp://127.0.0.1:1883",
  28. ClientID: "test-binding",
  29. KeepAliveSec: 5,
  30. PingTimeoutSec: 5,
  31. })
  32. if err != nil {
  33. t.Fatal(err)
  34. }
  35. defer mqtt_binding.Destroy()
  36. testBinding := mqtt_binding.NewBinding("test", bindingCallerIdentifier, func(item *router.Item, data []byte) {
  37. fmt.Println("Global Middleware!!!")
  38. item.Next()
  39. })
  40. mqtt_binding.Bind(testBinding, &mqtt_binding.BindItem[struct {
  41. Time string `json:"time"`
  42. }, map[string]interface{}]{
  43. Topic: "/test-topic",
  44. ResponseIdentifier: itemCallerIdentifier,
  45. ResponseFunc: response.SendMapResponse,
  46. BusinessFunc: func(c *mqtt_client.MqttClient, inputModel struct {
  47. Time string `json:"time"`
  48. }) (map[string]interface{}, error) {
  49. panic("11111111")
  50. fmt.Printf("Received: %v\n", inputModel)
  51. return map[string]interface{}{
  52. "result": "pong",
  53. }, nil
  54. },
  55. }, func(item *router.Item, data []byte) {
  56. fmt.Println("Binding Middleware!!!")
  57. item.Next()
  58. })
  59. waitChan := make(chan []byte)
  60. mqttClient := mqtt.NewClient(mqtt.NewClientOptions().
  61. SetAutoReconnect(true).
  62. SetUsername("admin").
  63. SetPassword("mtyzxhc").
  64. AddBroker("tcp://127.0.0.1:1883").
  65. SetClientID("test-client").
  66. SetKeepAlive(5*time.Second).
  67. SetPingTimeout(5*time.Second).
  68. SetWill("test-client/will", "dead", 2, true).
  69. SetOnConnectHandler(func(client mqtt.Client) {
  70. replyTopic := "test_prefix/test/test-topic/reply"
  71. if itemCallerIdentifier != nil {
  72. replyTopic = "test_prefix/test/test-topic/" + itemCallerIdentifier.Identifier() + "/reply"
  73. } else if bindingCallerIdentifier != nil {
  74. replyTopic = "test_prefix/test/test-topic/" + bindingCallerIdentifier.Identifier() + "/reply"
  75. }
  76. token := client.Subscribe(replyTopic, 2, func(client mqtt.Client, message mqtt.Message) {
  77. waitChan <- message.Payload()
  78. })
  79. if token.Wait(); token.Error() != nil {
  80. fmt.Println(token.Error())
  81. return
  82. }
  83. sendMap := map[string]any{
  84. "callerIdentifier": "test",
  85. "time": time.Now().Format(time.DateTime),
  86. }
  87. if itemCallerIdentifier.Identifier() != "" {
  88. sendMap["callerIdentifier"] = itemCallerIdentifier.Identifier()
  89. } else if bindingCallerIdentifier.Identifier() != "" {
  90. sendMap["callerIdentifier"] = bindingCallerIdentifier.Identifier()
  91. }
  92. sendJson, err := json.Marshal(sendMap)
  93. if token.Wait(); token.Error() != nil {
  94. fmt.Println(err)
  95. return
  96. }
  97. token = client.Publish("test_prefix/test/test-topic", 2, false, sendJson)
  98. if token.Wait(); token.Error() != nil {
  99. fmt.Println(token.Error())
  100. return
  101. }
  102. }))
  103. token := mqttClient.Connect()
  104. if token.Wait(); token.Error() != nil {
  105. fmt.Println(token.Error())
  106. return
  107. }
  108. defer mqttClient.Disconnect(250)
  109. data := <-waitChan
  110. respMap := make(map[string]interface{})
  111. err = json.Unmarshal(data, &respMap)
  112. if err != nil {
  113. fmt.Println(err)
  114. return
  115. }
  116. pong, ok := respMap["result"]
  117. if !ok {
  118. fmt.Println("响应错误")
  119. return
  120. }
  121. if pong != "pong" {
  122. fmt.Println("响应错误")
  123. return
  124. }
  125. fmt.Println(respMap)
  126. }