definition.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package pipeline
  2. import (
  3. "git.sxidc.com/go-tools/utils/pipeline/component"
  4. "git.sxidc.com/go-tools/utils/pipeline/component/flow"
  5. "git.sxidc.com/go-tools/utils/strutils"
  6. "github.com/fatih/structs"
  7. "github.com/pkg/errors"
  8. )
  9. type Definition struct {
  10. Name string `yaml:"name"`
  11. Components []ComponentDefinition `yaml:"components"`
  12. }
  13. func (def *Definition) Check() error {
  14. if strutils.IsStringEmpty(def.Name) {
  15. return errors.New("流水线没有传递名称")
  16. }
  17. if def.Components == nil || len(def.Components) == 0 {
  18. return errors.New("流水线没有添加组件")
  19. }
  20. for _, c := range def.Components {
  21. err := c.Check()
  22. if err != nil {
  23. return err
  24. }
  25. }
  26. return nil
  27. }
  28. type ComponentDefinition struct {
  29. Type string `yaml:"type"`
  30. Name string `yaml:"name"`
  31. BuildParams map[string]any `yaml:"build_params"`
  32. RunParams map[string]any `yaml:"run_params"`
  33. }
  34. func (def *ComponentDefinition) Check() error {
  35. if strutils.IsStringEmpty(def.Type) {
  36. return errors.New("组件没有传递类型")
  37. }
  38. if strutils.IsStringEmpty(def.Name) {
  39. return errors.New("组件没有传递名称")
  40. }
  41. return nil
  42. }
  43. func (def *Definition) NewPipeline() (*Pipeline, error) {
  44. err := def.Check()
  45. if err != nil {
  46. return nil, err
  47. }
  48. subComponentsParams := make([]flow.SubComponentBuildParams, 0)
  49. for _, subDef := range def.Components {
  50. subComponentsParams = append(subComponentsParams, flow.SubComponentBuildParams{
  51. Type: subDef.Type,
  52. Name: subDef.Name,
  53. BuildParams: subDef.BuildParams,
  54. RunParams: subDef.RunParams,
  55. })
  56. }
  57. pipeline, err := newPipelineWithBuildParams(def.Name, &flow.SeqBuildParams{
  58. Components: subComponentsParams,
  59. })
  60. if err != nil {
  61. return nil, err
  62. }
  63. return pipeline, nil
  64. }
  65. func newPipelineWithBuildParams(name string, params *flow.SeqBuildParams) (*Pipeline, error) {
  66. if strutils.IsStringEmpty(name) {
  67. return nil, errors.New("没有传递流水线的名称")
  68. }
  69. err := params.Check()
  70. if err != nil {
  71. return nil, err
  72. }
  73. rootComponent, err := component.BuildComponent("seq", name, structs.Map(params), nil)
  74. if err != nil {
  75. return nil, err
  76. }
  77. rootSeqFlow, ok := rootComponent.(*flow.Seq)
  78. if !ok {
  79. return nil, errors.New("流水线的根流程不是顺序流程")
  80. }
  81. return &Pipeline{Flow: rootSeqFlow}, nil
  82. }