mqtt_binding.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. package mqtt_binding
  2. import (
  3. "git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client"
  4. "git.sxidc.com/go-tools/api_binding/mqtt_binding/mqtt_client/router"
  5. "git.sxidc.com/go-tools/api_binding/mqtt_binding/request"
  6. "git.sxidc.com/go-tools/api_binding/mqtt_binding/response"
  7. "git.sxidc.com/go-tools/api_binding/utils"
  8. "reflect"
  9. )
  10. type BusinessFunc[I any, O any] func(c *mqtt_client.MqttClient, inputModel I) (O, error)
  11. type BindingFunc[O any] func(c *mqtt_client.MqttClient, item *router.Item, request any, sendFunc response.SendFunc[O]) bool
// Binding wraps the router that NewBinding obtains from the MQTT client
// instance for a given group.
type Binding struct {
	router *router.Router // set by NewBinding from mqttClientInstance.GetRouter
}
  15. // NewBinding 创建版本对应的binding
  16. func NewBinding(apiVersion string, handlers ...router.Handler) *Binding {
  17. var group string
  18. if utils.IsStringNotEmpty(apiVersion) && apiVersion != "root" {
  19. group = apiVersion
  20. }
  21. r := mqttClientInstance.GetRouter(group, handlers)
  22. return &Binding{router: r}
  23. }
// BindItem describes a single route entry. I is the input (request) model
// type and O is the output (response) model type used by BusinessFunc and
// ResponseFunc.
type BindItem[I any, O any] struct {
	Topic               string               // request topic; bind panics when it is empty
	Qos                 byte                 // QoS value handed to router.NewItem
	Retained            bool                 // whether the response is retained; handed to router.NewItem
	ResponseFunc        response.SendFunc[O] // generic response function; bind panics when it is nil
	BusinessFunc        BusinessFunc[I, O]   // generic business function; nothing is sent when it is nil
	OptionalBindingFunc BindingFunc[O]       // optional binding function; request.BindingJson is used when it is nil
}
  33. func (item *BindItem[I, O]) bind(r *router.Router, handlers ...router.Handler) {
  34. if utils.IsStringEmpty(item.Topic) {
  35. panic("需要指定主题")
  36. }
  37. if item.ResponseFunc == nil {
  38. panic("需要指定响应函数")
  39. }
  40. var inputCheckModel I
  41. inputType := reflect.TypeOf(inputCheckModel)
  42. if inputType != nil {
  43. if inputType.Kind() == reflect.Pointer {
  44. panic("输入对象不能使用指针类型")
  45. }
  46. if inputType.Kind() != reflect.Struct {
  47. panic("输入对象必须是结构")
  48. }
  49. }
  50. // 给单个路由增加中间件
  51. handlers = append(handlers, func(routerItem *router.Item, data []byte) {
  52. var inputModel I
  53. // 请求的结构类型不为any
  54. if inputType != nil {
  55. // 将请求数据解析到inputModel中
  56. if item.OptionalBindingFunc != nil {
  57. ok := item.OptionalBindingFunc(mqttClientInstance, routerItem, &inputModel, item.ResponseFunc)
  58. if !ok {
  59. return
  60. }
  61. } else {
  62. ok := request.BindingJson(mqttClientInstance, routerItem, &inputModel, item.ResponseFunc)
  63. if !ok {
  64. return
  65. }
  66. }
  67. }
  68. // 执行业务函数
  69. if item.BusinessFunc != nil {
  70. outputModel, err := item.BusinessFunc(mqttClientInstance, inputModel)
  71. item.ResponseFunc(mqttClientInstance, routerItem, outputModel, err)
  72. return
  73. }
  74. })
  75. // 所有的函数加入到执行链中
  76. routerItem, err := router.NewItem(item.Topic, item.Qos, item.Retained)
  77. if err != nil {
  78. panic("创建路由条目失败: " + err.Error())
  79. return
  80. }
  81. err = r.AddItem(routerItem)
  82. if err != nil {
  83. panic("添加路由条目失败: " + err.Error())
  84. return
  85. }
  86. }