pipeline.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. package pipeline
  2. import (
  3. "errors"
  4. "git.sxidc.com/go-tools/utils/fileutils"
  5. "git.sxidc.com/go-tools/utils/pipeline/component"
  6. "git.sxidc.com/go-tools/utils/pipeline/component/flow"
  7. "git.sxidc.com/go-tools/utils/strutils"
  8. "gopkg.in/yaml.v3"
  9. "os"
  10. )
  11. func init() {
  12. err := component.RegisterComponentBuilders(
  13. &flow.SeqBuilder{}, &flow.IfBuilder{}, &flow.LoopBuilder{}, &flow.RangeBuilder{}, &flow.BiBuilder{},
  14. )
  15. if err != nil {
  16. panic(err)
  17. }
  18. }
  19. type Pipeline struct {
  20. Flow *flow.Seq
  21. }
  22. func (p *Pipeline) Run(globalRunParams map[string]any, dynamicParams map[string]any) *RunToken {
  23. token := NewRunToken()
  24. go func() {
  25. defer token.Done()
  26. result, err := p.Flow.Run(&component.GlobalRunParams{
  27. CustomRunParams: globalRunParams,
  28. }, dynamicParams)
  29. if err != nil {
  30. token.Err = err
  31. return
  32. }
  33. token.Result = result
  34. }()
  35. return token
  36. }
  37. func NewPipelineFromYaml(yamlPath string) (*Pipeline, error) {
  38. if strutils.IsStringEmpty(yamlPath) {
  39. return nil, errors.New("没有传递流水线定义文件")
  40. }
  41. if !fileutils.PathExists(yamlPath) {
  42. return nil, errors.New("流水线定义文件不存在")
  43. }
  44. yamlBytes, err := os.ReadFile(yamlPath)
  45. if err != nil {
  46. return nil, errors.New("读取流水线定义文件失败: " + err.Error())
  47. }
  48. def := new(Definition)
  49. err = yaml.Unmarshal(yamlBytes, def)
  50. if err != nil {
  51. return nil, errors.New("Unmarshal流水线定义文件失败: " + err.Error())
  52. }
  53. return def.NewPipeline()
  54. }
  55. func NewPipelineFromYamlStr(yamlStr string) (*Pipeline, error) {
  56. if strutils.IsStringEmpty(yamlStr) {
  57. return nil, errors.New("没有传递流水线定义")
  58. }
  59. def := new(Definition)
  60. err := yaml.Unmarshal([]byte(yamlStr), def)
  61. if err != nil {
  62. return nil, errors.New("Unmarshal流水线定义文件失败: " + err.Error())
  63. }
  64. return def.NewPipeline()
  65. }
  66. func LoadDynamicParamsFromYaml(yamlPath string) (map[string]any, error) {
  67. if strutils.IsStringEmpty(yamlPath) {
  68. return nil, errors.New("没有传递流水线动态参数文件不存在")
  69. }
  70. if !fileutils.PathExists(yamlPath) {
  71. return nil, errors.New("流水线动态参数文件不存在")
  72. }
  73. yamlBytes, err := os.ReadFile(yamlPath)
  74. if err != nil {
  75. return nil, errors.New("读取流水线动态参数文件失败: " + err.Error())
  76. }
  77. dynamicParams := make(map[string]any)
  78. err = yaml.Unmarshal(yamlBytes, dynamicParams)
  79. if err != nil {
  80. return nil, err
  81. }
  82. return dynamicParams, nil
  83. }
  84. func LoadDynamicParamsFromYamlStr(yamlStr string) (map[string]any, error) {
  85. if strutils.IsStringEmpty(yamlStr) {
  86. return nil, errors.New("没有传递流水线动态参数不存在")
  87. }
  88. dynamicParams := make(map[string]any)
  89. err := yaml.Unmarshal([]byte(yamlStr), dynamicParams)
  90. if err != nil {
  91. return nil, err
  92. }
  93. return dynamicParams, nil
  94. }