Browse Source

修改响应标识符实现

yjp 1 year ago
parent
commit
2070624d0e

+ 19 - 7
mqtt_binding/mqtt_binding.go

@@ -14,11 +14,12 @@ type BusinessFunc[I any, O any] func(c *mqtt_client.MqttClient, inputModel I) (O
 type BindingFunc[O any] func(c *mqtt_client.MqttClient, item *router.Item, request any, sendFunc response.SendFunc[O]) bool
 
 type Binding struct {
-	router *router.Router
+	responseIdentifier response.Identifier
+	router             *router.Router
 }
 
 // NewBinding 创建版本对应的binding
-func NewBinding(apiVersion string, handlers ...router.Handler) *Binding {
+func NewBinding(apiVersion string, responseIdentifier response.Identifier, handlers ...router.Handler) *Binding {
 	group := topicPrefix
 	if utils.IsStringNotEmpty(apiVersion) && apiVersion != "root" {
 		group += "/" + apiVersion
@@ -26,11 +27,14 @@ func NewBinding(apiVersion string, handlers ...router.Handler) *Binding {
 
 	r := mqttClientInstance.GetRouter(group, handlers)
 
-	return &Binding{router: r}
+	return &Binding{
+		responseIdentifier: responseIdentifier,
+		router:             r,
+	}
 }
 
 func Bind[I any, O any](b *Binding, item *BindItem[I, O], handlers ...router.Handler) {
-	item.bind(b.router, handlers...)
+	item.bind(b.router, b.responseIdentifier, handlers...)
 }
 
 func (binding *Binding) AddHandler(topic string, handlers ...router.Handler) error {
@@ -48,12 +52,13 @@ func (binding *Binding) AddHandler(topic string, handlers ...router.Handler) err
 // BindItem 路由条目结构
 type BindItem[I any, O any] struct {
 	Topic               string               // 请求路径
+	ResponseIdentifier  response.Identifier  // 响应标识符提供接口,会在响应主题添加该标识符
 	ResponseFunc        response.SendFunc[O] // 响应泛型函数
 	BusinessFunc        BusinessFunc[I, O]   // 业务泛型函数
 	OptionalBindingFunc BindingFunc[O]       // 可选的绑定函数
 }
 
-func (item *BindItem[I, O]) bind(r *router.Router, handlers ...router.Handler) {
+func (item *BindItem[I, O]) bind(r *router.Router, bindingResponseIdentifier response.Identifier, handlers ...router.Handler) {
 	if utils.IsStringEmpty(item.Topic) {
 		panic("需要指定主题")
 	}
@@ -76,6 +81,13 @@ func (item *BindItem[I, O]) bind(r *router.Router, handlers ...router.Handler) {
 
 	// 给单个路由增加中间件
 	handlers = append(handlers, func(routerItem *router.Item, data []byte) {
+		var responseIdentifier response.Identifier
+		if item.ResponseIdentifier != nil {
+			responseIdentifier = item.ResponseIdentifier
+		} else if bindingResponseIdentifier != nil {
+			responseIdentifier = bindingResponseIdentifier
+		}
+
 		var inputModel I
 
 		// 请求的结构类型不为any
@@ -87,7 +99,7 @@ func (item *BindItem[I, O]) bind(r *router.Router, handlers ...router.Handler) {
 					return
 				}
 			} else {
-				ok := request.BindingJson(mqttClientInstance, routerItem, &inputModel, item.ResponseFunc)
+				ok := request.BindingJson(mqttClientInstance, routerItem, &inputModel, responseIdentifier, item.ResponseFunc)
 				if !ok {
 					return
 				}
@@ -97,7 +109,7 @@ func (item *BindItem[I, O]) bind(r *router.Router, handlers ...router.Handler) {
 		// 执行业务函数
 		if item.BusinessFunc != nil {
 			outputModel, err := item.BusinessFunc(mqttClientInstance, inputModel)
-			item.ResponseFunc(mqttClientInstance, routerItem, outputModel, err)
+			item.ResponseFunc(mqttClientInstance, routerItem, responseIdentifier, outputModel, err)
 			return
 		}
 	})

+ 7 - 2
mqtt_binding/mqtt_client/mqtt_client.go

@@ -185,8 +185,13 @@ func (c *MqttClient) GetRouter(group string, handlers []router.Handler) *router.
 	return r
 }
 
-func (c *MqttClient) Response(item *router.Item, data []byte) error {
-	token := c.client.Publish(item.Topic+"/reply", 2, false, data)
+func (c *MqttClient) Response(item *router.Item, responseIdentifier string, data []byte) error {
+	replyTopic := item.Topic + "/reply"
+	if utils.IsStringNotEmpty(responseIdentifier) {
+		replyTopic = fmt.Sprintf("%s/%s/reply", item.Topic, responseIdentifier)
+	}
+
+	token := c.client.Publish(replyTopic, 2, false, data)
 	if !token.WaitTimeout(time.Duration(c.opts.WriteTimeoutSec) * time.Second) {
 		return errors.New("发布超时")
 	}

+ 11 - 3
mqtt_binding/request/request.go

@@ -10,13 +10,21 @@ import (
 
 var validate = validator.New(validator.WithRequiredStructEnabled())
 
-func BindingJson[O any](c *mqtt_client.MqttClient, item *router.Item, request any, sendFunc response.SendFunc[O]) bool {
+func BindingJson[O any](c *mqtt_client.MqttClient, item *router.Item, request any,
+	responseIdentifier response.Identifier, sendFunc response.SendFunc[O]) bool {
 	data := item.GetData()
 	if data != nil && len(data) != 0 {
+		if responseIdentifier != nil {
+			err := json.Unmarshal(data, responseIdentifier)
+			if err != nil {
+				panic(err)
+			}
+		}
+
 		err := json.Unmarshal(data, request)
 		if err != nil {
 			var zero O
-			sendFunc(c, item, zero, err)
+			sendFunc(c, item, responseIdentifier, zero, err)
 			return false
 		}
 	}
@@ -24,7 +32,7 @@ func BindingJson[O any](c *mqtt_client.MqttClient, item *router.Item, request an
 	err := validate.Struct(request)
 	if err != nil {
 		var zero O
-		sendFunc(c, item, zero, err)
+		sendFunc(c, item, responseIdentifier, zero, err)
 		return false
 	}
 

+ 21 - 21
mqtt_binding/response/response.go

@@ -9,13 +9,13 @@ import (
 	"git.sxidc.com/service-supports/fslog"
 )
 
-func NoResponse(_ any, _ error) {
+func NoResponse(_ *mqtt_client.MqttClient, _ *router.Item, _ Identifier, _ any, _ error) {
 	return
 }
 
-func SendMsgResponse(c *mqtt_client.MqttClient, item *router.Item, _ any, err error) {
+func SendMsgResponse(c *mqtt_client.MqttClient, item *router.Item, responseIdentifier Identifier, _ any, err error) {
 	msgResp := formMsgResponse(err)
-	jsonResponse(c, item, msgResp)
+	jsonResponse(c, item, responseIdentifier, msgResp)
 }
 
 type IDResponse[T IDType] struct {
@@ -23,9 +23,9 @@ type IDResponse[T IDType] struct {
 	ID T `json:"id"`
 }
 
-func SendIDResponse[T IDType](c *mqtt_client.MqttClient, item *router.Item, id T, err error) {
+func SendIDResponse[T IDType](c *mqtt_client.MqttClient, item *router.Item, responseIdentifier Identifier, id T, err error) {
 	msgResp := formMsgResponse(err)
-	jsonResponse(c, item, IDResponse[T]{
+	jsonResponse(c, item, responseIdentifier, IDResponse[T]{
 		MsgResponse: msgResp,
 		ID:          id,
 	})
@@ -36,9 +36,9 @@ type InfoResponse[T any] struct {
 	Info T `json:"info"`
 }
 
-func SendInfoResponse[T any](c *mqtt_client.MqttClient, item *router.Item, info T, err error) {
+func SendInfoResponse[T any](c *mqtt_client.MqttClient, item *router.Item, responseIdentifier Identifier, info T, err error) {
 	msgResp := formMsgResponse(err)
-	jsonResponse(c, item, InfoResponse[T]{
+	jsonResponse(c, item, responseIdentifier, InfoResponse[T]{
 		MsgResponse: msgResp,
 		Info:        info,
 	})
@@ -49,9 +49,9 @@ type InfosResponse[T any] struct {
 	InfosData[T]
 }
 
-func SendInfosResponse[T any](c *mqtt_client.MqttClient, item *router.Item, data InfosData[T], err error) {
+func SendInfosResponse[T any](c *mqtt_client.MqttClient, item *router.Item, responseIdentifier Identifier, data InfosData[T], err error) {
 	msgResp := formMsgResponse(err)
-	jsonResponse(c, item, InfosResponse[T]{
+	jsonResponse(c, item, responseIdentifier, InfosResponse[T]{
 		MsgResponse: msgResp,
 		InfosData:   data,
 	})
@@ -72,13 +72,13 @@ func StructToMap(originStruct any) map[string]any {
 	return retMap
 }
 
-func SendMapResponse(c *mqtt_client.MqttClient, item *router.Item, data map[string]any, err error) {
+func SendMapResponse(c *mqtt_client.MqttClient, item *router.Item, responseIdentifier Identifier, data map[string]any, err error) {
 	msgRespMap := formMapMsgResponse(err)
 	for key, value := range data {
 		msgRespMap[key] = value
 	}
 
-	jsonResponse(c, item, msgRespMap)
+	jsonResponse(c, item, responseIdentifier, msgRespMap)
 }
 
 type MsgResponse struct {
@@ -126,39 +126,39 @@ func formMapMsgResponse(err error) map[string]any {
 	return resp
 }
 
-func SendString(c *mqtt_client.MqttClient, item *router.Item, data string, err error) {
+func SendString(c *mqtt_client.MqttClient, item *router.Item, responseIdentifier Identifier, data string, err error) {
 	if err != nil {
-		bytesResponse(c, item, []byte(err.Error()))
+		bytesResponse(c, item, responseIdentifier, []byte(err.Error()))
 		return
 	}
 
-	bytesResponse(c, item, []byte(data))
+	bytesResponse(c, item, responseIdentifier, []byte(data))
 }
 
-func WriteBytes(c *mqtt_client.MqttClient, item *router.Item, bytes []byte, err error) {
+func WriteBytes(c *mqtt_client.MqttClient, item *router.Item, responseIdentifier Identifier, bytes []byte, err error) {
 	if err != nil {
-		bytesResponse(c, item, []byte(err.Error()))
+		bytesResponse(c, item, responseIdentifier, []byte(err.Error()))
 		return
 	}
 
-	bytesResponse(c, item, bytes)
+	bytesResponse(c, item, responseIdentifier, bytes)
 }
 
-func jsonResponse(c *mqtt_client.MqttClient, item *router.Item, obj any) {
+func jsonResponse(c *mqtt_client.MqttClient, item *router.Item, responseIdentifier Identifier, obj any) {
 	jsonBytes, err := json.Marshal(obj)
 	if err != nil {
 		panic(err)
 	}
 
-	err = c.Response(item, jsonBytes)
+	err = c.Response(item, responseIdentifier.Identifier(), jsonBytes)
 	if err != nil {
 		fmt.Println("发送mqtt json响应失败")
 		return
 	}
 }
 
-func bytesResponse(c *mqtt_client.MqttClient, item *router.Item, obj []byte) {
-	err := c.Response(item, []byte(obj))
+func bytesResponse(c *mqtt_client.MqttClient, item *router.Item, responseIdentifier Identifier, obj []byte) {
+	err := c.Response(item, responseIdentifier.Identifier(), obj)
 	if err != nil {
 		fmt.Println("发送mqtt bytes响应失败")
 		return

+ 5 - 1
mqtt_binding/response/type.go

@@ -5,7 +5,11 @@ import (
 	"git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client/router"
 )
 
-type SendFunc[T any] func(c *mqtt_client.MqttClient, item *router.Item, data T, err error)
+type Identifier interface {
+	Identifier() string
+}
+
+type SendFunc[T any] func(c *mqtt_client.MqttClient, item *router.Item, identifier Identifier, data T, err error)
 
 type IDType interface {
 	~string | ~uint64

+ 29 - 5
mqtt_binding_test.go

@@ -12,6 +12,17 @@ import (
 	"time"
 )
 
+type CallerIdentifier struct {
+	CallerIdentifier string `json:"callerIdentifier" validate:"required"`
+}
+
+func (callerIdentifier *CallerIdentifier) Identifier() string {
+	return callerIdentifier.CallerIdentifier
+}
+
+var bindingCallerIdentifier = &CallerIdentifier{CallerIdentifier: ""}
+var itemCallerIdentifier = &CallerIdentifier{CallerIdentifier: ""}
+
 func TestMqttBinding(t *testing.T) {
 	err := mqtt_binding.Init("test_prefix", &mqtt_client.MqttClientOptions{
 		UserName:       "admin",
@@ -27,15 +38,16 @@ func TestMqttBinding(t *testing.T) {
 
 	defer mqtt_binding.Destroy()
 
-	testBinding := mqtt_binding.NewBinding("test", func(item *router.Item, data []byte) {
+	testBinding := mqtt_binding.NewBinding("test", bindingCallerIdentifier, func(item *router.Item, data []byte) {
 		fmt.Println("Global Middleware!!!")
 		item.Next()
 	})
 	mqtt_binding.Bind(testBinding, &mqtt_binding.BindItem[struct {
 		Time string `json:"time"`
 	}, map[string]interface{}]{
-		Topic:        "/test-topic",
-		ResponseFunc: response.SendMapResponse,
+		Topic:              "/test-topic",
+		ResponseIdentifier: itemCallerIdentifier,
+		ResponseFunc:       response.SendMapResponse,
 		BusinessFunc: func(c *mqtt_client.MqttClient, inputModel struct {
 			Time string `json:"time"`
 		}) (map[string]interface{}, error) {
@@ -45,7 +57,6 @@ func TestMqttBinding(t *testing.T) {
 				"result": "pong",
 			}, nil
 		},
-		OptionalBindingFunc: nil,
 	}, func(item *router.Item, data []byte) {
 		fmt.Println("Binding Middleware!!!")
 		item.Next()
@@ -63,7 +74,14 @@ func TestMqttBinding(t *testing.T) {
 		SetPingTimeout(5*time.Second).
 		SetWill("test-client/will", "dead", 2, true).
 		SetOnConnectHandler(func(client mqtt.Client) {
-			token := client.Subscribe("test_prefix/test/test-topic/reply", 2, func(client mqtt.Client, message mqtt.Message) {
+			replyTopic := "test_prefix/test/test-topic/reply"
+			if itemCallerIdentifier.Identifier() != "" {
+				replyTopic = "test_prefix/test/test-topic/" + itemCallerIdentifier.Identifier() + "/reply"
+			} else if bindingCallerIdentifier.Identifier() != "" {
+				replyTopic = "test_prefix/test/test-topic/" + bindingCallerIdentifier.Identifier() + "/reply"
+			}
+
+			token := client.Subscribe(replyTopic, 2, func(client mqtt.Client, message mqtt.Message) {
 				waitChan <- message.Payload()
 			})
 			if token.Wait(); token.Error() != nil {
@@ -75,6 +93,12 @@ func TestMqttBinding(t *testing.T) {
 				"time": time.Now().Format(time.DateTime),
 			}
 
+			if itemCallerIdentifier.Identifier() != "" {
+				sendMap["callerIdentifier"] = itemCallerIdentifier.Identifier()
+			} else if bindingCallerIdentifier.Identifier() != "" {
+				sendMap["callerIdentifier"] = bindingCallerIdentifier.Identifier()
+			}
+
 			sendJson, err := json.Marshal(sendMap)
 			if token.Wait(); token.Error() != nil {
 				fmt.Println(err)