pipeline.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. package pipeline
import (
	"errors"
	"fmt"
	"os"

	"git.sxidc.com/go-tools/utils/pipeline/component"
	"git.sxidc.com/go-tools/utils/pipeline/component/flow"
	"git.sxidc.com/go-tools/utils/pipeline/utils"
	"gopkg.in/yaml.v3"
)
  10. func init() {
  11. err := component.RegisterComponentBuilders(
  12. &flow.SeqBuilder{}, &flow.IfBuilder{}, &flow.LoopBuilder{}, &flow.RangeBuilder{}, &flow.BiBuilder{},
  13. )
  14. if err != nil {
  15. panic(err)
  16. }
  17. }
  18. type Pipeline struct {
  19. Flow *flow.Seq
  20. }
  21. func (p *Pipeline) Run(globalRunParams map[string]any, dynamicParams map[string]any) *RunToken {
  22. token := NewRunToken()
  23. go func() {
  24. result, err := p.Flow.Run(&component.GlobalRunParams{
  25. CustomRunParams: globalRunParams,
  26. }, dynamicParams)
  27. if err != nil {
  28. token.Err = err
  29. return
  30. }
  31. token.Result = result
  32. token.Done()
  33. }()
  34. return token
  35. }
  36. func NewPipelineFromYaml(yamlPath string) (*Pipeline, error) {
  37. if utils.IsStringEmpty(yamlPath) {
  38. return nil, errors.New("没有传递流水线定义文件")
  39. }
  40. if !utils.PathExists(yamlPath) {
  41. return nil, errors.New("流水线定义文件不存在")
  42. }
  43. yamlBytes, err := os.ReadFile(yamlPath)
  44. if err != nil {
  45. return nil, errors.New("读取流水线定义文件失败: " + err.Error())
  46. }
  47. def := new(Definition)
  48. err = yaml.Unmarshal(yamlBytes, def)
  49. if err != nil {
  50. return nil, errors.New("Unmarshal流水线定义文件失败: " + err.Error())
  51. }
  52. return def.NewPipeline()
  53. }
  54. func NewPipelineFromYamlStr(yamlStr string) (*Pipeline, error) {
  55. if utils.IsStringEmpty(yamlStr) {
  56. return nil, errors.New("没有传递流水线定义")
  57. }
  58. def := new(Definition)
  59. err := yaml.Unmarshal([]byte(yamlStr), def)
  60. if err != nil {
  61. return nil, errors.New("Unmarshal流水线定义文件失败: " + err.Error())
  62. }
  63. return def.NewPipeline()
  64. }
  65. func LoadDynamicParamsFromYaml(yamlPath string) (map[string]any, error) {
  66. if utils.IsStringEmpty(yamlPath) {
  67. return nil, errors.New("没有传递流水线动态参数文件不存在")
  68. }
  69. if !utils.PathExists(yamlPath) {
  70. return nil, errors.New("流水线动态参数文件不存在")
  71. }
  72. yamlBytes, err := os.ReadFile(yamlPath)
  73. if err != nil {
  74. return nil, errors.New("读取流水线动态参数文件失败: " + err.Error())
  75. }
  76. dynamicParams := make(map[string]any)
  77. err = yaml.Unmarshal(yamlBytes, dynamicParams)
  78. if err != nil {
  79. return nil, err
  80. }
  81. return dynamicParams, nil
  82. }
  83. func LoadDynamicParamsFromYamlStr(yamlStr string) (map[string]any, error) {
  84. if utils.IsStringEmpty(yamlStr) {
  85. return nil, errors.New("没有传递流水线动态参数不存在")
  86. }
  87. dynamicParams := make(map[string]any)
  88. err := yaml.Unmarshal([]byte(yamlStr), dynamicParams)
  89. if err != nil {
  90. return nil, err
  91. }
  92. return dynamicParams, nil
  93. }