123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118 |
- package pipeline
- import (
- "git.sxidc.com/go-tools/utils/fileutils"
- "git.sxidc.com/go-tools/utils/pipeline/component"
- "git.sxidc.com/go-tools/utils/pipeline/component/flow"
- "git.sxidc.com/go-tools/utils/strutils"
- "github.com/pkg/errors"
- "gopkg.in/yaml.v3"
- "os"
- )
- func init() {
- err := component.RegisterComponentBuilders(
- &flow.SeqBuilder{}, &flow.IfBuilder{}, &flow.LoopBuilder{}, &flow.RangeBuilder{}, &flow.BiBuilder{},
- )
- if err != nil {
- panic(err)
- }
- }
- type Pipeline struct {
- Flow *flow.Seq
- }
- func (p *Pipeline) Run(globalRunParams map[string]any, dynamicParams map[string]any) *RunToken {
- token := NewRunToken()
- go func() {
- result, err := p.Flow.Run(&component.GlobalRunParams{
- CustomRunParams: globalRunParams,
- }, dynamicParams)
- if err != nil {
- token.Err = err
- token.Done()
- return
- }
- token.Result = result
- token.Done()
- }()
- return token
- }
- func NewPipelineFromYaml(yamlPath string) (*Pipeline, error) {
- if strutils.IsStringEmpty(yamlPath) {
- return nil, errors.New("没有传递流水线定义文件")
- }
- if !fileutils.PathExists(yamlPath) {
- return nil, errors.New("流水线定义文件不存在")
- }
- yamlBytes, err := os.ReadFile(yamlPath)
- if err != nil {
- return nil, errors.New("读取流水线定义文件失败: " + err.Error())
- }
- def := new(Definition)
- err = yaml.Unmarshal(yamlBytes, def)
- if err != nil {
- return nil, errors.New("Unmarshal流水线定义文件失败: " + err.Error())
- }
- return def.NewPipeline()
- }
- func NewPipelineFromYamlStr(yamlStr string) (*Pipeline, error) {
- if strutils.IsStringEmpty(yamlStr) {
- return nil, errors.New("没有传递流水线定义")
- }
- def := new(Definition)
- err := yaml.Unmarshal([]byte(yamlStr), def)
- if err != nil {
- return nil, errors.New("Unmarshal流水线定义文件失败: " + err.Error())
- }
- return def.NewPipeline()
- }
- func LoadDynamicParamsFromYaml(yamlPath string) (map[string]any, error) {
- if strutils.IsStringEmpty(yamlPath) {
- return nil, errors.New("没有传递流水线动态参数文件不存在")
- }
- if !fileutils.PathExists(yamlPath) {
- return nil, errors.New("流水线动态参数文件不存在")
- }
- yamlBytes, err := os.ReadFile(yamlPath)
- if err != nil {
- return nil, errors.New("读取流水线动态参数文件失败: " + err.Error())
- }
- dynamicParams := make(map[string]any)
- err = yaml.Unmarshal(yamlBytes, dynamicParams)
- if err != nil {
- return nil, err
- }
- return dynamicParams, nil
- }
- func LoadDynamicParamsFromYamlStr(yamlStr string) (map[string]any, error) {
- if strutils.IsStringEmpty(yamlStr) {
- return nil, errors.New("没有传递流水线动态参数不存在")
- }
- dynamicParams := make(map[string]any)
- err := yaml.Unmarshal([]byte(yamlStr), dynamicParams)
- if err != nil {
- return nil, err
- }
- return dynamicParams, nil
- }
|