package mqtt_binding import ( "git.sxidc.com/go-framework/baize/framework/core/domain" "git.sxidc.com/go-framework/baize/framework/core/infrastructure" "git.sxidc.com/go-framework/baize/framework/core/infrastructure/logger" "git.sxidc.com/go-framework/baize/framework/core/mqtt_api" "git.sxidc.com/go-framework/baize/framework/core/mqtt_api/mqtt_request" "git.sxidc.com/go-framework/baize/framework/core/mqtt_api/mqtt_response" "git.sxidc.com/go-tools/utils/reflectutils" "git.sxidc.com/go-tools/utils/strutils" "github.com/pkg/errors" "reflect" "strings" ) // ResponseIdentifierFunc 获取响应标识符的函数 // 参数: // - c: 上下文 // - params: 请求参数 // - objects: 基于请求参数构造出的领域对象 // - i: 基础设施 // 返回值: // - 响应标识符 // - 错误 type ResponseIdentifierFunc func(c *mqtt_api.Context, params any) (string, error) // FormDomainObjectsFunc 构造领域对象函数类型 // 参数: // - c: 上下文 // - params: 请求参数 // 返回值: // - 基于请求参数构造的领域对象,回传到ServiceFunc的objects参数 // - 错误 type FormDomainObjectsFunc func(c *mqtt_api.Context, params any) ([]domain.Object, error) // ServiceFunc 服务函数(业务逻辑函数) // 类型参数: // - O: 响应数据类型,对应response.SendResponseFunc的data的类型 // 参数: // - c: 上下文 // - params: 请求参数 // - objects: 基于请求参数构造出的领域对象 // - i: 基础设施 // 返回值: // - 响应数据 // - 错误 type ServiceFunc[O any] func(c *mqtt_api.Context, params any, objects []domain.Object, i *infrastructure.Infrastructure) (O, error) // Bind 通用绑定 // 类型参数: // - O: 响应数据类型,对应response.SendResponseFunc的data的类型 // 参数: // - binder: 用来执行绑定的binder // - item: 执行绑定的参数 // - responseIdentifier: 全局响应标识符 // - middlewares: 该绑定的中间件 func Bind[O any](binder *Binder, item *BindItem[O], middlewares ...Middleware) { item.bind(binder, middlewares...) } // BindItem 通用BindItem type BindItem[O any] struct { // 相对主题 Topic string // 响应泛型函数,如果不响应,需要使用NoResponse零值占位 SendResponseFunc mqtt_response.SendResponseFunc[O] // 使用的请求参数,非必传,当请求参数为nil时,说明该接口没有参数 RequestParams any // 获取响应标识符的函数 ResponseIdentifierFunc ResponseIdentifierFunc // 通过请求参数构造使用的领域对象,之后在ServiceFunc中会按照构造实体的顺序进行回调 // 非必传,如果该字段为nil,则说明没有领域对象 // 与Objects字段二选一使用,如果都指定,会按照该字段处理 FormDomainObjectsFunc FormDomainObjectsFunc // 使用的领域对象,当使用Tag对实体进行标注后,可以直接通过该字段给定实体,之后在ServiceFunc中会按照给定实体的顺序进行回调 // 非必传,如果为nil或长度为0,则说明没有领域对象 // 与FormObjectsFunc字段二选一使用,如果都指定,会按照FormObjectsFunc字段处理 Objects []domain.Object // 应用服务泛型函数 ServiceFunc ServiceFunc[O] } func (item *BindItem[O]) bind(binder *Binder, middlewares ...Middleware) { if strutils.IsStringEmpty(item.Topic) { panic("需要指定主题") } if item.SendResponseFunc == nil { panic("需要指定响应函数") } if item.ServiceFunc == nil { panic("需要指定应用服务函数") } var outputZero O outputZeroValue := reflect.ValueOf(outputZero) if outputZeroValue.IsValid() && outputZeroValue.Kind() == reflect.Pointer { panic("bind的输出类型不能使用指针类型") } if outputZeroValue.IsValid() && strings.Contains(outputZeroValue.String(), "mqtt_response.InfosData") { infosField := outputZeroValue.FieldByName("Infos") if infosField.IsValid() && infosField.Type().Elem().Kind() == reflect.Pointer { panic("bind的输出类型不能使用指针类型") } } apiMiddlewares := make([]mqtt_api.Handler, len(middlewares)) for i, middleware := range middlewares { innerMiddleware := middleware apiMiddlewares[i] = func(c *mqtt_api.Context) { innerMiddleware(c, binder.i) } } handlers := append(apiMiddlewares, func(c *mqtt_api.Context) { var params any var responseIdentifier string // 有请求数据 if item.RequestParams != nil { requestParamsType := reflect.TypeOf(item.RequestParams) if !reflectutils.IsTypeStructOrStructPointer(requestParamsType) { err := errors.New("请求参数不是结构或结构指针") logger.GetInstance().Error(err) item.SendResponseFunc(c, responseIdentifier, outputZero, err) return } if requestParamsType.Kind() == reflect.Pointer { params = reflect.New(requestParamsType.Elem()).Interface() } else { params = reflect.New(requestParamsType).Interface() } // 将请求数据解析到请求参数中 err := mqtt_request.BindingJson(c, params) if err != nil { logger.GetInstance().Error(err) item.SendResponseFunc(c, responseIdentifier, outputZero, err) return } } if item.ResponseIdentifierFunc != nil { newResponseIdentifier, err := item.ResponseIdentifierFunc(c, params) if err != nil { logger.GetInstance().Error(err) item.SendResponseFunc(c, responseIdentifier, outputZero, err) return } responseIdentifier = newResponseIdentifier } // 进行领域对象转化 var domainObjects []domain.Object if item.FormDomainObjectsFunc != nil { innerDomainObjects, err := item.FormDomainObjectsFunc(c, params) if err != nil { item.SendResponseFunc(c, responseIdentifier, outputZero, err) return } domainObjects = innerDomainObjects } else { if item.Objects != nil && len(item.Objects) != 0 { for _, object := range item.Objects { if object == nil { continue } objectType := reflect.TypeOf(object) if !reflectutils.IsTypeStructOrStructPointer(objectType) { item.SendResponseFunc(c, responseIdentifier, outputZero, errors.New("领域对象不是结构或结构指针")) return } obj := reflect.New(reflectutils.PointerTypeElem(objectType)).Interface().(domain.Object) if params != nil { err := mqtt_request.AssignRequestParamsToDomainObject(params, obj) if err != nil { item.SendResponseFunc(c, responseIdentifier, outputZero, err) return } } domainObjects = append(domainObjects, obj) } } } // 执行业务函数 outputModel, err := item.ServiceFunc(c, params, domainObjects, binder.i) // 发送响应 item.SendResponseFunc(c, responseIdentifier, outputModel, err) return }) // 所有的函数加入到执行链中 err := binder.router.AddTopic(item.Topic, handlers...) if err != nil { panic("添加主题" + item.Topic + "失败") } }