mqtt_binding.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. package mqtt_binding
  2. import (
  3. "errors"
  4. "git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client"
  5. "git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client/router"
  6. "git.sxidc.com/go-tools/api_binding/mqtt_binding/request"
  7. "git.sxidc.com/go-tools/api_binding/mqtt_binding/response"
  8. "git.sxidc.com/go-tools/api_binding/utils"
  9. "reflect"
  10. )
  11. type BusinessFunc[I any, O any] func(c *mqtt_client.MqttClient, inputModel I) (O, error)
  12. type BindingFunc[O any] func(c *mqtt_client.MqttClient, item *router.Item, request any, sendFunc response.SendFunc[O]) bool
  13. type Binding struct {
  14. responseIdentifier response.Identifier
  15. router *router.Router
  16. }
  17. // NewBinding 创建版本对应的binding
  18. func NewBinding(apiVersion string, responseIdentifier response.Identifier, handlers ...router.Handler) *Binding {
  19. group := topicPrefix
  20. if utils.IsStringNotEmpty(apiVersion) && apiVersion != "root" {
  21. group += "/" + apiVersion
  22. }
  23. r := mqttClientInstance.GetRouter(group, handlers)
  24. return &Binding{
  25. responseIdentifier: responseIdentifier,
  26. router: r,
  27. }
  28. }
  29. func Bind[I any, O any](b *Binding, item *BindItem[I, O], handlers ...router.Handler) {
  30. item.bind(b.router, b.responseIdentifier, handlers...)
  31. }
  32. func (binding *Binding) AddHandler(topic string, handlers ...router.Handler) error {
  33. if utils.IsStringEmpty(topic) {
  34. return errors.New("没有传递主题")
  35. }
  36. if handlers == nil || len(handlers) == 0 {
  37. return errors.New("没有传递处理函数")
  38. }
  39. return binding.router.AddTopic(binding.router.Group+topic, handlers...)
  40. }
  41. // BindItem 路由条目结构
  42. type BindItem[I any, O any] struct {
  43. Topic string // 请求路径
  44. ResponseIdentifier response.Identifier // 响应标识符提供接口,会在响应主题添加该标识符
  45. ResponseFunc response.SendFunc[O] // 响应泛型函数
  46. BusinessFunc BusinessFunc[I, O] // 业务泛型函数
  47. OptionalBindingFunc BindingFunc[O] // 可选的绑定函数
  48. }
  49. func (item *BindItem[I, O]) bind(r *router.Router, bindingResponseIdentifier response.Identifier, handlers ...router.Handler) {
  50. if utils.IsStringEmpty(item.Topic) {
  51. panic("需要指定主题")
  52. }
  53. if item.ResponseFunc == nil {
  54. panic("需要指定响应函数")
  55. }
  56. var inputCheckModel I
  57. inputType := reflect.TypeOf(inputCheckModel)
  58. if inputType != nil {
  59. if inputType.Kind() == reflect.Pointer {
  60. panic("输入对象不能使用指针类型")
  61. }
  62. if inputType.Kind() != reflect.Struct {
  63. panic("输入对象必须是结构")
  64. }
  65. }
  66. // 给单个路由增加中间件
  67. handlers = append(handlers, func(routerItem *router.Item, data []byte) {
  68. var responseIdentifier response.Identifier
  69. if item.ResponseIdentifier != nil {
  70. responseIdentifier = item.ResponseIdentifier
  71. } else if bindingResponseIdentifier != nil {
  72. responseIdentifier = bindingResponseIdentifier
  73. }
  74. var inputModel I
  75. // 请求的结构类型不为any
  76. if responseIdentifier != nil || inputType != nil {
  77. // 将请求数据解析到inputModel中
  78. if item.OptionalBindingFunc != nil {
  79. ok := item.OptionalBindingFunc(mqttClientInstance, routerItem, &inputModel, item.ResponseFunc)
  80. if !ok {
  81. return
  82. }
  83. } else {
  84. ok := request.BindingJson(mqttClientInstance, routerItem, &inputModel, responseIdentifier, item.ResponseFunc)
  85. if !ok {
  86. return
  87. }
  88. }
  89. }
  90. // 执行业务函数
  91. if item.BusinessFunc != nil {
  92. outputModel, err := item.BusinessFunc(mqttClientInstance, inputModel)
  93. item.ResponseFunc(mqttClientInstance, routerItem, responseIdentifier, outputModel, err)
  94. return
  95. }
  96. })
  97. // 所有的函数加入到执行链中
  98. routerItem, err := router.NewItem(r.Group+item.Topic, handlers)
  99. if err != nil {
  100. panic("创建路由条目失败: " + err.Error())
  101. return
  102. }
  103. err = r.AddItem(routerItem)
  104. if err != nil {
  105. panic("添加路由条目失败: " + err.Error())
  106. return
  107. }
  108. }