mqtt_binding_test.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. package api_binding
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "git.sxidc.com/go-tools/api_binding/mqtt_binding"
  6. "git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client"
  7. "git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client/router"
  8. "git.sxidc.com/go-tools/api_binding/mqtt_binding/response"
  9. mqtt "github.com/eclipse/paho.mqtt.golang"
  10. "testing"
  11. "time"
  12. )
  13. func TestMqttBinding(t *testing.T) {
  14. err := mqtt_binding.Init("test_prefix", &mqtt_client.MqttClientOptions{
  15. UserName: "admin",
  16. Password: "mtyzxhc",
  17. Address: "tcp://127.0.0.1:1883",
  18. ClientID: "test-binding",
  19. KeepAliveSec: 5,
  20. PingTimeoutSec: 5,
  21. })
  22. if err != nil {
  23. t.Fatal(err)
  24. }
  25. defer mqtt_binding.Destroy()
  26. testBinding := mqtt_binding.NewBinding("test", func(item *router.Item, data []byte) {
  27. fmt.Println("Global Middleware!!!")
  28. item.Next()
  29. })
  30. mqtt_binding.Bind(testBinding, &mqtt_binding.BindItem[struct {
  31. Time string `json:"time"`
  32. }, map[string]interface{}]{
  33. Topic: "/test-topic",
  34. ResponseFunc: response.SendMapResponse,
  35. BusinessFunc: func(c *mqtt_client.MqttClient, inputModel struct {
  36. Time string `json:"time"`
  37. }) (map[string]interface{}, error) {
  38. fmt.Printf("Received: %v\n", inputModel)
  39. return map[string]interface{}{
  40. "result": "pong",
  41. }, nil
  42. },
  43. OptionalBindingFunc: nil,
  44. }, func(item *router.Item, data []byte) {
  45. fmt.Println("Binding Middleware!!!")
  46. item.Next()
  47. })
  48. waitChan := make(chan []byte)
  49. mqttClient := mqtt.NewClient(mqtt.NewClientOptions().
  50. SetAutoReconnect(true).
  51. SetUsername("admin").
  52. SetPassword("mtyzxhc").
  53. AddBroker("tcp://127.0.0.1:1883").
  54. SetClientID("test-client").
  55. SetKeepAlive(5*time.Second).
  56. SetPingTimeout(5*time.Second).
  57. SetWill("test-client/will", "dead", 2, true).
  58. SetOnConnectHandler(func(client mqtt.Client) {
  59. token := client.Subscribe("test_prefix/test/test-topic/reply", 2, func(client mqtt.Client, message mqtt.Message) {
  60. waitChan <- message.Payload()
  61. })
  62. if token.Wait(); token.Error() != nil {
  63. fmt.Println(token.Error())
  64. return
  65. }
  66. sendMap := map[string]any{
  67. "time": time.Now().Format(time.DateTime),
  68. }
  69. sendJson, err := json.Marshal(sendMap)
  70. if token.Wait(); token.Error() != nil {
  71. fmt.Println(err)
  72. return
  73. }
  74. token = client.Publish("test_prefix/test/test-topic", 2, false, sendJson)
  75. if token.Wait(); token.Error() != nil {
  76. fmt.Println(token.Error())
  77. return
  78. }
  79. }))
  80. token := mqttClient.Connect()
  81. if token.Wait(); token.Error() != nil {
  82. fmt.Println(token.Error())
  83. return
  84. }
  85. defer mqttClient.Disconnect(250)
  86. data := <-waitChan
  87. respMap := make(map[string]interface{})
  88. err = json.Unmarshal(data, &respMap)
  89. if err != nil {
  90. fmt.Println(err)
  91. return
  92. }
  93. pong, ok := respMap["result"]
  94. if !ok {
  95. fmt.Println("响应错误")
  96. return
  97. }
  98. if pong != "pong" {
  99. fmt.Println("响应错误")
  100. return
  101. }
  102. fmt.Println(respMap)
  103. }