seq.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. package flow
  2. import (
  3. "git.sxidc.com/go-tools/utils/pipeline/component"
  4. "github.com/mitchellh/mapstructure"
  5. "github.com/pkg/errors"
  6. )
  7. const (
  8. TypeSeq = "seq"
  9. )
  10. type SeqBuildParams struct {
  11. Components []SubComponentBuildParams `mapstructure:"components" structs:"components"`
  12. }
  13. func (params *SeqBuildParams) Check() error {
  14. if params.Components == nil || len(params.Components) == 0 {
  15. return errors.New(TypeSeq + "流没有包含任何组件")
  16. }
  17. subComponentMap := make(map[string]bool)
  18. for _, componentParams := range params.Components {
  19. err := componentParams.Check(TypeSeq)
  20. if err != nil {
  21. return err
  22. }
  23. _, ok := subComponentMap[componentParams.Name]
  24. if ok {
  25. return errors.New("该流程中存在同名的组件: " + componentParams.Name)
  26. }
  27. subComponentMap[componentParams.Name] = true
  28. }
  29. return nil
  30. }
  31. type Seq struct {
  32. component.BaseComponent
  33. Components []component.Component
  34. }
  35. func (f *Seq) Run(globalRunParams *component.GlobalRunParams, dynamicParams map[string]any) (any, error) {
  36. return f.OnRun(globalRunParams, dynamicParams,
  37. func(globalRunParams *component.GlobalRunParams, dynamicParams map[string]any) (any, error) {
  38. var result any
  39. for _, subComponent := range f.Components {
  40. var subDynamicParamsMap map[string]any
  41. subDynamicParams, ok := dynamicParams[subComponent.GetName()]
  42. if ok {
  43. innerSubDynamicParamsMap, ok := subDynamicParams.(map[string]any)
  44. if !ok {
  45. return nil, errors.New(TypeSeq + "流程" + f.GetName() + "的子组件" + subComponent.GetName() + "动态参数类型错误")
  46. }
  47. subDynamicParamsMap = innerSubDynamicParamsMap
  48. }
  49. innerResult, err := subComponent.Run(globalRunParams, subDynamicParamsMap)
  50. if err != nil {
  51. return nil, err
  52. }
  53. result = innerResult
  54. }
  55. return result, nil
  56. })
  57. }
  58. type SeqBuilder struct{}
  59. func (builder *SeqBuilder) ProductType() string {
  60. return TypeSeq
  61. }
  62. func (builder *SeqBuilder) Build(name string, buildParams map[string]any, runParams map[string]any) (component.Component, error) {
  63. if buildParams == nil || len(buildParams) == 0 {
  64. return nil, errors.New(TypeSeq + "流程没有传递构建参数")
  65. }
  66. flowBuildParams := new(SeqBuildParams)
  67. err := mapstructure.Decode(buildParams, flowBuildParams)
  68. if err != nil {
  69. return nil, errors.New(TypeSeq + "流程构建参数Decode失败: " + err.Error())
  70. }
  71. err = flowBuildParams.Check()
  72. if err != nil {
  73. return nil, err
  74. }
  75. subComponents := make([]component.Component, 0)
  76. for _, componentParams := range flowBuildParams.Components {
  77. subComponent, err := componentParams.BuildComponent()
  78. if err != nil {
  79. return nil, err
  80. }
  81. subComponents = append(subComponents, subComponent)
  82. }
  83. return &Seq{
  84. BaseComponent: *component.NewBaseComponent(TypeSeq, name, runParams),
  85. Components: subComponents,
  86. }, nil
  87. }