bi.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. package flow
  2. import (
  3. "errors"
  4. "git.sxidc.com/go-tools/utils/pipeline/component"
  5. "github.com/mitchellh/mapstructure"
  6. )
  7. const (
  8. TypeBi = "bi"
  9. )
  10. const (
  11. BiLeft = "bi_left"
  12. BiRight = "bi_right"
  13. )
  14. type BiBuildParams struct {
  15. Components []SubComponentBuildParams `mapstructure:"components" structs:"components"`
  16. }
  17. func (params *BiBuildParams) Check() error {
  18. if params.Components == nil || len(params.Components) == 0 {
  19. return errors.New(TypeBi + "流没有包含任何组件")
  20. }
  21. subComponentMap := make(map[string]bool)
  22. for _, componentParams := range params.Components {
  23. err := componentParams.Check(TypeBi)
  24. if err != nil {
  25. return err
  26. }
  27. _, ok := subComponentMap[componentParams.Name]
  28. if ok {
  29. return errors.New("该流程中存在同名的组件: " + componentParams.Name)
  30. }
  31. subComponentMap[componentParams.Name] = true
  32. }
  33. return nil
  34. }
  35. type BiRunParams struct {
  36. IsBi bool `mapstructure:"is_bi" structs:"is_bi"`
  37. LeftParams any `mapstructure:"left_params" structs:"left_params"`
  38. RightParams any `mapstructure:"right_params" structs:"right_params"`
  39. }
  40. func (params *BiRunParams) Check() error {
  41. if params.LeftParams == nil {
  42. return errors.New(TypeBi + "没有传递左参数")
  43. }
  44. if params.LeftParams == nil {
  45. return errors.New(TypeBi + "没有传递右参数")
  46. }
  47. return nil
  48. }
  49. type Bi struct {
  50. component.BaseComponent
  51. Components []component.Component
  52. }
  53. func (f *Bi) Run(globalRunParams *component.GlobalRunParams, dynamicParams map[string]any) (any, error) {
  54. return f.OnRun(globalRunParams, dynamicParams,
  55. func(globalRunParams *component.GlobalRunParams, dynamicParams map[string]any) (any, error) {
  56. params := new(BiRunParams)
  57. err := mapstructure.Decode(dynamicParams, params)
  58. if err != nil {
  59. return nil, errors.New(TypeBi + "流程运行时参数Decode失败: " + err.Error())
  60. }
  61. err = params.Check()
  62. if err != nil {
  63. return nil, err
  64. }
  65. result, err := f.runSubComponents(globalRunParams, dynamicParams, params.LeftParams, params.RightParams)
  66. if err != nil {
  67. return nil, err
  68. }
  69. if !params.IsBi {
  70. return result, nil
  71. }
  72. result, err = f.runSubComponents(globalRunParams, dynamicParams, params.RightParams, params.LeftParams)
  73. if err != nil {
  74. return nil, err
  75. }
  76. return result, nil
  77. })
  78. }
  79. func (f *Bi) runSubComponents(globalRunParams *component.GlobalRunParams, dynamicParams map[string]any,
  80. leftParams any, rightParams any) (any, error) {
  81. var result any
  82. for _, subComponent := range f.Components {
  83. subDynamicParamsMap := make(map[string]any)
  84. subDynamicParams, ok := dynamicParams[subComponent.GetName()]
  85. if ok {
  86. innerSubDynamicParamsMap, ok := subDynamicParams.(map[string]any)
  87. if !ok {
  88. return nil, errors.New(TypeBi + "流程" + f.GetName() + "的子组件" + subComponent.GetName() + "动态参数类型错误")
  89. }
  90. subDynamicParamsMap = innerSubDynamicParamsMap
  91. }
  92. subDynamicParamsMap[BiLeft] = leftParams
  93. subDynamicParamsMap[BiRight] = rightParams
  94. innerResult, err := subComponent.Run(globalRunParams, subDynamicParamsMap)
  95. if err != nil {
  96. return nil, err
  97. }
  98. result = innerResult
  99. }
  100. return result, nil
  101. }
  102. type BiBuilder struct{}
  103. func (builder *BiBuilder) ProductType() string {
  104. return TypeBi
  105. }
  106. func (builder *BiBuilder) Build(name string, buildParams map[string]any, runParams map[string]any) (component.Component, error) {
  107. if buildParams == nil || len(buildParams) == 0 {
  108. return nil, errors.New(TypeBi + "流程没有传递构建参数")
  109. }
  110. flowBuildParams := new(BiBuildParams)
  111. err := mapstructure.Decode(buildParams, flowBuildParams)
  112. if err != nil {
  113. return nil, errors.New(TypeBi + "流程构建参数Decode失败: " + err.Error())
  114. }
  115. err = flowBuildParams.Check()
  116. if err != nil {
  117. return nil, err
  118. }
  119. subComponents := make([]component.Component, 0)
  120. for _, componentParams := range flowBuildParams.Components {
  121. subComponent, err := componentParams.BuildComponent()
  122. if err != nil {
  123. return nil, err
  124. }
  125. subComponents = append(subComponents, subComponent)
  126. }
  127. return &Bi{
  128. BaseComponent: *component.NewBaseComponent(TypeBi, name, runParams),
  129. Components: subComponents,
  130. }, nil
  131. }