package pipeline

import (
	"os"

	"git.sxidc.com/go-tools/utils/fileutils"
	"git.sxidc.com/go-tools/utils/pipeline/component"
	"git.sxidc.com/go-tools/utils/pipeline/component/flow"
	"git.sxidc.com/go-tools/utils/strutils"
	"github.com/pkg/errors"
	"gopkg.in/yaml.v3"
)

// init registers the flow component builders (Seq, If, Loop, Range, Bi) with
// the component package. A registration failure panics at package load time.
func init() {
	err := component.RegisterComponentBuilders(
		&flow.SeqBuilder{},
		&flow.IfBuilder{},
		&flow.LoopBuilder{},
		&flow.RangeBuilder{},
		&flow.BiBuilder{},
	)
	if err != nil {
		panic(err)
	}
}

// Pipeline is a runnable pipeline whose root component is a flow.Seq.
type Pipeline struct {
	Flow *flow.Seq
}

// Run starts the pipeline's root flow in a new goroutine and returns
// immediately with a RunToken.
//
// globalRunParams is passed to the flow as GlobalRunParams.CustomRunParams and
// dynamicParams is passed through unchanged. When the flow finishes, the
// goroutine stores either the error in token.Err or the result in
// token.Result and then calls token.Done().
func (p *Pipeline) Run(globalRunParams map[string]any, dynamicParams map[string]any) *RunToken {
	token := NewRunToken()

	go func() {
		// Done is called exactly once, after Err or Result has been set.
		defer token.Done()

		result, err := p.Flow.Run(&component.GlobalRunParams{
			CustomRunParams: globalRunParams,
		}, dynamicParams)
		if err != nil {
			token.Err = err
			return
		}

		token.Result = result
	}()

	return token
}

// NewPipelineFromYaml reads the pipeline definition file at yamlPath,
// unmarshals it into a Definition and builds a Pipeline from it.
//
// It returns an error when yamlPath is empty, when the path does not exist,
// when the file cannot be read, when the YAML cannot be unmarshalled, or when
// Definition.NewPipeline fails.
func NewPipelineFromYaml(yamlPath string) (*Pipeline, error) {
	if strutils.IsStringEmpty(yamlPath) {
		return nil, errors.New("没有传递流水线定义文件")
	}

	if !fileutils.PathExists(yamlPath) {
		return nil, errors.New("流水线定义文件不存在")
	}

	yamlBytes, err := os.ReadFile(yamlPath)
	if err != nil {
		// Wrap keeps the "<message>: <cause>" text while preserving the cause.
		return nil, errors.Wrap(err, "读取流水线定义文件失败")
	}

	def := new(Definition)
	if err := yaml.Unmarshal(yamlBytes, def); err != nil {
		return nil, errors.Wrap(err, "Unmarshal流水线定义文件失败")
	}

	return def.NewPipeline()
}

// NewPipelineFromYamlStr unmarshals the pipeline definition held in yamlStr
// into a Definition and builds a Pipeline from it.
//
// It returns an error when yamlStr is empty, when the YAML cannot be
// unmarshalled, or when Definition.NewPipeline fails.
func NewPipelineFromYamlStr(yamlStr string) (*Pipeline, error) {
	if strutils.IsStringEmpty(yamlStr) {
		return nil, errors.New("没有传递流水线定义")
	}

	def := new(Definition)
	if err := yaml.Unmarshal([]byte(yamlStr), def); err != nil {
		// The input is an in-memory string, so the message does not mention a file.
		return nil, errors.Wrap(err, "Unmarshal流水线定义失败")
	}

	return def.NewPipeline()
}

// LoadDynamicParamsFromYaml reads the YAML file at yamlPath and unmarshals it
// into a map of dynamic parameters.
//
// It returns an error when yamlPath is empty, when the path does not exist,
// or when the file cannot be read. An unmarshal error is returned as-is.
func LoadDynamicParamsFromYaml(yamlPath string) (map[string]any, error) {
	if strutils.IsStringEmpty(yamlPath) {
		return nil, errors.New("没有传递流水线动态参数文件")
	}

	if !fileutils.PathExists(yamlPath) {
		return nil, errors.New("流水线动态参数文件不存在")
	}

	yamlBytes, err := os.ReadFile(yamlPath)
	if err != nil {
		return nil, errors.Wrap(err, "读取流水线动态参数文件失败")
	}

	dynamicParams := make(map[string]any)
	if err := yaml.Unmarshal(yamlBytes, dynamicParams); err != nil {
		return nil, err
	}

	return dynamicParams, nil
}

// LoadDynamicParamsFromYamlStr unmarshals the YAML document held in yamlStr
// into a map of dynamic parameters.
//
// It returns an error when yamlStr is empty. An unmarshal error is returned
// as-is.
func LoadDynamicParamsFromYamlStr(yamlStr string) (map[string]any, error) {
	if strutils.IsStringEmpty(yamlStr) {
		return nil, errors.New("没有传递流水线动态参数")
	}

	dynamicParams := make(map[string]any)
	if err := yaml.Unmarshal([]byte(yamlStr), dynamicParams); err != nil {
		return nil, err
	}

	return dynamicParams, nil
}