pipeline.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. package pipeline
  2. import (
  3. "git.sxidc.com/go-tools/utils/fileutils"
  4. "git.sxidc.com/go-tools/utils/pipeline/component"
  5. "git.sxidc.com/go-tools/utils/pipeline/component/flow"
  6. "git.sxidc.com/go-tools/utils/strutils"
  7. "github.com/pkg/errors"
  8. "gopkg.in/yaml.v3"
  9. "os"
  10. )
  11. func init() {
  12. err := component.RegisterComponentBuilders(
  13. &flow.SeqBuilder{}, &flow.IfBuilder{}, &flow.LoopBuilder{}, &flow.RangeBuilder{}, &flow.BiBuilder{},
  14. )
  15. if err != nil {
  16. panic(err)
  17. }
  18. }
  19. type Pipeline struct {
  20. Flow *flow.Seq
  21. }
  22. func (p *Pipeline) Run(globalRunParams map[string]any, dynamicParams map[string]any) *RunToken {
  23. token := NewRunToken()
  24. go func() {
  25. result, err := p.Flow.Run(&component.GlobalRunParams{
  26. CustomRunParams: globalRunParams,
  27. }, dynamicParams)
  28. if err != nil {
  29. token.Err = err
  30. token.Done()
  31. return
  32. }
  33. token.Result = result
  34. token.Done()
  35. }()
  36. return token
  37. }
  38. func NewPipelineFromYaml(yamlPath string) (*Pipeline, error) {
  39. if strutils.IsStringEmpty(yamlPath) {
  40. return nil, errors.New("没有传递流水线定义文件")
  41. }
  42. if !fileutils.PathExists(yamlPath) {
  43. return nil, errors.New("流水线定义文件不存在")
  44. }
  45. yamlBytes, err := os.ReadFile(yamlPath)
  46. if err != nil {
  47. return nil, errors.New("读取流水线定义文件失败: " + err.Error())
  48. }
  49. def := new(Definition)
  50. err = yaml.Unmarshal(yamlBytes, def)
  51. if err != nil {
  52. return nil, errors.New("Unmarshal流水线定义文件失败: " + err.Error())
  53. }
  54. return def.NewPipeline()
  55. }
  56. func NewPipelineFromYamlStr(yamlStr string) (*Pipeline, error) {
  57. if strutils.IsStringEmpty(yamlStr) {
  58. return nil, errors.New("没有传递流水线定义")
  59. }
  60. def := new(Definition)
  61. err := yaml.Unmarshal([]byte(yamlStr), def)
  62. if err != nil {
  63. return nil, errors.New("Unmarshal流水线定义文件失败: " + err.Error())
  64. }
  65. return def.NewPipeline()
  66. }
  67. func LoadDynamicParamsFromYaml(yamlPath string) (map[string]any, error) {
  68. if strutils.IsStringEmpty(yamlPath) {
  69. return nil, errors.New("没有传递流水线动态参数文件不存在")
  70. }
  71. if !fileutils.PathExists(yamlPath) {
  72. return nil, errors.New("流水线动态参数文件不存在")
  73. }
  74. yamlBytes, err := os.ReadFile(yamlPath)
  75. if err != nil {
  76. return nil, errors.New("读取流水线动态参数文件失败: " + err.Error())
  77. }
  78. dynamicParams := make(map[string]any)
  79. err = yaml.Unmarshal(yamlBytes, dynamicParams)
  80. if err != nil {
  81. return nil, err
  82. }
  83. return dynamicParams, nil
  84. }
  85. func LoadDynamicParamsFromYamlStr(yamlStr string) (map[string]any, error) {
  86. if strutils.IsStringEmpty(yamlStr) {
  87. return nil, errors.New("没有传递流水线动态参数不存在")
  88. }
  89. dynamicParams := make(map[string]any)
  90. err := yaml.Unmarshal([]byte(yamlStr), dynamicParams)
  91. if err != nil {
  92. return nil, err
  93. }
  94. return dynamicParams, nil
  95. }