package mqtt_binding import ( "git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client" "git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client/router" "git.sxidc.com/go-tools/api_binding/mqtt_binding/request" "git.sxidc.com/go-tools/api_binding/mqtt_binding/response" "git.sxidc.com/go-tools/api_binding/utils" "reflect" ) type BusinessFunc[I any, O any] func(c *mqtt_client.MqttClient, inputModel I) (O, error) type BindingFunc[O any] func(c *mqtt_client.MqttClient, item *router.Item, request any, sendFunc response.SendFunc[O]) bool type Binding struct { router *router.Router } // NewBinding 创建版本对应的binding func NewBinding(apiVersion string, handlers ...router.Handler) *Binding { var group string if utils.IsStringNotEmpty(apiVersion) && apiVersion != "root" { group = apiVersion } r := mqttClientInstance.GetRouter(group, handlers) return &Binding{router: r} } func Bind[I any, O any](b *Binding, item *BindItem[I, O], handlers ...router.Handler) { item.bind(b.router, handlers...) } // BindItem 路由条目结构 type BindItem[I any, O any] struct { Topic string // 请求路径 Qos byte // QOS Retained bool // 是否保留响应 ResponseFunc response.SendFunc[O] // 响应泛型函数 BusinessFunc BusinessFunc[I, O] // 业务泛型函数 OptionalBindingFunc BindingFunc[O] // 可选的绑定函数 } func (item *BindItem[I, O]) bind(r *router.Router, handlers ...router.Handler) { if utils.IsStringEmpty(item.Topic) { panic("需要指定主题") } if item.ResponseFunc == nil { panic("需要指定响应函数") } var inputCheckModel I inputType := reflect.TypeOf(inputCheckModel) if inputType != nil { if inputType.Kind() == reflect.Pointer { panic("输入对象不能使用指针类型") } if inputType.Kind() != reflect.Struct { panic("输入对象必须是结构") } } // 给单个路由增加中间件 handlers = append(handlers, func(routerItem *router.Item, data []byte) { var inputModel I // 请求的结构类型不为any if inputType != nil { // 将请求数据解析到inputModel中 if item.OptionalBindingFunc != nil { ok := item.OptionalBindingFunc(mqttClientInstance, routerItem, &inputModel, item.ResponseFunc) if !ok { return } } else { ok := request.BindingJson(mqttClientInstance, routerItem, &inputModel, item.ResponseFunc) if !ok { return } } } // 执行业务函数 if item.BusinessFunc != nil { outputModel, err := item.BusinessFunc(mqttClientInstance, inputModel) item.ResponseFunc(mqttClientInstance, routerItem, outputModel, err) return } }) // 所有的函数加入到执行链中 routerItem, err := router.NewItem(r.Group+item.Topic, item.Qos, handlers) if err != nil { panic("创建路由条目失败: " + err.Error()) return } err = r.AddItem(routerItem) if err != nil { panic("添加路由条目失败: " + err.Error()) return } }