Browse Source

完成mqtt_binding开发

yjp 3 months ago
parent
commit
f4e435061d

+ 71 - 0
framework/core/mqtt_api/context.go

@@ -0,0 +1,71 @@
+package mqtt_api
+
+import (
+	"time"
+)
+
+type Context struct {
+	// 主题
+	Topic string
+
+	// 数据
+	Data []byte
+
+	mqttClient          *MqttClient
+	handlers            []Handler
+	currentHandlerIndex int
+}
+
+func newContext(mqttClient *MqttClient, topic string, data []byte, handlers []Handler) (*Context, error) {
+	return &Context{
+		Topic:               topic,
+		Data:                data,
+		mqttClient:          mqttClient,
+		handlers:            handlers,
+		currentHandlerIndex: 0,
+	}, nil
+}
+
+func destroyContext(c *Context) {
+	if c == nil {
+		return
+	}
+
+	if c.currentHandlerIndex == len(c.handlers) {
+		return
+	}
+
+	ticker := time.NewTicker(100 * time.Millisecond)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			if c.currentHandlerIndex == len(c.handlers) {
+				break
+			}
+		}
+	}
+}
+
+func (c *Context) callHandlers(data []byte) {
+	c.currentHandlerIndex = 0
+	c.Data = data
+	c.handlers[c.currentHandlerIndex](c)
+}
+
+func (c *Context) Response(responseIdentifier string, data []byte) error {
+	err := c.mqttClient.response(c.Topic, responseIdentifier, data)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (c *Context) Next() {
+	c.currentHandlerIndex++
+	if c.currentHandlerIndex < len(c.handlers) {
+		c.handlers[c.currentHandlerIndex](c)
+	}
+}

+ 100 - 0
framework/core/mqtt_api/mqtt_api.go

@@ -0,0 +1,100 @@
+package mqtt_api
+
+import (
+	"git.sxidc.com/service-supports/fslog"
+	"runtime/debug"
+	"slices"
+	"time"
+)
+
+type MqttApi struct {
+	options Options
+	router  *Router
+}
+
+func New(opts ...Option) (*MqttApi, error) {
+	options := new(Options)
+
+	for _, opt := range opts {
+		opt(options)
+	}
+
+	router := NewRouter(options.topicPrefix, []Handler{
+		func(c *Context) {
+			start := time.Now()
+			c.Next()
+
+			if slices.Contains(options.logSkipPaths, c.Topic) {
+				return
+			}
+
+			end := time.Now()
+			fslog.Info("| %d | %s |",
+				end.Sub(start),
+				c.Topic,
+			)
+		},
+		func(c *Context) {
+			defer func() {
+				err := recover()
+				if err != nil {
+					fslog.Error("%s", debug.Stack())
+				}
+			}()
+
+			c.Next()
+		},
+	})
+
+	return &MqttApi{
+		options: *options,
+		router:  router,
+	}, nil
+}
+
+func Destroy(api *MqttApi) {
+	DestroyRouter(api.router)
+	api.router = nil
+}
+
+// Start 运行Api
+// 参数: 无
+// 返回值:
+// - 错误
+func (api *MqttApi) Start() error {
+	err := api.router.Start(api.options.mqttOptions)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// Finish 终止Api
+// 参数: 无
+// 返回值: 无
+func (api *MqttApi) Finish() {
+	if api.router == nil {
+		return
+	}
+
+	api.router.Finish()
+	api.router = nil
+}
+
+// Options 获取Api的选项
+// 参数: 无
+// 返回值:
+// - Api的选项
+func (api *MqttApi) Options() Options {
+	return api.options
+}
+
+// Router 获取Api的路由
+// 参数:
+// - 消息组
+// 返回值:
+// - Api的路由
+func (api *MqttApi) Router() *Router {
+	return api.router
+}

+ 160 - 0
framework/core/mqtt_api/mqtt_client.go

@@ -0,0 +1,160 @@
+package mqtt_api
+
+import (
+	"errors"
+	"fmt"
+	"git.sxidc.com/go-tools/utils/strutils"
+	mqtt "github.com/eclipse/paho.mqtt.golang"
+	"time"
+)
+
+type MqttClientOptions struct {
+	UserName        string
+	Password        string
+	Address         string
+	ClientID        string
+	KeepAliveSec    int64
+	PingTimeoutSec  int64
+	WriteTimeoutSec int64
+}
+
+func (opts *MqttClientOptions) check() error {
+	if strutils.IsStringEmpty(opts.UserName) {
+		return errors.New("必须传递用户名")
+	}
+
+	if strutils.IsStringEmpty(opts.Password) {
+		return errors.New("必须传递密码")
+	}
+
+	if strutils.IsStringEmpty(opts.Address) {
+		return errors.New("必须传递地址")
+	}
+
+	if strutils.IsStringEmpty(opts.ClientID) {
+		return errors.New("必须传递客户端ID")
+	}
+
+	return nil
+}
+
+type MqttClient struct {
+	mqttClient mqtt.Client
+	opts       *MqttClientOptions
+}
+
+func newMqttClient(opts *MqttClientOptions, onConnect func(client *MqttClient), onConnectionLost func(client *MqttClient)) (*MqttClient, error) {
+	if opts == nil {
+		return nil, errors.New("没有传递mqtt客户端选项")
+	}
+
+	err := opts.check()
+	if err != nil {
+		return nil, err
+	}
+
+	if opts.WriteTimeoutSec == 0 {
+		opts.WriteTimeoutSec = 60
+	}
+
+	mqttClient := &MqttClient{
+		opts: opts,
+	}
+
+	mqttOptions := mqtt.NewClientOptions().
+		SetAutoReconnect(true).
+		SetUsername(opts.UserName).
+		SetPassword(opts.Password).
+		AddBroker(opts.Address).
+		SetClientID(opts.ClientID).
+		SetKeepAlive(time.Duration(opts.KeepAliveSec)*time.Second).
+		SetPingTimeout(time.Duration(opts.PingTimeoutSec)*time.Second).
+		SetWriteTimeout(time.Duration(opts.WriteTimeoutSec)*time.Second).
+		SetWill(opts.ClientID+"/will", "dead", 2, false).
+		SetOnConnectHandler(func(client mqtt.Client) {
+			onConnect(mqttClient)
+		}).
+		SetConnectionLostHandler(func(client mqtt.Client, _ error) {
+			onConnectionLost(mqttClient)
+		})
+
+	mqttClient.mqttClient = mqtt.NewClient(mqttOptions)
+
+	return mqttClient, nil
+}
+
+func destroyMqttClient(c *MqttClient) {
+	if c != nil {
+		c.mqttClient = nil
+	}
+
+	c = nil
+}
+
+func (client *MqttClient) connect() error {
+	token := client.mqttClient.Connect()
+	if !token.WaitTimeout(time.Duration(client.opts.WriteTimeoutSec) * time.Second) {
+		return errors.New("连接超时")
+	}
+
+	if token.Error() != nil {
+		return token.Error()
+	}
+
+	return nil
+}
+
+func (client *MqttClient) disconnect() {
+	client.mqttClient.Disconnect(250)
+}
+
+func (client *MqttClient) subscribe(topic string, callback func(topic string, data []byte)) error {
+	token := client.mqttClient.Subscribe(topic, 2, func(mqttClient mqtt.Client, message mqtt.Message) {
+		callback(topic, message.Payload())
+	})
+	if !token.WaitTimeout(time.Duration(client.opts.WriteTimeoutSec) * time.Second) {
+		return errors.New("订阅超时")
+	}
+
+	if token.Error() != nil {
+		return token.Error()
+	}
+
+	fmt.Println("[MQTT] Subscribe Topic: " + topic)
+
+	return nil
+}
+
+func (client *MqttClient) unsubscribe(topic string) error {
+	token := client.mqttClient.Unsubscribe(topic)
+	if !token.WaitTimeout(time.Duration(client.opts.WriteTimeoutSec) * time.Second) {
+		return errors.New("取消订阅超时")
+	}
+
+	if token.Error() != nil {
+		return token.Error()
+	}
+
+	fmt.Println("[MQTT] Unsubscribe Topic: " + topic)
+
+	return nil
+}
+
+func (client *MqttClient) response(topic string, responseIdentifier string, data []byte) error {
+	replyTopic := topic + "/reply"
+	
+	if strutils.IsStringNotEmpty(responseIdentifier) {
+		replyTopic = fmt.Sprintf("%s/%s/reply", topic, responseIdentifier)
+	}
+
+	token := client.mqttClient.Publish(replyTopic, 2, false, data)
+	if !token.WaitTimeout(time.Duration(client.opts.WriteTimeoutSec) * time.Second) {
+		return errors.New("发布超时")
+	}
+
+	if token.Error() != nil {
+		return token.Error()
+	}
+
+	return nil
+}

+ 39 - 0
framework/core/mqtt_api/options.go

@@ -0,0 +1,39 @@
+package mqtt_api
+
+type Options struct {
+	// topic前缀
+	topicPrefix string
+
+	// 日志跳过的打印路径
+	logSkipPaths []string
+
+	// mqtt配置
+	mqttOptions *MqttClientOptions
+}
+
+func (options Options) GetTopicPrefix() string {
+	return options.topicPrefix
+}
+
+type Option func(options *Options)
+
+// WithTopicPrefix 设置topic前缀
+func WithTopicPrefix(topicPrefix string) Option {
+	return func(options *Options) {
+		options.topicPrefix = topicPrefix
+	}
+}
+
+// WithLogSkipPaths 设置日志跳过的打印路径
+func WithLogSkipPaths(logSkipPaths ...string) Option {
+	return func(options *Options) {
+		options.logSkipPaths = logSkipPaths
+	}
+}
+
+// WithMqttOptions 设置mqtt配置
+func WithMqttOptions(mqttOptions *MqttClientOptions) Option {
+	return func(options *Options) {
+		options.mqttOptions = mqttOptions
+	}
+}

+ 39 - 0
framework/core/mqtt_api/request/request.go

@@ -0,0 +1,39 @@
+package request
+
+import (
+	"encoding/json"
+	"git.sxidc.com/go-framework/baize/framework/core/domain"
+	"git.sxidc.com/go-framework/baize/framework/core/mqtt_api"
+	"git.sxidc.com/go-framework/baize/framework/core/tag/assign"
+	"github.com/go-playground/validator/v10"
+)
+
+var validate = validator.New(validator.WithRequiredStructEnabled())
+
+func BindingJson(c *mqtt_api.Context, request any) error {
+	if c.Data == nil || len(c.Data) == 0 || request == nil {
+		return nil
+	}
+
+	err := json.Unmarshal(c.Data, request)
+	if err != nil {
+		return err
+	}
+
+	err = validate.Struct(request)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// AssignRequestParamsToDomainObject 基于assign tag将请求参数赋值到领域对象
+// 参数:
+// - params: 请求参数
+// - domainObject: 领域对象
+// 返回值:
+// - 错误
+func AssignRequestParamsToDomainObject(params Params, domainObject domain.Object) error {
+	return assign.DefaultUsage(params, domainObject)
+}

+ 170 - 0
framework/core/mqtt_api/response/response.go

@@ -0,0 +1,170 @@
+package response
+
+import (
+	"encoding/json"
+	"fmt"
+	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/logger"
+	"git.sxidc.com/go-framework/baize/framework/core/mqtt_api"
+)
+
+type SendResponseFunc[T any] func(c *mqtt_api.Context, identifier string, data T, err error)
+
+func NoResponse(_ *mqtt_api.Context, _ string, _ any, _ error) {
+	return
+}
+
+func SendMsgResponse(c *mqtt_api.Context, identifier string, _ any, err error) {
+	msgResp := formMsgResponse(err)
+	jsonResponse(c, identifier, msgResp)
+}
+
+type IDResponse struct {
+	MsgResponse
+	ID string `json:"id"`
+}
+
+func SendIDResponse(c *mqtt_api.Context, identifier string, id string, err error) {
+	msgResp := formMsgResponse(err)
+	jsonResponse(c, identifier, IDResponse{
+		MsgResponse: msgResp,
+		ID:          id,
+	})
+}
+
+type InfoResponse[T any] struct {
+	MsgResponse
+	Info T `json:"info"`
+}
+
+func SendInfoResponse[T any](c *mqtt_api.Context, identifier string, info T, err error) {
+	msgResp := formMsgResponse(err)
+	jsonResponse(c, identifier, InfoResponse[T]{
+		MsgResponse: msgResp,
+		Info:        info,
+	})
+}
+
+type InfosData[T any] struct {
+	Infos      []T   `json:"infos"`
+	TotalCount int64 `json:"totalCount"`
+	PageNo     int   `json:"pageNo"`
+}
+
+type InfosResponse[T any] struct {
+	MsgResponse
+	InfosData[T]
+}
+
+func SendInfosResponse[T any](c *mqtt_api.Context, identifier string, data InfosData[T], err error) {
+	msgResp := formMsgResponse(err)
+	jsonResponse(c, identifier, InfosResponse[T]{
+		MsgResponse: msgResp,
+		InfosData:   data,
+	})
+}
+
+func StructToMap(originStruct any) map[string]any {
+	jsonBytes, err := json.Marshal(originStruct)
+	if err != nil {
+		panic(err)
+	}
+
+	retMap := make(map[string]any)
+	err = json.Unmarshal(jsonBytes, &retMap)
+	if err != nil {
+		panic(err)
+	}
+
+	return retMap
+}
+
+func SendMapResponse(c *mqtt_api.Context, identifier string, data map[string]any, err error) {
+	msgRespMap := formMapMsgResponse(err)
+	for key, value := range data {
+		msgRespMap[key] = value
+	}
+
+	jsonResponse(c, identifier, msgRespMap)
+}
+
+type MsgResponse struct {
+	Success bool   `json:"success"`
+	ErrCode int    `json:"errCode"`
+	Msg     string `json:"msg"`
+}
+
+func formMsgResponse(err error) MsgResponse {
+	if err != nil {
+		logger.GetInstance().Error(err)
+
+		return MsgResponse{
+			Success: false,
+			ErrCode: 1,
+			Msg:     err.Error(),
+		}
+	}
+
+	return MsgResponse{
+		Success: true,
+		ErrCode: 0,
+		Msg:     "操作成功",
+	}
+}
+
+func formMapMsgResponse(err error) map[string]any {
+	resp := make(map[string]any)
+	if err != nil {
+		logger.GetInstance().Error(err)
+
+		resp["success"] = false
+		resp["errCode"] = 1
+		resp["msg"] = err.Error()
+
+		return resp
+	}
+
+	resp["success"] = true
+	resp["errCode"] = 0
+	resp["msg"] = "操作成功"
+
+	return resp
+}
+
+func SendString(c *mqtt_api.Context, identifier string, data string, err error) {
+	if err != nil {
+		bytesResponse(c, identifier, []byte(err.Error()))
+		return
+	}
+
+	bytesResponse(c, identifier, []byte(data))
+}
+
+func WriteBytes(c *mqtt_api.Context, identifier string, bytes []byte, err error) {
+	if err != nil {
+		bytesResponse(c, identifier, []byte(err.Error()))
+		return
+	}
+
+	bytesResponse(c, identifier, bytes)
+}
+
+func jsonResponse(c *mqtt_api.Context, identifier string, obj any) {
+	jsonBytes, err := json.Marshal(obj)
+	if err != nil {
+		panic(err)
+	}
+
+	err = c.Response(identifier, jsonBytes)
+	if err != nil {
+		fmt.Println("发送mqtt json响应失败: " + err.Error())
+		return
+	}
+}
+
+func bytesResponse(c *mqtt_api.Context, identifier string, obj []byte) {
+	err := c.Response(identifier, obj)
+	if err != nil {
+		fmt.Println("发送mqtt bytes响应失败")
+		return
+	}
+}

+ 163 - 0
framework/core/mqtt_api/router.go

@@ -0,0 +1,163 @@
+package mqtt_api
+
+import (
+	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/logger"
+	"sync"
+)
+
+type Handler func(c *Context)
+
+type Router struct {
+	Group string
+
+	mqttClient *MqttClient
+
+	contextsMutex *sync.Mutex
+	contexts      []*Context
+
+	globalHandlers []Handler
+
+	topicHandlersMutex *sync.Mutex
+	topicHandlers      map[string][]Handler
+}
+
+func NewRouter(group string, globalHandlers []Handler) *Router {
+	return &Router{
+		Group:              group,
+		mqttClient:         nil,
+		contextsMutex:      &sync.Mutex{},
+		contexts:           make([]*Context, 0),
+		globalHandlers:     globalHandlers,
+		topicHandlersMutex: &sync.Mutex{},
+		topicHandlers:      make(map[string][]Handler),
+	}
+}
+
+func DestroyRouter(router *Router) {
+	if router == nil {
+		return
+	}
+
+	router.contextsMutex.Lock()
+
+	for _, context := range router.contexts {
+		destroyContext(context)
+	}
+
+	router.contexts = nil
+
+	router.contextsMutex.Unlock()
+
+	router = nil
+}
+
+func (router *Router) Start(mqttOptions *MqttClientOptions) error {
+	mqttClient, err := newMqttClient(mqttOptions, func(client *MqttClient) {
+		router.subscribeTopics(client)
+	}, func(client *MqttClient) {
+		router.unsubscribeTopics(client)
+	})
+	if err != nil {
+		return err
+	}
+
+	err = mqttClient.connect()
+	if err != nil {
+		return err
+	}
+
+	router.mqttClient = mqttClient
+
+	return nil
+}
+
+func (router *Router) Finish() {
+	if router.mqttClient == nil {
+		return
+	}
+
+	router.unsubscribeTopics(router.mqttClient)
+
+	router.mqttClient.disconnect()
+	destroyMqttClient(router.mqttClient)
+	router.mqttClient = nil
+}
+
+func (router *Router) AddTopic(topic string, handlers ...Handler) error {
+	added := router.addTopicHandlers(topic, handlers...)
+	if !added {
+		return nil
+	}
+
+	if router.mqttClient == nil {
+		return nil
+	}
+
+	err := router.mqttClient.subscribe(topic, func(topic string, data []byte) {})
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (router *Router) subscribeTopics(client *MqttClient) {
+	router.rangeTopicHandlers(func(topic string, handlers []Handler) {
+		err := client.subscribe(topic, func(topic string, data []byte) {
+			c, err := newContext(router.mqttClient, topic, data, handlers)
+			if err != nil {
+				logger.GetInstance().Error(err)
+				return
+			}
+
+			c.Next()
+		})
+		if err != nil {
+			logger.GetInstance().Error(err)
+			return
+		}
+	})
+}
+
+func (router *Router) unsubscribeTopics(client *MqttClient) {
+	router.rangeTopicHandlers(func(topic string, handlers []Handler) {
+		err := client.unsubscribe(topic)
+		if err != nil {
+			logger.GetInstance().Error(err)
+			return
+		}
+	})
+}
+
+func (router *Router) addTopicHandlers(topic string, handler ...Handler) bool {
+	router.topicHandlersMutex.Lock()
+	defer router.topicHandlersMutex.Unlock()
+
+	if router.topicHandlers[topic] != nil && len(router.topicHandlers[topic]) > 0 {
+		return false
+	}
+
+	router.topicHandlers[topic] = append(router.globalHandlers, handler...)
+
+	return true
+}
+
+func (router *Router) removeTopicHandlers(topic string) {
+	router.topicHandlersMutex.Lock()
+	defer router.topicHandlersMutex.Unlock()
+
+	if router.topicHandlers[topic] == nil || len(router.topicHandlers[topic]) == 0 {
+		return
+	}
+
+	delete(router.topicHandlers, topic)
+}
+
+func (router *Router) rangeTopicHandlers(callback func(topic string, handlers []Handler)) {
+	router.topicHandlersMutex.Lock()
+	defer router.topicHandlersMutex.Unlock()
+
+	for topic, topicHandlers := range router.topicHandlers {
+		callback(topic, topicHandlers)
+	}
+}

+ 214 - 0
framework/mqtt_binding/bind_item.go

@@ -0,0 +1,214 @@
+package mqtt_binding
+
+import (
+	"git.sxidc.com/go-framework/baize/framework/core/domain"
+	"git.sxidc.com/go-framework/baize/framework/core/infrastructure"
+	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/logger"
+	"git.sxidc.com/go-framework/baize/framework/core/mqtt_api"
+	"git.sxidc.com/go-framework/baize/framework/core/mqtt_api/request"
+	"git.sxidc.com/go-framework/baize/framework/core/mqtt_api/response"
+	"git.sxidc.com/go-tools/utils/reflectutils"
+	"git.sxidc.com/go-tools/utils/strutils"
+	"github.com/pkg/errors"
+	"reflect"
+	"strings"
+)
+
+// ResponseIdentifierFunc 获取响应标识符的函数
+// 参数:
+// - c: 上下文
+// - params: 请求参数
+// - objects: 基于请求参数构造出的领域对象
+// - i: 基础设施
+// 返回值:
+// - 响应标识符
+// - 错误
+type ResponseIdentifierFunc func(c *mqtt_api.Context, params any) (string, error)
+
+// FormDomainObjectsFunc 构造领域对象函数类型
+// 参数:
+// - c: 上下文
+// - params: 请求参数
+// 返回值:
+// - 基于请求参数构造的领域对象,回传到ServiceFunc的objects参数
+// - 错误
+type FormDomainObjectsFunc func(c *mqtt_api.Context, params any) ([]domain.Object, error)
+
+// ServiceFunc 服务函数(业务逻辑函数)
+// 类型参数:
+// - O: 响应数据类型,对应response.SendResponseFunc的data的类型
+// 参数:
+// - c: 上下文
+// - params: 请求参数
+// - objects: 基于请求参数构造出的领域对象
+// - i: 基础设施
+// 返回值:
+// - 响应数据
+// - 错误
+type ServiceFunc[O any] func(c *mqtt_api.Context, params any, objects []domain.Object, i *infrastructure.Infrastructure) (O, error)
+
+// Bind 通用绑定
+// 类型参数:
+// - O: 响应数据类型,对应response.SendResponseFunc的data的类型
+// 参数:
+// - binder: 用来执行绑定的binder
+// - item: 执行绑定的参数
+// - responseIdentifier: 全局响应标识符
+// - middlewares: 该绑定的中间件
+func Bind[O any](binder *Binder, item *BindItem[O], responseIdentifier string, middlewares ...Middleware) {
+	item.bind(binder, responseIdentifier, middlewares...)
+}
+
+// BindItem 通用BindItem
+type BindItem[O any] struct {
+	// 相对主题
+	Topic string
+
+	// 响应泛型函数,如果不响应,需要使用NoResponse零值占位
+	SendResponseFunc response.SendResponseFunc[O]
+
+	// 使用的请求参数,非必传,当请求参数为nil时,说明该接口没有参数
+	RequestParams any
+
+	// 获取响应标识符的函数
+	ResponseIdentifierFunc ResponseIdentifierFunc
+
+	// 通过请求参数构造使用的领域对象,之后在ServiceFunc中会按照构造实体的顺序进行回调
+	// 非必传,如果该字段为nil,则说明没有领域对象
+	// 与Objects字段二选一使用,如果都指定,会按照该字段处理
+	FormDomainObjectsFunc FormDomainObjectsFunc
+
+	// 使用的领域对象,当使用Tag对实体进行标注后,可以直接通过该字段给定实体,之后在ServiceFunc中会按照给定实体的顺序进行回调
+	// 非必传,如果为nil或长度为0,则说明没有领域对象
+	// 与FormObjectsFunc字段二选一使用,如果都指定,会按照FormObjectsFunc字段处理
+	Objects []domain.Object
+
+	// 应用服务泛型函数
+	ServiceFunc ServiceFunc[O]
+}
+
+func (item *BindItem[O]) bind(binder *Binder, responseIdentifier string, middlewares ...Middleware) {
+	if strutils.IsStringEmpty(item.Topic) {
+		panic("需要指定主题")
+	}
+
+	if item.SendResponseFunc == nil {
+		panic("需要指定响应函数")
+	}
+
+	if item.ServiceFunc == nil {
+		panic("需要指定应用服务函数")
+	}
+
+	var outputZero O
+	outputZeroValue := reflect.ValueOf(outputZero)
+	if outputZeroValue.IsValid() && outputZeroValue.Kind() == reflect.Pointer {
+		panic("bind的输出类型不能使用指针类型")
+	}
+
+	if outputZeroValue.IsValid() && strings.Contains(outputZeroValue.String(), "response.InfosData") {
+		infosField := outputZeroValue.FieldByName("Infos")
+		if infosField.IsValid() && infosField.Type().Elem().Kind() == reflect.Pointer {
+			panic("bind的输出类型不能使用指针类型")
+		}
+	}
+
+	apiMiddlewares := make([]mqtt_api.Handler, len(middlewares))
+	for i, middleware := range middlewares {
+		innerMiddleware := middleware
+		apiMiddlewares[i] = func(c *mqtt_api.Context) {
+			innerMiddleware(c, binder.i)
+		}
+	}
+
+	handlers := append(apiMiddlewares, func(c *mqtt_api.Context) {
+		var params any
+
+		// 有请求数据
+		if item.RequestParams != nil {
+			requestParamsType := reflect.TypeOf(item.RequestParams)
+			if !reflectutils.IsTypeStructOrStructPointer(requestParamsType) {
+				err := errors.New("请求参数不是结构或结构指针")
+				logger.GetInstance().Error(err)
+				item.SendResponseFunc(c, responseIdentifier, outputZero, err)
+				return
+			}
+
+			if requestParamsType.Kind() == reflect.Pointer {
+				params = reflect.New(requestParamsType.Elem()).Interface()
+			} else {
+				params = reflect.New(requestParamsType).Interface()
+			}
+
+			// 将请求数据解析到请求参数中
+			err := request.BindingJson(c, params)
+			if err != nil {
+				logger.GetInstance().Error(err)
+				item.SendResponseFunc(c, responseIdentifier, outputZero, err)
+				return
+			}
+		}
+
+		if item.ResponseIdentifierFunc != nil {
+			newResponseIdentifier, err := item.ResponseIdentifierFunc(c, params)
+			if err != nil {
+				logger.GetInstance().Error(err)
+				item.SendResponseFunc(c, responseIdentifier, outputZero, err)
+				return
+			}
+
+			responseIdentifier = newResponseIdentifier
+		}
+
+		// 进行领域对象转化
+		var domainObjects []domain.Object
+		if item.FormDomainObjectsFunc != nil {
+			innerDomainObjects, err := item.FormDomainObjectsFunc(c, params)
+			if err != nil {
+				item.SendResponseFunc(c, responseIdentifier, outputZero, err)
+				return
+			}
+
+			domainObjects = innerDomainObjects
+		} else {
+			if item.Objects != nil && len(item.Objects) != 0 {
+				for _, object := range item.Objects {
+					if object == nil {
+						continue
+					}
+
+					objectType := reflect.TypeOf(object)
+					if !reflectutils.IsTypeStructOrStructPointer(objectType) {
+						item.SendResponseFunc(c, responseIdentifier, outputZero, errors.New("领域对象不是结构或结构指针"))
+						return
+					}
+
+					obj := reflect.New(reflectutils.PointerTypeElem(objectType)).Interface().(domain.Object)
+
+					if params != nil {
+						err := request.AssignRequestParamsToDomainObject(params, obj)
+						if err != nil {
+							item.SendResponseFunc(c, responseIdentifier, outputZero, err)
+							return
+						}
+					}
+
+					domainObjects = append(domainObjects, obj)
+				}
+			}
+		}
+
+		// 执行业务函数
+		outputModel, err := item.ServiceFunc(c, params, domainObjects, binder.i)
+
+		// 发送响应
+		item.SendResponseFunc(c, responseIdentifier, outputModel, err)
+		return
+	})
+
+	// 所有的函数加入到执行链中
+	err := binder.router.AddTopic(item.Topic, handlers...)
+	if err != nil {
+		panic("添加主题" + item.Topic + "失败")
+	}
+}

+ 24 - 0
framework/mqtt_binding/binder.go

@@ -0,0 +1,24 @@
+package mqtt_binding
+
+import (
+	"git.sxidc.com/go-framework/baize/framework/core/infrastructure"
+	"git.sxidc.com/go-framework/baize/framework/core/mqtt_api"
+)
+
+type Binder struct {
+	router *mqtt_api.Router
+	i      *infrastructure.Infrastructure
+}
+
+// NewBinder 构造Binder
+// 参数:
+// - router: 使用的路由
+// - i: 使用的基础设施
+// 返回值:
+// - 构造的binder
+func NewBinder(router *mqtt_api.Router, i *infrastructure.Infrastructure) *Binder {
+	return &Binder{
+		router: router,
+		i:      i,
+	}
+}

+ 9 - 0
framework/mqtt_binding/middleware.go

@@ -0,0 +1,9 @@
+package mqtt_binding
+
+import (
+	"git.sxidc.com/go-framework/baize/framework/core/infrastructure"
+	"git.sxidc.com/go-framework/baize/framework/core/mqtt_api"
+)
+
+// Middleware binder使用的中间件
+type Middleware func(c *mqtt_api.Context, i *infrastructure.Infrastructure)