mqtt_binding.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. package mqtt_binding
  2. import (
  3. "errors"
  4. "git.sxidc.com/go-tools/api_binding/mqtt_binding/middleware"
  5. "git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client"
  6. "git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client/router"
  7. "git.sxidc.com/go-tools/api_binding/mqtt_binding/request"
  8. "git.sxidc.com/go-tools/api_binding/mqtt_binding/response"
  9. "git.sxidc.com/go-tools/api_binding/utils"
  10. "reflect"
  11. )
  12. type BusinessFunc[I any, O any] func(c *mqtt_client.MqttClient, inputModel I) (O, error)
  13. type BindingFunc[O any] func(c *mqtt_client.MqttClient, item *router.Item, request any, sendFunc response.SendFunc[O]) bool
  14. type Binding struct {
  15. responseIdentifier response.Identifier
  16. router *router.Router
  17. }
  18. // NewBinding 创建版本对应的binding
  19. func NewBinding(apiVersion string, responseIdentifier response.Identifier, handlers ...router.Handler) *Binding {
  20. group := topicPrefix
  21. if utils.IsStringNotEmpty(apiVersion) && apiVersion != "root" {
  22. group += "/" + apiVersion
  23. }
  24. handlers = append([]router.Handler{
  25. middleware.Logger([]string{group + "/version"}),
  26. middleware.Recovery(),
  27. }, handlers...)
  28. r := mqttClientInstance.GetRouter(group, handlers)
  29. return &Binding{
  30. responseIdentifier: responseIdentifier,
  31. router: r,
  32. }
  33. }
  34. func Bind[I any, O any](b *Binding, item *BindItem[I, O], handlers ...router.Handler) {
  35. item.bind(b.router, b.responseIdentifier, handlers...)
  36. }
  37. func (binding *Binding) AddHandler(topic string, handlers ...router.Handler) error {
  38. if utils.IsStringEmpty(topic) {
  39. return errors.New("没有传递主题")
  40. }
  41. if handlers == nil || len(handlers) == 0 {
  42. return errors.New("没有传递处理函数")
  43. }
  44. return binding.router.AddTopic(binding.router.Group+topic, handlers...)
  45. }
  46. // BindItem 路由条目结构
  47. type BindItem[I any, O any] struct {
  48. Topic string // 请求路径
  49. ResponseIdentifier response.Identifier // 响应标识符提供接口,会在响应主题添加该标识符
  50. ResponseFunc response.SendFunc[O] // 响应泛型函数
  51. BusinessFunc BusinessFunc[I, O] // 业务泛型函数
  52. OptionalBindingFunc BindingFunc[O] // 可选的绑定函数
  53. }
  54. func (item *BindItem[I, O]) bind(r *router.Router, bindingResponseIdentifier response.Identifier, handlers ...router.Handler) {
  55. if utils.IsStringEmpty(item.Topic) {
  56. panic("需要指定主题")
  57. }
  58. if item.ResponseFunc == nil {
  59. panic("需要指定响应函数")
  60. }
  61. var inputCheckModel I
  62. inputType := reflect.TypeOf(inputCheckModel)
  63. if inputType != nil {
  64. if inputType.Kind() == reflect.Pointer {
  65. panic("输入对象不能使用指针类型")
  66. }
  67. if inputType.Kind() != reflect.Struct {
  68. panic("输入对象必须是结构")
  69. }
  70. }
  71. var responseIdentifier response.Identifier
  72. if item.ResponseIdentifier != nil {
  73. responseIdentifier = item.ResponseIdentifier.Copy()
  74. } else if bindingResponseIdentifier != nil {
  75. responseIdentifier = bindingResponseIdentifier.Copy()
  76. }
  77. // 给单个路由增加中间件
  78. handlers = append(handlers, func(routerItem *router.Item, data []byte) {
  79. var inputModel I
  80. var req any
  81. if inputType != nil {
  82. req = &inputModel
  83. }
  84. // 请求的结构类型不为any
  85. if responseIdentifier != nil || req != nil {
  86. // 将请求数据解析到inputModel中
  87. if item.OptionalBindingFunc != nil {
  88. ok := item.OptionalBindingFunc(mqttClientInstance, routerItem, req, item.ResponseFunc)
  89. if !ok {
  90. return
  91. }
  92. } else {
  93. ok := request.BindingJson(mqttClientInstance, routerItem, req, responseIdentifier, item.ResponseFunc)
  94. if !ok {
  95. return
  96. }
  97. }
  98. }
  99. // 执行业务函数
  100. if item.BusinessFunc != nil {
  101. outputModel, err := item.BusinessFunc(mqttClientInstance, inputModel)
  102. item.ResponseFunc(mqttClientInstance, routerItem, responseIdentifier, outputModel, err)
  103. return
  104. }
  105. })
  106. // 所有的函数加入到执行链中
  107. routerItem, err := router.NewItem(r.Group+item.Topic, handlers)
  108. if err != nil {
  109. panic("创建路由条目失败: " + err.Error())
  110. return
  111. }
  112. err = r.AddItem(routerItem)
  113. if err != nil {
  114. panic("添加路由条目失败: " + err.Error())
  115. return
  116. }
  117. }