mqtt_binding_test.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. package api_binding
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "git.sxidc.com/go-tools/api_binding/mqtt_binding"
  6. "git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client"
  7. "git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client/router"
  8. "git.sxidc.com/go-tools/api_binding/mqtt_binding/response"
  9. mqtt "github.com/eclipse/paho.mqtt.golang"
  10. "sync"
  11. "testing"
  12. "time"
  13. )
  14. func TestMqttBinding(t *testing.T) {
  15. err := mqtt_binding.Init(&mqtt_client.MqttClientOptions{
  16. UserName: "admin",
  17. Password: "mtyzxhc",
  18. Address: "tcp://127.0.0.1:1883",
  19. ClientID: "test-binding",
  20. KeepAliveSec: 5,
  21. PingTimeoutSec: 5,
  22. })
  23. if err != nil {
  24. t.Fatal(err)
  25. }
  26. defer mqtt_binding.Destroy()
  27. testBinding := mqtt_binding.NewBinding("test", func(item *router.Item, data []byte) {
  28. fmt.Println("Global Middleware!!!")
  29. item.Next()
  30. })
  31. mqtt_binding.Bind(testBinding, &mqtt_binding.BindItem[struct {
  32. Time string `json:"time"`
  33. }, map[string]interface{}]{
  34. Topic: "/test-topic",
  35. Qos: 2,
  36. Retained: true,
  37. ResponseFunc: response.SendMapResponse,
  38. BusinessFunc: func(c *mqtt_client.MqttClient, inputModel struct {
  39. Time string `json:"time"`
  40. }) (map[string]interface{}, error) {
  41. fmt.Printf("Received: %v\n", inputModel)
  42. return map[string]interface{}{
  43. "result": "pong",
  44. }, nil
  45. },
  46. OptionalBindingFunc: nil,
  47. }, func(item *router.Item, data []byte) {
  48. fmt.Println("Binding Middleware!!!")
  49. item.Next()
  50. })
  51. time.Sleep(10 * time.Minute)
  52. wg := sync.WaitGroup{}
  53. wg.Add(1)
  54. go func() {
  55. mqttClient := mqtt.NewClient(mqtt.NewClientOptions().
  56. SetAutoReconnect(true).
  57. SetUsername("admin").
  58. SetPassword("mtyzxhc").
  59. AddBroker("tcp://127.0.0.1:1883").
  60. SetClientID("test-client").
  61. SetKeepAlive(5).
  62. SetPingTimeout(5).
  63. SetWill("test-client/will", "dead", 2, true).
  64. SetOnConnectHandler(func(client mqtt.Client) {
  65. token := client.Subscribe("test/test-topic/reply", 2, func(client mqtt.Client, message mqtt.Message) {
  66. respMap := make(map[string]interface{})
  67. err = json.Unmarshal(message.Payload(), &respMap)
  68. if err != nil {
  69. fmt.Println(err)
  70. return
  71. }
  72. pong, ok := respMap["result"]
  73. if !ok {
  74. fmt.Println("响应错误")
  75. return
  76. }
  77. if pong != "pong" {
  78. fmt.Println("响应错误")
  79. return
  80. }
  81. })
  82. if token.Wait(); token.Error() != nil {
  83. fmt.Println(token.Error())
  84. return
  85. }
  86. sendMap := map[string]any{
  87. "time": time.Now().Format(time.DateTime),
  88. }
  89. sendJson, err := json.Marshal(sendMap)
  90. if token.Wait(); token.Error() != nil {
  91. fmt.Println(err)
  92. return
  93. }
  94. token = client.Publish("test/test-topic", 2, false, sendJson)
  95. if token.Wait(); token.Error() != nil {
  96. fmt.Println(token.Error())
  97. return
  98. }
  99. }))
  100. token := mqttClient.Connect()
  101. if token.Wait(); token.Error() != nil {
  102. fmt.Println(token.Error())
  103. return
  104. }
  105. defer mqttClient.Disconnect(10000)
  106. time.Sleep(2 * time.Second)
  107. wg.Done()
  108. }()
  109. wg.Wait()
  110. }