| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650 |
- package client
- import (
- "github.com/pkg/errors"
- "net/http"
- )
- const (
- createWorkflowRelativeUrl = "/api/v1/workflows/{namespace}"
- deleteWorkflowRelativeUrl = "/api/v1/workflows/{namespace}/{name}"
- getWorkflowsInNamespaceRelativeUrl = "/api/v1/workflows/{namespace}"
- getWorkflowRelativeUrl = "/api/v1/workflows/{namespace}/{name}"
- lintWorkflowRelativeUrl = "/api/v1/workflows/{namespace}/lint"
- submitWorkflowRelativeUrl = "/api/v1/workflows/{namespace}/submit"
- resubmitWorkflowRelativeUrl = "/api/v1/workflows/{namespace}/{name}/resubmit"
- retryWorkflowRelativeUrl = "/api/v1/workflows/{namespace}/{name}/retry"
- stopWorkflowRelativeUrl = "/api/v1/workflows/{namespace}/{name}/stop"
- terminateWorkflowRelativeUrl = "/api/v1/workflows/{namespace}/{name}/terminate"
- setWorkflowRelativeUrl = "/api/v1/workflows/{namespace}/{name}/set"
- suspendWorkflowRelativeUrl = "/api/v1/workflows/{namespace}/{name}/suspend"
- resumeWorkflowRelativeUrl = "/api/v1/workflows/{namespace}/{name}/resume"
- getWorkflowEventsStreamRelativeUrl = "/api/v1/workflow-events/{namespace}"
- )
- type CreateWorkflowParams struct {
- Namespace string
- WorkflowDefinition map[string]any
- }
- // CreateWorkflow 创建工作流
- func (c *Client) CreateWorkflow(params CreateWorkflowParams) (string, error) {
- responseMap := make(map[string]any)
- resp, err := c.restyClient.R().
- SetHeader("Content-Type", "application/json").
- SetAuthToken(c.token).
- SetPathParams(map[string]string{
- "namespace": params.Namespace,
- }).
- SetBody(map[string]any{
- "namespace": params.Namespace,
- "workflow": params.WorkflowDefinition,
- }).
- SetResult(&responseMap).
- SetError(&responseMap).
- Post(createWorkflowRelativeUrl)
- if err != nil {
- return "", errors.New(err.Error())
- }
- switch resp.StatusCode() {
- case http.StatusOK:
- metadata, ok := responseMap["metadata"]
- if !ok {
- return "", errors.New("metadata为空")
- }
- metadataMap, ok := metadata.(map[string]any)
- if !ok {
- return "", errors.New("metadata不是map")
- }
- workflowName, ok := metadataMap["name"]
- if !ok {
- return "", errors.New("metadata中没有工作流名称")
- }
- workflowNameStr, ok := workflowName.(string)
- if !ok {
- return "", errors.New("工作流名称不是字符串")
- }
- return workflowNameStr, nil
- case http.StatusConflict:
- return "", errors.New("工作流已存在")
- default:
- message, ok := responseMap["message"]
- if !ok {
- return "", errors.Errorf("%v", resp.Status())
- }
- return "", errors.Errorf("%v, %v", resp.Status(), message)
- }
- }
- type DeleteWorkflowParams struct {
- Namespace string
- Name string
- }
- // DeleteWorkflow 删除工作流
- func (c *Client) DeleteWorkflow(params DeleteWorkflowParams) error {
- responseMap := make(map[string]any)
- resp, err := c.restyClient.R().
- SetHeader("Content-Type", "application/json").
- SetAuthToken(c.token).
- SetPathParams(map[string]string{
- "namespace": params.Namespace,
- "name": params.Name,
- }).
- SetResult(&responseMap).
- SetError(&responseMap).
- Delete(deleteWorkflowRelativeUrl)
- if err != nil {
- return errors.New(err.Error())
- }
- switch resp.StatusCode() {
- case http.StatusOK:
- return nil
- default:
- message, ok := responseMap["message"]
- if !ok {
- return errors.Errorf("%v", resp.Status())
- }
- return errors.Errorf("%v, %v", resp.Status(), message)
- }
- }
- type GetWorkflowsInNamespaceParams struct {
- Namespace string
- }
- // GetWorkflowsInNamespace 获取命名空间下的工作流
- func (c *Client) GetWorkflowsInNamespace(params GetWorkflowsInNamespaceParams) ([]map[string]any, error) {
- responseMap := make(map[string]any)
- resp, err := c.restyClient.R().
- SetHeader("Content-Type", "application/json").
- SetAuthToken(c.token).
- SetPathParams(map[string]string{
- "namespace": params.Namespace,
- }).
- SetResult(&responseMap).
- SetError(&responseMap).
- Get(getWorkflowsInNamespaceRelativeUrl)
- if err != nil {
- return nil, errors.New(err.Error())
- }
- switch resp.StatusCode() {
- case http.StatusOK:
- itemsValue, ok := responseMap["items"]
- if !ok {
- return nil, errors.New("没有获取到items参数")
- }
- items, ok := itemsValue.([]any)
- if !ok {
- return nil, errors.New("items不是slice")
- }
- templateDefinitions := make([]map[string]any, len(items))
- for i, item := range items {
- templateDefinition, ok := item.(map[string]any)
- if !ok {
- return nil, errors.New("item无法转换为map[string]any")
- }
- templateDefinitions[i] = templateDefinition
- }
- return templateDefinitions, nil
- default:
- message, ok := responseMap["message"]
- if !ok {
- return nil, errors.Errorf("%v", resp.Status())
- }
- return nil, errors.Errorf("%v, %v", resp.Status(), message)
- }
- }
- type GetWorkflowParams struct {
- Namespace string
- Name string
- }
- // GetWorkflow 获取指定的工作流
- func (c *Client) GetWorkflow(params GetWorkflowParams) (map[string]any, error) {
- responseMap := make(map[string]any)
- resp, err := c.restyClient.R().
- SetHeader("Content-Type", "application/json").
- SetAuthToken(c.token).
- SetPathParams(map[string]string{
- "namespace": params.Namespace,
- "name": params.Name,
- }).
- SetResult(&responseMap).
- SetError(&responseMap).
- Get(getWorkflowRelativeUrl)
- if err != nil {
- return nil, errors.New(err.Error())
- }
- switch resp.StatusCode() {
- case http.StatusOK:
- return responseMap, nil
- default:
- message, ok := responseMap["message"]
- if !ok {
- return nil, errors.Errorf("%v", resp.Status())
- }
- return nil, errors.Errorf("%v, %v", resp.Status(), message)
- }
- }
- type LintWorkflowParams struct {
- Namespace string
- WorkflowDefinition map[string]any
- }
- // LintWorkflow 检查工作流定义语法
- func (c *Client) LintWorkflow(params LintWorkflowParams) error {
- responseMap := make(map[string]any)
- resp, err := c.restyClient.R().
- SetHeader("Content-Type", "application/json").
- SetAuthToken(c.token).
- SetPathParams(map[string]string{
- "namespace": params.Namespace,
- }).
- SetBody(map[string]any{
- "namespace": params.Namespace,
- "workflow": params.WorkflowDefinition,
- }).
- SetResult(&responseMap).
- SetError(&responseMap).
- Post(lintWorkflowRelativeUrl)
- if err != nil {
- return errors.New(err.Error())
- }
- switch resp.StatusCode() {
- case http.StatusOK:
- return nil
- default:
- message, ok := responseMap["message"]
- if !ok {
- return errors.Errorf("%v", resp.Status())
- }
- return errors.Errorf("%v, %v", resp.Status(), message)
- }
- }
- type SubmitWorkflowFromWorkflowTemplateParams struct {
- Namespace string
- TemplateName string
- Parameters []string
- }
- // SubmitWorkflowFromWorkflowTemplate 基于工作流模板提交工作流
- func (c *Client) SubmitWorkflowFromWorkflowTemplate(params SubmitWorkflowFromWorkflowTemplateParams) (string, error) {
- responseMap := make(map[string]any)
- resp, err := c.restyClient.R().
- SetHeader("Content-Type", "application/json").
- SetAuthToken(c.token).
- SetPathParams(map[string]string{
- "namespace": params.Namespace,
- }).
- SetBody(map[string]any{
- "namespace": params.Namespace,
- "resourceKind": "WorkflowTemplate",
- "resourceName": params.TemplateName,
- "submitOptions": map[string]any{
- "parameters": params.Parameters,
- },
- }).
- SetResult(&responseMap).
- SetError(&responseMap).
- Post(submitWorkflowRelativeUrl)
- if err != nil {
- return "", errors.New(err.Error())
- }
- switch resp.StatusCode() {
- case http.StatusOK:
- metadata, ok := responseMap["metadata"]
- if !ok {
- return "", errors.New("metadata为空")
- }
- metadataMap, ok := metadata.(map[string]any)
- if !ok {
- return "", errors.New("metadata不是map")
- }
- workflowName, ok := metadataMap["name"]
- if !ok {
- return "", errors.New("metadata中没有工作流名称")
- }
- workflowNameStr, ok := workflowName.(string)
- if !ok {
- return "", errors.New("工作流名称不是字符串")
- }
- return workflowNameStr, nil
- default:
- message, ok := responseMap["message"]
- if !ok {
- return "", errors.Errorf("%v", resp.Status())
- }
- return "", errors.Errorf("%v, %v", resp.Status(), message)
- }
- }
- type ResubmitWorkflowParams struct {
- Namespace string
- Name string
- Memoized bool
- ResubmitParameters []string
- }
- // ResubmitWorkflow 重提交工作流
- // 有三种方式可以用来重复提交(可以结合使用):重新运行,基于缓存的和传递重提交参数
- // 基于缓存的必须在Error和Failed的工作流上才可以使用
- func (c *Client) ResubmitWorkflow(params ResubmitWorkflowParams) (string, error) {
- responseMap := make(map[string]any)
- resp, err := c.restyClient.R().
- SetHeader("Content-Type", "application/json").
- SetAuthToken(c.token).
- SetPathParams(map[string]string{
- "namespace": params.Namespace,
- "name": params.Name,
- }).
- SetBody(map[string]any{
- "namespace": params.Namespace,
- "name": params.Name,
- "memoized": params.Memoized,
- "parameters": params.ResubmitParameters,
- }).
- SetResult(&responseMap).
- SetError(&responseMap).
- Put(resubmitWorkflowRelativeUrl)
- if err != nil {
- return "", errors.New(err.Error())
- }
- switch resp.StatusCode() {
- case http.StatusOK:
- metadata, ok := responseMap["metadata"]
- if !ok {
- return "", errors.New("metadata为空")
- }
- metadataMap, ok := metadata.(map[string]any)
- if !ok {
- return "", errors.New("metadata不是map")
- }
- workflowName, ok := metadataMap["name"]
- if !ok {
- return "", errors.New("metadata中没有工作流名称")
- }
- workflowNameStr, ok := workflowName.(string)
- if !ok {
- return "", errors.New("工作流名称不是字符串")
- }
- return workflowNameStr, nil
- default:
- message, ok := responseMap["message"]
- if !ok {
- return "", errors.Errorf("%v", resp.Status())
- }
- return "", errors.Errorf("%v, %v", resp.Status(), message)
- }
- }
- type RetryWorkflowParams struct {
- Namespace string
- Name string
- RetryParameters []string
- RetryOnSuccessfulWorkflow bool
- RetryWorkflowNodeFieldSelector string
- }
- // RetryWorkflow 重新运行工作流(默认只能失败的工作流上重新运行)
- func (c *Client) RetryWorkflow(params RetryWorkflowParams) error {
- responseMap := make(map[string]any)
- resp, err := c.restyClient.R().
- SetHeader("Content-Type", "application/json").
- SetAuthToken(c.token).
- SetPathParams(map[string]string{
- "namespace": params.Namespace,
- "name": params.Name,
- }).
- SetBody(map[string]any{
- "namespace": params.Namespace,
- "name": params.Name,
- "parameters": params.RetryParameters,
- "restartSuccessful": params.RetryOnSuccessfulWorkflow,
- "nodeFieldSelector": params.RetryWorkflowNodeFieldSelector,
- }).
- SetResult(&responseMap).
- SetError(&responseMap).
- Put(retryWorkflowRelativeUrl)
- if err != nil {
- return errors.New(err.Error())
- }
- switch resp.StatusCode() {
- case http.StatusOK:
- return nil
- default:
- message, ok := responseMap["message"]
- if !ok {
- return errors.Errorf("%v", resp.Status())
- }
- return errors.Errorf("%v, %v", resp.Status(), message)
- }
- }
- type StopWorkflowParams struct {
- Namespace string
- Name string
- NodeFieldSelector string
- Message string
- }
- // StopWorkflow 终止工作流运行,会调用所有的退出处理器
- func (c *Client) StopWorkflow(params StopWorkflowParams) error {
- responseMap := make(map[string]any)
- resp, err := c.restyClient.R().
- SetHeader("Content-Type", "application/json").
- SetAuthToken(c.token).
- SetPathParams(map[string]string{
- "namespace": params.Namespace,
- "name": params.Name,
- }).
- SetBody(map[string]any{
- "namespace": params.Namespace,
- "name": params.Name,
- "nodeFieldSelector": params.NodeFieldSelector,
- "message": params.Message,
- }).
- SetResult(&responseMap).
- SetError(&responseMap).
- Put(stopWorkflowRelativeUrl)
- if err != nil {
- return errors.New(err.Error())
- }
- switch resp.StatusCode() {
- case http.StatusOK:
- return nil
- default:
- message, ok := responseMap["message"]
- if !ok {
- return errors.Errorf("%v", resp.Status())
- }
- return errors.Errorf("%v, %v", resp.Status(), message)
- }
- }
- type TerminateWorkflowParams struct {
- Namespace string
- Name string
- }
- // TerminateWorkflow 终止工作流运行,不调用所有的退出处理器
- func (c *Client) TerminateWorkflow(params TerminateWorkflowParams) error {
- responseMap := make(map[string]any)
- resp, err := c.restyClient.R().
- SetHeader("Content-Type", "application/json").
- SetAuthToken(c.token).
- SetPathParams(map[string]string{
- "namespace": params.Namespace,
- "name": params.Name,
- }).
- SetBody(map[string]any{
- "namespace": params.Namespace,
- "name": params.Name,
- }).
- SetResult(&responseMap).
- SetError(&responseMap).
- Put(terminateWorkflowRelativeUrl)
- if err != nil {
- return errors.New(err.Error())
- }
- switch resp.StatusCode() {
- case http.StatusOK:
- return nil
- default:
- message, ok := responseMap["message"]
- if !ok {
- return errors.Errorf("%v", resp.Status())
- }
- return errors.Errorf("%v, %v", resp.Status(), message)
- }
- }
- type SetWorkflowParams struct {
- Namespace string
- Name string
- NodeFieldSelector string
- OutputParameters string
- Message string
- Phase string
- }
- // SetWorkflow 设置工作流的参数
- func (c *Client) SetWorkflow(params SetWorkflowParams) error {
- responseMap := make(map[string]any)
- resp, err := c.restyClient.R().
- SetHeader("Content-Type", "application/json").
- SetAuthToken(c.token).
- SetPathParams(map[string]string{
- "namespace": params.Namespace,
- "name": params.Name,
- }).
- SetBody(map[string]any{
- "namespace": params.Namespace,
- "name": params.Name,
- "nodeFieldSelector": params.NodeFieldSelector,
- "outputParameters": params.OutputParameters,
- "message": params.Message,
- "phase": params.Phase,
- }).
- SetResult(&responseMap).
- SetError(&responseMap).
- Put(setWorkflowRelativeUrl)
- if err != nil {
- return errors.New(err.Error())
- }
- switch resp.StatusCode() {
- case http.StatusOK:
- return nil
- default:
- message, ok := responseMap["message"]
- if !ok {
- return errors.Errorf("%v", resp.Status())
- }
- return errors.Errorf("%v, %v", resp.Status(), message)
- }
- }
- type SuspendWorkflowParams struct {
- Namespace string
- Name string
- }
- // SuspendWorkflow 挂起工作流
- func (c *Client) SuspendWorkflow(params SuspendWorkflowParams) error {
- responseMap := make(map[string]any)
- resp, err := c.restyClient.R().
- SetHeader("Content-Type", "application/json").
- SetAuthToken(c.token).
- SetPathParams(map[string]string{
- "namespace": params.Namespace,
- "name": params.Name,
- }).
- SetBody(map[string]any{
- "namespace": params.Namespace,
- "name": params.Name,
- }).
- SetResult(&responseMap).
- SetError(&responseMap).
- Put(suspendWorkflowRelativeUrl)
- if err != nil {
- return errors.New(err.Error())
- }
- switch resp.StatusCode() {
- case http.StatusOK:
- return nil
- default:
- message, ok := responseMap["message"]
- if !ok {
- return errors.Errorf("%v", resp.Status())
- }
- return errors.Errorf("%v, %v", resp.Status(), message)
- }
- }
- type ResumeWorkflowParams struct {
- Namespace string
- Name string
- NodeFieldSelector string
- }
- // ResumeWorkflow 恢复被挂起的工作流
- func (c *Client) ResumeWorkflow(params ResumeWorkflowParams) error {
- responseMap := make(map[string]any)
- resp, err := c.restyClient.R().
- SetHeader("Content-Type", "application/json").
- SetAuthToken(c.token).
- SetPathParams(map[string]string{
- "namespace": params.Namespace,
- "name": params.Name,
- }).
- SetBody(map[string]any{
- "namespace": params.Namespace,
- "name": params.Name,
- }).
- SetResult(&responseMap).
- SetError(&responseMap).
- Put(resumeWorkflowRelativeUrl)
- if err != nil {
- return errors.New(err.Error())
- }
- switch resp.StatusCode() {
- case http.StatusOK:
- return nil
- default:
- message, ok := responseMap["message"]
- if !ok {
- return errors.Errorf("%v", resp.Status())
- }
- return errors.Errorf("%v, %v", resp.Status(), message)
- }
- }
- type EventCallback func(event map[string]any, eventErr error) error
- type GetWorkflowEventsStreamParams struct {
- Namespace string
- Name string
- }
- // GetWorkflowEventsStream 监听工作流事件
- func (c *Client) GetWorkflowEventsStream(params GetWorkflowEventsStreamParams, callback EventCallback) error {
- return nil
- }
|