mqtt_binding.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. package mqtt_binding
  2. import (
  3. "errors"
  4. "git.sxidc.com/go-tools/api_binding/mqtt_binding/middleware"
  5. "git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client"
  6. "git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client/router"
  7. "git.sxidc.com/go-tools/api_binding/mqtt_binding/request"
  8. "git.sxidc.com/go-tools/api_binding/mqtt_binding/response"
  9. "git.sxidc.com/go-tools/utils/strutils"
  10. "git.sxidc.com/service-supports/fslog"
  11. "reflect"
  12. "strings"
  13. )
  14. var ErrIgnoreBusinessResult = errors.New("忽略业务结果")
  15. type BusinessFunc[I any, O any] func(c *mqtt_client.MqttClient, inputModel I) (O, error)
  16. type BindingFunc[O any] func(c *mqtt_client.MqttClient, item *router.Item, request any, sendFunc response.SendFunc[O]) bool
  17. type Binding struct {
  18. responseIdentifier response.Identifier
  19. router *router.Router
  20. }
  21. // NewBinding 创建版本对应的binding
  22. func NewBinding(apiVersion string, responseIdentifier response.Identifier, handlers ...router.Handler) *Binding {
  23. group := topicPrefix
  24. if strutils.IsStringNotEmpty(apiVersion) && apiVersion != "root" {
  25. group += "/" + apiVersion
  26. }
  27. handlers = append([]router.Handler{
  28. middleware.Logger([]string{group + "/version"}),
  29. middleware.Recovery(),
  30. }, handlers...)
  31. r := mqttClientInstance.GetRouter(group, handlers)
  32. return &Binding{
  33. responseIdentifier: responseIdentifier,
  34. router: r,
  35. }
  36. }
  37. func Bind[I any, O any](b *Binding, item *BindItem[I, O], handlers ...router.Handler) {
  38. item.bind(b.router, b.responseIdentifier, handlers...)
  39. }
  40. func (binding *Binding) AddHandler(topic string, handlers ...router.Handler) error {
  41. if strutils.IsStringEmpty(topic) {
  42. return errors.New("没有传递主题")
  43. }
  44. if handlers == nil || len(handlers) == 0 {
  45. return errors.New("没有传递处理函数")
  46. }
  47. err := binding.router.RangeItem(func(existItem *router.Item) error {
  48. if existItem.Topic == binding.router.Group+topic {
  49. return errors.New("主题已存在: " + existItem.Topic)
  50. }
  51. return nil
  52. })
  53. if err != nil {
  54. if strings.Contains(err.Error(), "主题已存在: ") {
  55. fslog.Error(err)
  56. return nil
  57. }
  58. return err
  59. }
  60. return binding.router.AddTopic(binding.router.Group+topic, handlers...)
  61. }
  62. // BindItem 路由条目结构
  63. type BindItem[I any, O any] struct {
  64. Topic string // 请求路径
  65. ResponseIdentifier response.Identifier // 响应标识符提供接口,会在响应主题添加该标识符
  66. ResponseFunc response.SendFunc[O] // 响应泛型函数
  67. BusinessFunc BusinessFunc[I, O] // 业务泛型函数
  68. OptionalBindingFunc BindingFunc[O] // 可选的绑定函数
  69. }
  70. func (item *BindItem[I, O]) bind(r *router.Router, bindingResponseIdentifier response.Identifier, handlers ...router.Handler) {
  71. if strutils.IsStringEmpty(item.Topic) {
  72. panic("需要指定主题")
  73. }
  74. if item.ResponseFunc == nil {
  75. panic("需要指定响应函数")
  76. }
  77. err := r.RangeItem(func(existItem *router.Item) error {
  78. if existItem.Topic == r.Group+item.Topic {
  79. return errors.New("主题已存在: " + existItem.Topic)
  80. }
  81. return nil
  82. })
  83. if err != nil {
  84. panic(err)
  85. }
  86. var inputCheckModel I
  87. inputType := reflect.TypeOf(inputCheckModel)
  88. if inputType != nil {
  89. if inputType.Kind() == reflect.Pointer {
  90. panic("输入对象不能使用指针类型")
  91. }
  92. if inputType.Kind() != reflect.Struct {
  93. panic("输入对象必须是结构")
  94. }
  95. }
  96. var responseIdentifier response.Identifier
  97. if item.ResponseIdentifier != nil {
  98. responseIdentifier = item.ResponseIdentifier.Copy()
  99. } else if bindingResponseIdentifier != nil {
  100. responseIdentifier = bindingResponseIdentifier.Copy()
  101. }
  102. // 给单个路由增加中间件
  103. handlers = append(handlers, func(routerItem *router.Item, data []byte) {
  104. var inputModel I
  105. var req any
  106. if inputType != nil {
  107. req = &inputModel
  108. }
  109. // 请求的结构类型不为any
  110. if responseIdentifier != nil || req != nil {
  111. // 将请求数据解析到inputModel中
  112. if item.OptionalBindingFunc != nil {
  113. ok := item.OptionalBindingFunc(mqttClientInstance, routerItem, req, item.ResponseFunc)
  114. if !ok {
  115. return
  116. }
  117. } else {
  118. ok := request.BindingJson(mqttClientInstance, routerItem, req, responseIdentifier, item.ResponseFunc)
  119. if !ok {
  120. return
  121. }
  122. }
  123. }
  124. // 执行业务函数
  125. if item.BusinessFunc != nil {
  126. outputModel, err := item.BusinessFunc(mqttClientInstance, inputModel)
  127. if errors.Is(err, ErrIgnoreBusinessResult) {
  128. return
  129. }
  130. item.ResponseFunc(mqttClientInstance, routerItem, responseIdentifier, outputModel, err)
  131. return
  132. }
  133. })
  134. // 所有的函数加入到执行链中
  135. routerItem, err := router.NewItem(r.Group+item.Topic, handlers)
  136. if err != nil {
  137. panic("创建路由条目失败: " + err.Error())
  138. return
  139. }
  140. err = r.AddItem(routerItem)
  141. if err != nil {
  142. panic("添加路由条目失败: " + err.Error())
  143. return
  144. }
  145. }