mqtt_binding_test.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. package api_binding
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "git.sxidc.com/go-tools/api_binding/mqtt_binding"
  6. "git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client"
  7. "git.sxidc.com/go-tools/api_binding/mqtt_binding/response"
  8. mqtt "github.com/eclipse/paho.mqtt.golang"
  9. "sync"
  10. "testing"
  11. "time"
  12. )
  13. func TestMqttBinding(t *testing.T) {
  14. err := mqtt_binding.Init(&mqtt_client.MqttClientOptions{
  15. UserName: "admin",
  16. Password: "mtyzxhc",
  17. Address: "tcp://127.0.0.1:1883",
  18. ClientID: "test-binding",
  19. KeepAliveSec: 5,
  20. PingTimeoutSec: 5,
  21. })
  22. if err != nil {
  23. t.Fatal(err)
  24. }
  25. defer mqtt_binding.Destroy()
  26. testBinding := mqtt_binding.NewBinding("test")
  27. mqtt_binding.Bind(testBinding, &mqtt_binding.BindItem[any, map[string]interface{}]{
  28. Topic: "/test-topic",
  29. Qos: 2,
  30. Retained: true,
  31. ResponseFunc: response.SendMapResponse,
  32. BusinessFunc: func(c *mqtt_client.MqttClient, inputModel any) (map[string]interface{}, error) {
  33. return map[string]interface{}{
  34. "result": "pong",
  35. }, nil
  36. },
  37. OptionalBindingFunc: nil,
  38. })
  39. wg := sync.WaitGroup{}
  40. wg.Add(1)
  41. go func() {
  42. mqttClient := mqtt.NewClient(mqtt.NewClientOptions().
  43. SetAutoReconnect(true).
  44. SetUsername("admin").
  45. SetPassword("mtyzxhc").
  46. AddBroker("tcp://127.0.0.1:1883").
  47. SetClientID("test-client").
  48. SetKeepAlive(5).
  49. SetPingTimeout(5).
  50. SetWill("test-client/will", "dead", 2, true).
  51. SetOnConnectHandler(func(client mqtt.Client) {
  52. token := client.Subscribe("test/test-topic/reply", 2, func(client mqtt.Client, message mqtt.Message) {
  53. respMap := make(map[string]interface{})
  54. err = json.Unmarshal(message.Payload(), &respMap)
  55. if err != nil {
  56. fmt.Println(err)
  57. return
  58. }
  59. pong, ok := respMap["result"]
  60. if !ok {
  61. fmt.Println("响应错误")
  62. return
  63. }
  64. if pong != "pong" {
  65. fmt.Println("响应错误")
  66. return
  67. }
  68. })
  69. if token.Wait(); token.Error() != nil {
  70. fmt.Println(token.Error())
  71. return
  72. }
  73. token = client.Publish("test/test-topic", 2, true, "test")
  74. if token.Wait(); token.Error() != nil {
  75. fmt.Println(token.Error())
  76. return
  77. }
  78. }))
  79. token := mqttClient.Connect()
  80. if token.Wait(); token.Error() != nil {
  81. fmt.Println(token.Error())
  82. return
  83. }
  84. defer mqttClient.Disconnect(10000)
  85. time.Sleep(2 * time.Second)
  86. wg.Done()
  87. }()
  88. wg.Wait()
  89. }