# 流水线(Pipeline)
流水线是用来定义流程并执行相关操作的抽象。下图给出了流水线的一个简单抽象。
![image][pipeline-image]
我们定义的流水线包含以下几种概念:
1. 节点:封装某一种或者某一类操作,这个操作由使用流水线包的项目按照自己的需求自行实现, 我们也会逐渐添加一些预定义的节点供大家使用;
2. 流程:节点的容器,规定了其内节点的执行顺序,预定义的流程包括顺序(seq)流程, 条件(if)流程,循环(loop)流程,遍历(range)流程,交换参数(bi)流程,也可按照需要自定义流程;
3. 流水线:由流程和节点组成,其根流程本质上就是一个顺序流程, 其下可以包含各种其他流程或者节点。
结构如下图所示,整体采用了组合模式
![image][class-image]
## 定义流水线
定义流水线需要不断交错进行两个步骤,节点或流程开发和流水线编排。
### 节点或流程开发
代码定义了Component接口,从上面的类图中可以看到,无论是流程还是节点,本质上就是一个Component,需要定义自己的流程或者节点,仅需要实现Component接口
```go
// "git.sxidc.com/go-tools/utils/pipeline/component"
type Component interface {
GetType() string // 返回组件类型,如内部预定义的顺序流程的,返回seq
GetName() string // 返回组件实例的名称,这个名称是在流水线编排中传递进来的
Run(globalRunParams *GlobalRunParams, dynamicParams map[string]any) (any, error) // 执行逻辑,由于流水线使用了Go的协程,这里返回一个RunToken,需要Wait在RunToken上,以获取结果
}
```
为了简化开发,我们提供了一个Component的基类,定义一个自己的Component可以用下面更简单的方法
```go
type FooComponent struct {
component.BaseComponent
}
func (f *FooComponent) Run(globalRunParams *component.GlobalRunParams, dynamicParams map[string]any) (any, error) {
return f.OnRun(globalRunParams, dynamicParams,
func(globalRunParams *component.GlobalRunParams, dynamicParams map[string]any) (any, error) {
// 完成运行逻辑
// globalRunParams全局运行参数,运行流水线章节详细说明
// dynamicParams动态运行参数,运行流水线章节详细说明
// 返回值是本组件运行后的结果及出错信息
})
}
```
组件的创建使用了Builder模式,需要定义一个实现了Builder接口的组件构建器,Builder接口如下所示:
```go
// "git.sxidc.com/go-tools/utils/pipeline/component"
type Builder interface {
ProductType() string // 返回制品类型,应当与上面实现的Component的GetType方法返回值一致
Build(name string, buildParams map[string]any, runParams map[string]any) (Component, error) // 构建过程
}
```
但是我们定义的流程和节点还需要进行注册才能使用,注册需要调用如下函数:
```go
// "git.sxidc.com/go-tools/utils/pipeline/component"
func RegisterComponentBuilders(builders ...Builder) error
```
如果要动态注销,需要调用下面的接口:
```go
// "git.sxidc.com/go-tools/utils/pipeline/component"
func UnRegisterComponents(typeNames []string)
```
如果此时想单独测试自己编写的组件,可以使用下面的函数创建组件
```go
// "git.sxidc.com/go-tools/utils/pipeline/component"
func BuildComponent(typeName string, name string, buildParams map[string]any, runParams map[string]any) (Component, error)
```
### 流水线编排
有两种编排流水线的方式:代码通过构造Definition结构编排流水线和通过YAML编排
#### 通过Definition编排
Definition结构定义如下:
```go
// "git.sxidc.com/go-tools/pipeline/utils"
type Definition struct {
Name string `yaml:"name"` // 流水线名称
Components []ComponentDefinition `yaml:"components"` // 组件定义
}
type ComponentDefinition struct {
Type string `yaml:"type"` // 组件类型,确保是注册过的组件
Name string `yaml:"name"` // 组件名称
BuildParams map[string]any `yaml:"build_params"` // 组件构建参数,不同的组件需要的构建参数不同
RunParams map[string]any `yaml:"run_params"` // 构建时指定的静态运行参数
}
```
使用Definition需要填充对应的属性,然后运行Definition的方法即可创建流水线:
```go
// "git.sxidc.com/go-tools/pipeline/utils"
func (def *Definition) NewPipeline() (*Pipeline, error)
```
#### 通过YAML编排
通过yaml文件编排本质上还是通过Definition生成,编写的YAML文件格式从上面的Definition的tag中就能看到,下面给出一个测试中用到的YAML定义:
```yaml
name: test
components:
- type: seq
name: seq-flow
build_params:
components:
- type: println
name: seq-flow-node
run_params:
content: seq-flow-node
- type: println
name: pipeline-node
run_params:
content: pipeline-node
- type: if
name: if-flow
build_params:
condition:
type: seq
name: condition_flow
build_params:
components:
- type: bool
name: bool
run_params:
op: pass
value: true
condition_true:
type: seq
name: true-flow
build_params:
components:
- type: println
name: if-node-true
run_params:
content: if-node-true
condition_false:
type: println
name: if-node-false
run_params:
content: if-node-false
- type: loop
name: loop-flow
build_params:
condition:
type: bool
name: condition-node
run_params:
op: rand
sub:
type: seq
name: sub-flow
build_params:
components:
- type: println
name: loop-sub-node
run_params:
content: loop-sub-node
- type: range
name: range-flow
build_params:
values:
- range first
- range second
sub:
type: println
name: range-sub-node
run_params:
content: range-sub-node
- type: bi
name: bi-flow
build_params:
components:
- type: println
name: bi-flow-node
run_params:
content: bi-flow-node
run_params:
is_bi: true
left_params: left
right_params: right
```
后面章节会给出几种内置流程的构建参数和运行参数,可以对照查看上面的编排。YAML编排可以将内容写入文件,也可以是内存中保存的,有两个函数可以使用YAML编排创建流水线:
```go
// "git.sxidc.com/go-tools/pipeline/utils"
// 通过YAML定义文件创建流水线
func NewPipelineFromYaml(yamlPath string) (*Pipeline, error)
// 通过YAML字符串创建流水线
func NewPipelineFromYamlStr(yamlStr string) (*Pipeline, error)
```
## 运行流水线
运行流水线,这里只需要调用Pipeline的方法即可
```go
// "git.sxidc.com/go-tools/pipeline/utils"
func (p *Pipeline) Run(globalRunParams map[string]any, dynamicParams map[string]any) *component.RunToken
```
下面是一段样例代码
```go
token := p.Run(globalRunParams, dynamicParams)
if token.Wait(); token.Err != nil {
t.Fatal(token.Err)
}
fmt.Println(token.Result)
```
这里可以看到有两个参数,分别是全局运行参数和动态运行参数,这里对运行流水线的运行参数进行说明,运行流水线的参数有三种:
1. 全局运行参数:该运行参数会在流水线中的所有流程和节点之间传递,也可以在运行时利用该参数传递组件之间共享的参数;
2. 构建时静态运行参数:构建一个组件(流程或者节点)的时候,我们可以传递静态运行参数,该参数会在组件运行过程中被使用;
3. 动态运行参数:用来在运行时指定的运行参数,如果同时指定了静态运行参数,则会覆盖静态运行参数
动态运行时参数有两种构造方式:一种是使用map直接构造,另一种是利用YAML构造,但不管使用哪种方法,动态运行时参数都是通过组件的名称进行查找的,如下是对应上面YAML编排示例的动态运行参数构造:
```yaml
seq-flow:
seq-flow-node:
content: "!!!!seq-flow-node!!!!"
pipeline-node:
content: "!!!!pipeline-node!!!!"
if-flow:
condition_flow:
bool:
op: pass
value: true
true-flow:
if-node-true:
content: "!!!!if-node-true!!!!"
if-node-false:
content: "!!!!if-node-false!!!!"
loop-flow:
sub-flow:
loop-sub-node:
content: "!!!!loop-sub-node!!!!"
range-flow:
range-sub-node:
content: "!!!!range-sub-node!!!!"
bi-flow:
is_bi: true
left_params: "!!!!left!!!!"
right_params: "!!!!right!!!!"
bi-flow-node:
content: "!!!!bi-flow-node!!!!"
```
## 附:自定义的几种流程的构建参数和运行参数
### 顺序流程
流程类型:seq
构建参数:
```yaml
components: # 包含的子组件定义数组
- type: 任何的流程或者节点类型
name: 该组件在流水线中的名称
build_params: # 该类型组件的构建参数
...
run_params: # 该类型组件的静态运行时参数
...
```
运行参数:无
### 条件流程
流程类型:if
构建参数:
```yaml
condition: # 条件判断组件,返回结果必须为bool
type: 任何的流程或者节点类型
name: 该组件在流水线中的名称
build_params: # 该类型组件的构建参数
...
run_params: # 该类型组件的静态运行时参数
...
condition_true: # 条件为真时运行的组件
type: 任何的流程或者节点类型
name: 该组件在流水线中的名称
build_params: # 该类型组件的构建参数
...
run_params: # 该类型组件的静态运行时参数
...
condition_false: # 条件为假时运行的组件
type: 任何的流程或者节点类型
name: 该组件在流水线中的名称
build_params: # 该类型组件的构建参数
...
run_params: # 该类型组件的静态运行时参数
...
```
运行参数:无
### 循环流程
流程类型:loop
构建参数:
```yaml
condition: # 条件判断组件,返回结果必须为bool
type: 任何的流程或者节点类型
name: 该组件在流水线中的名称
build_params: # 该类型组件的构建参数
...
run_params: # 该类型组件的静态运行时参数
...
sub: # 循环体组件
type: 任何的流程或者节点类型
name: 该组件在流水线中的名称
build_params: # 该类型组件的构建参数
...
run_params: # 该类型组件的静态运行时参数
...
```
运行参数:无
### 遍历流程
流程类型:range
构建参数:
```yaml
values: # 需要遍历的值的数组
- 遍历值
sub:
type: 任何的流程或者节点类型
name: 该组件在流水线中的名称
build_params: # 该类型组件的构建参数
...
run_params: # 该类型组件的静态运行时参数
...
```
运行参数:无
** 该流程会在子组件的动态运行参数中植入一个键为range_once_value的参数值,该值给出了当前range到的值,另外,还会植入一个键为range_index的参数值(int类型),给出遍历到的Index。
### 交换流程
流程类型:bi
构建参数:
```yaml
components:
- type: 任何的流程或者节点类型
name: 该组件在流水线中的名称
build_params: # 该类型组件的构建参数
...
run_params: # 该类型组件的静态运行时参数
...
```
运行参数:
```yaml
is_bi: 是否双向运行(交换参数)
left_params: 左参数,对应Go的any类型
right_params: 右参数,对应Go的any类型
```
** 该流程会在每轮运行时在动态运行参数中植入两个参数bi_left和bi_right,分别代表本轮执行的左参数和右参数
[pipeline-image]:
[class-image]: