123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175 |
- package mqtt_binding
- import (
- "errors"
- "git.sxidc.com/go-tools/api_binding/mqtt_binding/middleware"
- "git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client"
- "git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client/router"
- "git.sxidc.com/go-tools/api_binding/mqtt_binding/request"
- "git.sxidc.com/go-tools/api_binding/mqtt_binding/response"
- "git.sxidc.com/go-tools/utils/strutils"
- "git.sxidc.com/service-supports/fslog"
- "reflect"
- "strings"
- )
- var ErrIgnoreBusinessResult = errors.New("忽略业务结果")
- type BusinessFunc[I any, O any] func(c *mqtt_client.MqttClient, inputModel I) (O, error)
- type BindingFunc[O any] func(c *mqtt_client.MqttClient, item *router.Item, request any, sendFunc response.SendFunc[O]) bool
- type Binding struct {
- responseIdentifier response.Identifier
- router *router.Router
- }
- // NewBinding 创建版本对应的binding
- func NewBinding(apiVersion string, responseIdentifier response.Identifier, handlers ...router.Handler) *Binding {
- group := topicPrefix
- if strutils.IsStringNotEmpty(apiVersion) && apiVersion != "root" {
- group += "/" + apiVersion
- }
- handlers = append([]router.Handler{
- middleware.Logger([]string{group + "/version"}),
- middleware.Recovery(),
- }, handlers...)
- r := mqttClientInstance.GetRouter(group, handlers)
- return &Binding{
- responseIdentifier: responseIdentifier,
- router: r,
- }
- }
- func Bind[I any, O any](b *Binding, item *BindItem[I, O], handlers ...router.Handler) {
- item.bind(b.router, b.responseIdentifier, handlers...)
- }
- func (binding *Binding) AddHandler(topic string, handlers ...router.Handler) error {
- if strutils.IsStringEmpty(topic) {
- return errors.New("没有传递主题")
- }
- if handlers == nil || len(handlers) == 0 {
- return errors.New("没有传递处理函数")
- }
- err := binding.router.RangeItem(func(existItem *router.Item) error {
- if existItem.Topic == binding.router.Group+topic {
- return errors.New("主题已存在: " + existItem.Topic)
- }
- return nil
- })
- if err != nil {
- if strings.Contains(err.Error(), "主题已存在: ") {
- fslog.Error(err)
- return nil
- }
- return err
- }
- return binding.router.AddTopic(binding.router.Group+topic, handlers...)
- }
- // BindItem 路由条目结构
- type BindItem[I any, O any] struct {
- Topic string // 请求路径
- ResponseIdentifier response.Identifier // 响应标识符提供接口,会在响应主题添加该标识符
- ResponseFunc response.SendFunc[O] // 响应泛型函数
- BusinessFunc BusinessFunc[I, O] // 业务泛型函数
- OptionalBindingFunc BindingFunc[O] // 可选的绑定函数
- }
- func (item *BindItem[I, O]) bind(r *router.Router, bindingResponseIdentifier response.Identifier, handlers ...router.Handler) {
- if strutils.IsStringEmpty(item.Topic) {
- panic("需要指定主题")
- }
- if item.ResponseFunc == nil {
- panic("需要指定响应函数")
- }
- err := r.RangeItem(func(existItem *router.Item) error {
- if existItem.Topic == r.Group+item.Topic {
- return errors.New("主题已存在: " + existItem.Topic)
- }
- return nil
- })
- if err != nil {
- panic(err)
- }
- var inputCheckModel I
- inputType := reflect.TypeOf(inputCheckModel)
- if inputType != nil {
- if inputType.Kind() == reflect.Pointer {
- panic("输入对象不能使用指针类型")
- }
- if inputType.Kind() != reflect.Struct {
- panic("输入对象必须是结构")
- }
- }
- var responseIdentifier response.Identifier
- if item.ResponseIdentifier != nil {
- responseIdentifier = item.ResponseIdentifier.Copy()
- } else if bindingResponseIdentifier != nil {
- responseIdentifier = bindingResponseIdentifier.Copy()
- }
- // 给单个路由增加中间件
- handlers = append(handlers, func(routerItem *router.Item, data []byte) {
- var inputModel I
- var req any
- if inputType != nil {
- req = &inputModel
- }
- // 请求的结构类型不为any
- if responseIdentifier != nil || req != nil {
- // 将请求数据解析到inputModel中
- if item.OptionalBindingFunc != nil {
- ok := item.OptionalBindingFunc(mqttClientInstance, routerItem, req, item.ResponseFunc)
- if !ok {
- return
- }
- } else {
- ok := request.BindingJson(mqttClientInstance, routerItem, req, responseIdentifier, item.ResponseFunc)
- if !ok {
- return
- }
- }
- }
- // 执行业务函数
- if item.BusinessFunc != nil {
- outputModel, err := item.BusinessFunc(mqttClientInstance, inputModel)
- if errors.Is(err, ErrIgnoreBusinessResult) {
- return
- }
- item.ResponseFunc(mqttClientInstance, routerItem, responseIdentifier, outputModel, err)
- return
- }
- })
- // 所有的函数加入到执行链中
- routerItem, err := router.NewItem(r.Group+item.Topic, handlers)
- if err != nil {
- panic("创建路由条目失败: " + err.Error())
- return
- }
- err = r.AddItem(routerItem)
- if err != nil {
- panic("添加路由条目失败: " + err.Error())
- return
- }
- }
|