# 流水线(Pipeline) 流水线是用来定义流程并执行相关操作的抽象。下图给出了流水线的一个简单抽象。 ![image][pipeline-image] 我们定义的流水线包含以下几种概念: 1. 节点:封装某一种或者某一类操作,这个操作由使用流水线包的项目按照自己的需求自行实现, 我们也会逐渐添加一些预定义的节点供大家使用; 2. 流程:节点的容器,规定了其内节点的执行顺序,预定义的流程包括顺序(seq)流程, 条件(if)流程,循环(loop)流程,遍历(range)流程,交换参数(bi)流程,也可按照需要自定义流程; 3. 流水线:由流程和节点组成,其根流程本质上就是一个顺序流程, 其下可以包含各种其他流程或者节点。 结构如下图所示,整体采用了组合模式 ![image][class-image] ## 定义流水线 定义流水线需要不断交错进行两个步骤,节点或流程开发和流水线编排。 ### 节点或流程开发 代码定义了Component接口,从上面的类图中可以看到,无论是流程还是节点,本质上就是一个Component,需要定义自己的流程或者节点,仅需要实现Component接口 ```go // "git.sxidc.com/go-tools/utils/pipeline/component" type Component interface { GetType() string // 返回组件类型,如内部预定义的顺序流程的,返回seq GetName() string // 返回组件实例的名称,这个名称是在流水线编排中传递进来的 Run(globalRunParams *GlobalRunParams, dynamicParams map[string]any) (any, error) // 执行逻辑,由于流水线使用了Go的协程,这里返回一个RunToken,需要Wait在RunToken上,以获取结果 } ``` 为了简化开发,我们提供了一个Component的基类,定义一个自己的Component可以用下面更简单的方法 ```go type FooComponent struct { component.BaseComponent } func (f *FooComponent) Run(globalRunParams *component.GlobalRunParams, dynamicParams map[string]any) (any, error) { return f.OnRun(globalRunParams, dynamicParams, func(globalRunParams *component.GlobalRunParams, dynamicParams map[string]any) (any, error) { // 完成运行逻辑 // globalRunParams全局运行参数,运行流水线章节详细说明 // dynamicParams动态运行参数,运行流水线章节详细说明 // 返回值是本组件运行后的结果及出错信息 }) } ``` 组件的创建使用了Builder模式,需要定义一个实现了Builder接口的组件构建器,Builder接口如下所示: ```go // "git.sxidc.com/go-tools/utils/pipeline/component" type Builder interface { ProductType() string // 返回制品类型,应当与上面实现的Component的GetType方法返回值一致 Build(name string, buildParams map[string]any, runParams map[string]any) (Component, error) // 构建过程 } ``` 但是我们定义的流程和节点还需要进行注册才能使用,注册需要调用如下函数: ```go // "git.sxidc.com/go-tools/utils/pipeline/component" func RegisterComponentBuilders(builders ...Builder) error ``` 如果要动态注销,需要调用下面的接口: ```go // "git.sxidc.com/go-tools/utils/pipeline/component" func UnRegisterComponents(typeNames []string) ``` 如果此时想单独测试自己编写的组件,可以使用下面的函数创建组件 ```go // "git.sxidc.com/go-tools/utils/pipeline/component" func BuildComponent(typeName string, name string, buildParams map[string]any, runParams map[string]any) (Component, error) ``` ### 流水线编排 有两种编排流水线的方式:代码通过构造Definition结构编排流水线和通过YAML编排 #### 通过Definition编排 Definition结构定义如下: ```go // "git.sxidc.com/go-tools/pipeline/utils" type Definition struct { Name string `yaml:"name"` // 流水线名称 Components []ComponentDefinition `yaml:"components"` // 组件定义 } type ComponentDefinition struct { Type string `yaml:"type"` // 组件类型,确保是注册过的组件 Name string `yaml:"name"` // 组件名称 BuildParams map[string]any `yaml:"build_params"` // 组件构建参数,不同的组件需要的构建参数不同 RunParams map[string]any `yaml:"run_params"` // 构建时指定的静态运行参数 } ``` 使用Definition需要填充对应的属性,然后运行Definition的方法即可创建流水线: ```go // "git.sxidc.com/go-tools/pipeline/utils" func (def *Definition) NewPipeline() (*Pipeline, error) ``` #### 通过YAML编排 通过yaml文件编排本质上还是通过Definition生成,编写的YAML文件格式从上面的Definition的tag中就能看到,下面给出一个测试中用到的YAML定义: ```yaml name: test components: - type: seq name: seq-flow build_params: components: - type: println name: seq-flow-node run_params: content: seq-flow-node - type: println name: pipeline-node run_params: content: pipeline-node - type: if name: if-flow build_params: condition: type: seq name: condition_flow build_params: components: - type: bool name: bool run_params: op: pass value: true condition_true: type: seq name: true-flow build_params: components: - type: println name: if-node-true run_params: content: if-node-true condition_false: type: println name: if-node-false run_params: content: if-node-false - type: loop name: loop-flow build_params: condition: type: bool name: condition-node run_params: op: rand sub: type: seq name: sub-flow build_params: components: - type: println name: loop-sub-node run_params: content: loop-sub-node - type: range name: range-flow build_params: values: - range first - range second sub: type: println name: range-sub-node run_params: content: range-sub-node - type: bi name: bi-flow build_params: components: - type: println name: bi-flow-node run_params: content: bi-flow-node run_params: is_bi: true left_params: left right_params: right ``` 后面章节会给出几种内置流程的构建参数和运行参数,可以对照查看上面的编排。YAML编排可以将内容写入文件,也可以是内存中保存的,有两个函数可以使用YAML编排创建流水线: ```go // "git.sxidc.com/go-tools/pipeline/utils" // 通过YAML定义文件创建流水线 func NewPipelineFromYaml(yamlPath string) (*Pipeline, error) // 通过YAML字符串创建流水线 func NewPipelineFromYamlStr(yamlStr string) (*Pipeline, error) ``` ## 运行流水线 运行流水线,这里只需要调用Pipeline的方法即可 ```go // "git.sxidc.com/go-tools/pipeline/utils" func (p *Pipeline) Run(globalRunParams map[string]any, dynamicParams map[string]any) *component.RunToken ``` 下面是一段样例代码 ```go token := p.Run(globalRunParams, dynamicParams) if token.Wait(); token.Err != nil { t.Fatal(token.Err) } fmt.Println(token.Result) ``` 这里可以看到有两个参数,分别是全局运行参数和动态运行参数,这里对运行流水线的运行参数进行说明,运行流水线的参数有三种: 1. 全局运行参数:该运行参数会在流水线中的所有流程和节点之间传递,也可以在运行时利用该参数传递组件之间共享的参数; 2. 构建时静态运行参数:构建一个组件(流程或者节点)的时候,我们可以传递静态运行参数,该参数会在组件运行过程中被使用; 3. 动态运行参数:用来在运行时指定的运行参数,如果同时指定了静态运行参数,则会覆盖静态运行参数 动态运行时参数有两种构造方式:一种是使用map直接构造,另一种是利用YAML构造,但不管使用哪种方法,动态运行时参数都是通过组件的名称进行查找的,如下是对应上面YAML编排示例的动态运行参数构造: ```yaml seq-flow: seq-flow-node: content: "!!!!seq-flow-node!!!!" pipeline-node: content: "!!!!pipeline-node!!!!" if-flow: condition_flow: bool: op: pass value: true true-flow: if-node-true: content: "!!!!if-node-true!!!!" if-node-false: content: "!!!!if-node-false!!!!" loop-flow: sub-flow: loop-sub-node: content: "!!!!loop-sub-node!!!!" range-flow: range-sub-node: content: "!!!!range-sub-node!!!!" bi-flow: is_bi: true left_params: "!!!!left!!!!" right_params: "!!!!right!!!!" bi-flow-node: content: "!!!!bi-flow-node!!!!" ``` ## 附:自定义的几种流程的构建参数和运行参数 ### 顺序流程 流程类型:seq 构建参数: ```yaml components: # 包含的子组件定义数组 - type: 任何的流程或者节点类型 name: 该组件在流水线中的名称 build_params: # 该类型组件的构建参数 ... run_params: # 该类型组件的静态运行时参数 ... ``` 运行参数:无 ### 条件流程 流程类型:if 构建参数: ```yaml condition: # 条件判断组件,返回结果必须为bool type: 任何的流程或者节点类型 name: 该组件在流水线中的名称 build_params: # 该类型组件的构建参数 ... run_params: # 该类型组件的静态运行时参数 ... condition_true: # 条件为真时运行的组件 type: 任何的流程或者节点类型 name: 该组件在流水线中的名称 build_params: # 该类型组件的构建参数 ... run_params: # 该类型组件的静态运行时参数 ... condition_false: # 条件为假时运行的组件 type: 任何的流程或者节点类型 name: 该组件在流水线中的名称 build_params: # 该类型组件的构建参数 ... run_params: # 该类型组件的静态运行时参数 ... ``` 运行参数:无 ### 循环流程 流程类型:loop 构建参数: ```yaml condition: # 条件判断组件,返回结果必须为bool type: 任何的流程或者节点类型 name: 该组件在流水线中的名称 build_params: # 该类型组件的构建参数 ... run_params: # 该类型组件的静态运行时参数 ... sub: # 循环体组件 type: 任何的流程或者节点类型 name: 该组件在流水线中的名称 build_params: # 该类型组件的构建参数 ... run_params: # 该类型组件的静态运行时参数 ... ``` 运行参数:无 ### 遍历流程 流程类型:range 构建参数: ```yaml values: # 需要遍历的值的数组 - 遍历值 sub: type: 任何的流程或者节点类型 name: 该组件在流水线中的名称 build_params: # 该类型组件的构建参数 ... run_params: # 该类型组件的静态运行时参数 ... ``` 运行参数:无 ** 该流程会在子组件的动态运行参数中植入一个键为range_once_value的参数值,该值给出了当前range到的值,另外,还会植入一个键为range_index的参数值(int类型),给出遍历到的Index。 ### 交换流程 流程类型:bi 构建参数: ```yaml components: - type: 任何的流程或者节点类型 name: 该组件在流水线中的名称 build_params: # 该类型组件的构建参数 ... run_params: # 该类型组件的静态运行时参数 ... ``` 运行参数: ```yaml is_bi: 是否双向运行(交换参数) left_params: 左参数,对应Go的any类型 right_params: 右参数,对应Go的any类型 ``` ** 该流程会在每轮运行时在动态运行参数中植入两个参数bi_left和bi_right,分别代表本轮执行的左参数和右参数 [pipeline-image]: [class-image]: