package mqtt_binding import ( "errors" "git.sxidc.com/go-tools/api_binding/mqtt_binding/middleware" "git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client" "git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client/router" "git.sxidc.com/go-tools/api_binding/mqtt_binding/request" "git.sxidc.com/go-tools/api_binding/mqtt_binding/response" "git.sxidc.com/go-tools/utils/strutils" "git.sxidc.com/service-supports/fslog" "reflect" "strings" ) var ErrIgnoreBusinessResult = errors.New("忽略业务结果") type BusinessFunc[I any, O any] func(c *mqtt_client.MqttClient, inputModel I) (O, error) type BindingFunc[O any] func(c *mqtt_client.MqttClient, item *router.Item, request any, sendFunc response.SendFunc[O]) bool type Binding struct { responseIdentifier response.Identifier router *router.Router } // NewBinding 创建版本对应的binding func NewBinding(apiVersion string, responseIdentifier response.Identifier, handlers ...router.Handler) *Binding { group := topicPrefix if strutils.IsStringNotEmpty(apiVersion) && apiVersion != "root" { group += "/" + apiVersion } handlers = append([]router.Handler{ middleware.Logger([]string{group + "/version"}), middleware.Recovery(), }, handlers...) r := mqttClientInstance.GetRouter(group, handlers) return &Binding{ responseIdentifier: responseIdentifier, router: r, } } func Bind[I any, O any](b *Binding, item *BindItem[I, O], handlers ...router.Handler) { item.bind(b.router, b.responseIdentifier, handlers...) } func (binding *Binding) AddHandler(topic string, handlers ...router.Handler) error { if strutils.IsStringEmpty(topic) { return errors.New("没有传递主题") } if handlers == nil || len(handlers) == 0 { return errors.New("没有传递处理函数") } err := binding.router.RangeItem(func(existItem *router.Item) error { if existItem.Topic == binding.router.Group+topic { return errors.New("主题已存在: " + existItem.Topic) } return nil }) if err != nil { if strings.Contains(err.Error(), "主题已存在: ") { fslog.Error(err) return nil } return err } return binding.router.AddTopic(binding.router.Group+topic, handlers...) } // BindItem 路由条目结构 type BindItem[I any, O any] struct { Topic string // 请求路径 ResponseIdentifier response.Identifier // 响应标识符提供接口,会在响应主题添加该标识符 ResponseFunc response.SendFunc[O] // 响应泛型函数 BusinessFunc BusinessFunc[I, O] // 业务泛型函数 OptionalBindingFunc BindingFunc[O] // 可选的绑定函数 } func (item *BindItem[I, O]) bind(r *router.Router, bindingResponseIdentifier response.Identifier, handlers ...router.Handler) { if strutils.IsStringEmpty(item.Topic) { panic("需要指定主题") } if item.ResponseFunc == nil { panic("需要指定响应函数") } err := r.RangeItem(func(existItem *router.Item) error { if existItem.Topic == r.Group+item.Topic { return errors.New("主题已存在: " + existItem.Topic) } return nil }) if err != nil { panic(err) } var inputCheckModel I inputType := reflect.TypeOf(inputCheckModel) if inputType != nil { if inputType.Kind() == reflect.Pointer { panic("输入对象不能使用指针类型") } if inputType.Kind() != reflect.Struct { panic("输入对象必须是结构") } } var responseIdentifier response.Identifier if item.ResponseIdentifier != nil { responseIdentifier = item.ResponseIdentifier.Copy() } else if bindingResponseIdentifier != nil { responseIdentifier = bindingResponseIdentifier.Copy() } // 给单个路由增加中间件 handlers = append(handlers, func(routerItem *router.Item, data []byte) { var inputModel I var req any if inputType != nil { req = &inputModel } // 请求的结构类型不为any if responseIdentifier != nil || req != nil { // 将请求数据解析到inputModel中 if item.OptionalBindingFunc != nil { ok := item.OptionalBindingFunc(mqttClientInstance, routerItem, req, item.ResponseFunc) if !ok { return } } else { ok := request.BindingJson(mqttClientInstance, routerItem, req, responseIdentifier, item.ResponseFunc) if !ok { return } } } // 执行业务函数 if item.BusinessFunc != nil { outputModel, err := item.BusinessFunc(mqttClientInstance, inputModel) if errors.Is(err, ErrIgnoreBusinessResult) { return } item.ResponseFunc(mqttClientInstance, routerItem, responseIdentifier, outputModel, err) return } }) // 所有的函数加入到执行链中 routerItem, err := router.NewItem(r.Group+item.Topic, handlers) if err != nil { panic("创建路由条目失败: " + err.Error()) return } err = r.AddItem(routerItem) if err != nil { panic("添加路由条目失败: " + err.Error()) return } }