Bladeren bron

完成mqtt_binding封装

yjp 2 jaren geleden
bovenliggende
commit
156de14615

+ 5 - 1
mqtt_binding/mqtt_binding.go

@@ -29,6 +29,10 @@ func NewBinding(apiVersion string, handlers ...router.Handler) *Binding {
 	return &Binding{router: r}
 }
 
+func Bind[I any, O any](b *Binding, item *BindItem[I, O], handlers ...router.Handler) {
+	item.bind(b.router, handlers...)
+}
+
 // BindItem 路由条目结构
 type BindItem[I any, O any] struct {
 	Topic               string               // 请求路径
@@ -89,7 +93,7 @@ func (item *BindItem[I, O]) bind(r *router.Router, handlers ...router.Handler) {
 	})
 
 	// 所有的函数加入到执行链中
-	routerItem, err := router.NewItem(item.Topic, item.Qos, item.Retained)
+	routerItem, err := router.NewItem(r.Group+item.Topic, item.Qos, item.Retained, handlers)
 	if err != nil {
 		panic("创建路由条目失败: " + err.Error())
 		return

+ 2 - 4
mqtt_binding/mqtt_client/mqtt_client.go

@@ -97,8 +97,7 @@ func (c *MqttClient) Connect() error {
 
 		err := c.rangeRouters(func(r *router.Router) error {
 			err := r.RangeItem(func(item router.Item) error {
-				topic := r.Group + item.Topic
-				token := c.client.Subscribe(topic, item.Qos, func(client mqtt.Client, message mqtt.Message) {
+				token := c.client.Subscribe(item.Topic, item.Qos, func(client mqtt.Client, message mqtt.Message) {
 					item.CallHandlers(message.Payload())
 				})
 				if token.Wait(); token.Error() != nil {
@@ -141,8 +140,7 @@ func (c *MqttClient) GetRouter(group string, handlers []router.Handler) *router.
 			time.Sleep(1 * time.Second)
 		}
 
-		topic := group + item.Topic
-		token := c.client.Subscribe(topic, item.Qos, func(client mqtt.Client, message mqtt.Message) {
+		token := c.client.Subscribe(item.Topic, item.Qos, func(client mqtt.Client, message mqtt.Message) {
 			item.CallHandlers(message.Payload())
 		})
 		if token.Wait(); token.Error() != nil {

+ 2 - 2
mqtt_binding/mqtt_client/router/router.go

@@ -93,7 +93,7 @@ type Item struct {
 	currentData         []byte
 }
 
-func NewItem(topic string, qos byte, responseRetained bool) (*Item, error) {
+func NewItem(topic string, qos byte, responseRetained bool, handlers []Handler) (*Item, error) {
 	if utils.IsStringEmpty(topic) {
 		return nil, errors.New("没有传递主题")
 	}
@@ -102,7 +102,7 @@ func NewItem(topic string, qos byte, responseRetained bool) (*Item, error) {
 		Topic:               topic,
 		Qos:                 qos,
 		ResponseRetained:    responseRetained,
-		handlers:            make([]Handler, 0),
+		handlers:            handlers,
 		currentHandlerIndex: 0,
 		currentData:         make([]byte, 0),
 	}, nil

+ 6 - 0
mqtt_binding/mqtt_init.go

@@ -12,6 +12,11 @@ func Init(opts *mqtt_client.MqttClientOptions) error {
 		return err
 	}
 
+	err = mqttClient.Connect()
+	if err != nil {
+		return err
+	}
+
 	mqttClientInstance = mqttClient
 
 	return nil
@@ -19,6 +24,7 @@ func Init(opts *mqtt_client.MqttClientOptions) error {
 
 func Destroy() {
 	if mqttClientInstance != nil {
+		mqttClientInstance.Disconnect()
 		mqtt_client.DestroyMqttClient(mqttClientInstance)
 		mqttClientInstance = nil
 	}

+ 103 - 0
mqtt_binding_test.go

@@ -0,0 +1,103 @@
+package api_binding
+
+import (
+	"encoding/json"
+	"fmt"
+	"git.sxidc.com/go-tools/api_binding/mqtt_binding"
+	"git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client"
+	"git.sxidc.com/go-tools/api_binding/mqtt_binding/response"
+	mqtt "github.com/eclipse/paho.mqtt.golang"
+	"sync"
+	"testing"
+	"time"
+)
+
+func TestMqttBinding(t *testing.T) {
+	err := mqtt_binding.Init(&mqtt_client.MqttClientOptions{
+		UserName:       "admin",
+		Password:       "mtyzxhc",
+		Address:        "tcp://127.0.0.1:1883",
+		ClientID:       "test-binding",
+		KeepAliveSec:   5,
+		PingTimeoutSec: 5,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	defer mqtt_binding.Destroy()
+
+	testBinding := mqtt_binding.NewBinding("test")
+	mqtt_binding.Bind(testBinding, &mqtt_binding.BindItem[any, map[string]interface{}]{
+		Topic:        "/test-topic",
+		Qos:          2,
+		Retained:     true,
+		ResponseFunc: response.SendMapResponse,
+		BusinessFunc: func(c *mqtt_client.MqttClient, inputModel any) (map[string]interface{}, error) {
+			return map[string]interface{}{
+				"result": "pong",
+			}, nil
+		},
+		OptionalBindingFunc: nil,
+	})
+
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+
+	go func() {
+		mqttClient := mqtt.NewClient(mqtt.NewClientOptions().
+			SetAutoReconnect(true).
+			SetUsername("admin").
+			SetPassword("mtyzxhc").
+			AddBroker("tcp://127.0.0.1:1883").
+			SetClientID("test-client").
+			SetKeepAlive(5).
+			SetPingTimeout(5).
+			SetWill("test-client/will", "dead", 2, true).
+			SetOnConnectHandler(func(client mqtt.Client) {
+				token := client.Subscribe("test/test-topic/reply", 2, func(client mqtt.Client, message mqtt.Message) {
+					respMap := make(map[string]interface{})
+					err = json.Unmarshal(message.Payload(), &respMap)
+					if err != nil {
+						fmt.Println(err)
+						return
+					}
+
+					pong, ok := respMap["result"]
+					if !ok {
+						fmt.Println("响应错误")
+						return
+					}
+
+					if pong != "pong" {
+						fmt.Println("响应错误")
+						return
+					}
+				})
+				if token.Wait(); token.Error() != nil {
+					fmt.Println(token.Error())
+					return
+				}
+
+				token = client.Publish("test/test-topic", 2, true, "test")
+				if token.Wait(); token.Error() != nil {
+					fmt.Println(token.Error())
+					return
+				}
+			}))
+
+		token := mqttClient.Connect()
+		if token.Wait(); token.Error() != nil {
+			fmt.Println(token.Error())
+			return
+		}
+
+		defer mqttClient.Disconnect(10000)
+
+		time.Sleep(2 * time.Second)
+
+		wg.Done()
+	}()
+
+	wg.Wait()
+}