workflow.go 7.3 KB


  1. package client
  2. import (
  3. "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow"
  4. "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
  5. "github.com/pkg/errors"
  6. )
  7. type CreateWorkflowParams struct {
  8. Namespace string
  9. WorkflowDefinition string
  10. }
  11. // CreateWorkflow 创建工作流
  12. func (c *Client) CreateWorkflow(params CreateWorkflowParams) (string, error) {
  13. wf := new(v1alpha1.Workflow)
  14. err := unmarshalYamlToJsonStruct([]byte(params.WorkflowDefinition), wf)
  15. if err != nil {
  16. return "", err
  17. }
  18. response, err := c.workflowService.CreateWorkflow(c.ctx, &workflow.WorkflowCreateRequest{
  19. Namespace: params.Namespace,
  20. Workflow: wf,
  21. })
  22. if err != nil {
  23. return "", errors.New(err.Error())
  24. }
  25. return response.Name, nil
  26. }
  27. type DeleteWorkflowParams struct {
  28. Namespace string
  29. Name string
  30. }
  31. // DeleteWorkflow 删除工作流
  32. func (c *Client) DeleteWorkflow(params DeleteWorkflowParams) error {
  33. _, err := c.workflowService.DeleteWorkflow(c.ctx, &workflow.WorkflowDeleteRequest{
  34. Namespace: params.Namespace,
  35. Name: params.Name,
  36. })
  37. if err != nil {
  38. return errors.New(err.Error())
  39. }
  40. return nil
  41. }
  42. type GetWorkflowsInNamespaceParams struct {
  43. Namespace string
  44. }
  45. // GetWorkflowsInNamespace 获取命名空间下的工作流
  46. func (c *Client) GetWorkflowsInNamespace(params GetWorkflowsInNamespaceParams) (v1alpha1.Workflows, error) {
  47. response, err := c.workflowService.ListWorkflows(c.ctx, &workflow.WorkflowListRequest{
  48. Namespace: params.Namespace,
  49. })
  50. if err != nil {
  51. return nil, errors.New(err.Error())
  52. }
  53. return response.Items, nil
  54. }
  55. type GetWorkflowParams struct {
  56. Namespace string
  57. Name string
  58. }
  59. // GetWorkflow 获取指定的工作流
  60. func (c *Client) GetWorkflow(params GetWorkflowParams) (*v1alpha1.Workflow, error) {
  61. response, err := c.workflowService.GetWorkflow(c.ctx, &workflow.WorkflowGetRequest{
  62. Namespace: params.Namespace,
  63. Name: params.Name,
  64. })
  65. if err != nil {
  66. return nil, errors.New(err.Error())
  67. }
  68. return response, nil
  69. }
  70. type LintWorkflowParams struct {
  71. Namespace string
  72. WorkflowDefinition string
  73. }
  74. // LintWorkflow 检查工作流定义语法
  75. func (c *Client) LintWorkflow(params LintWorkflowParams) error {
  76. wf := new(v1alpha1.Workflow)
  77. err := unmarshalYamlToJsonStruct([]byte(params.WorkflowDefinition), wf)
  78. if err != nil {
  79. return err
  80. }
  81. _, err = c.workflowService.LintWorkflow(c.ctx, &workflow.WorkflowLintRequest{
  82. Namespace: params.Namespace,
  83. Workflow: wf,
  84. })
  85. if err != nil {
  86. return errors.New(err.Error())
  87. }
  88. return nil
  89. }
  90. type SubmitWorkflowFromWorkflowTemplateParams struct {
  91. Namespace string
  92. TemplateName string
  93. Parameters []string
  94. Labels string
  95. }
  96. // SubmitWorkflowFromWorkflowTemplate 基于工作流模板提交工作流
  97. func (c *Client) SubmitWorkflowFromWorkflowTemplate(params SubmitWorkflowFromWorkflowTemplateParams) (string, error) {
  98. response, err := c.workflowService.SubmitWorkflow(c.ctx, &workflow.WorkflowSubmitRequest{
  99. Namespace: params.Namespace,
  100. ResourceKind: "WorkflowTemplate",
  101. ResourceName: params.TemplateName,
  102. SubmitOptions: &v1alpha1.SubmitOpts{
  103. Parameters: params.Parameters,
  104. Labels: params.Labels,
  105. },
  106. })
  107. if err != nil {
  108. return "", errors.New(err.Error())
  109. }
  110. return response.Name, nil
  111. }
  112. type ResubmitWorkflowParams struct {
  113. Namespace string
  114. Name string
  115. Memoized bool
  116. ResubmitParameters []string
  117. }
  118. // ResubmitWorkflow 重提交工作流
  119. // 有三种方式可以用来重复提交(可以结合使用):重新运行,基于缓存的和传递重提交参数
  120. // 基于缓存的必须在Error和Failed的工作流上才可以使用
  121. func (c *Client) ResubmitWorkflow(params ResubmitWorkflowParams) (string, error) {
  122. response, err := c.workflowService.ResubmitWorkflow(c.ctx, &workflow.WorkflowResubmitRequest{
  123. Namespace: params.Namespace,
  124. Name: params.Name,
  125. Memoized: params.Memoized,
  126. Parameters: params.ResubmitParameters,
  127. })
  128. if err != nil {
  129. return "", errors.New(err.Error())
  130. }
  131. return response.Name, nil
  132. }
  133. type RetryWorkflowParams struct {
  134. Namespace string
  135. Name string
  136. RetryParameters []string
  137. RetrySuccessfulWorkflow bool
  138. RetryWorkflowNodeFieldSelector string
  139. }
  140. // RetryWorkflow 重新运行工作流(默认只能失败的工作流上重新运行)
  141. func (c *Client) RetryWorkflow(params RetryWorkflowParams) error {
  142. _, err := c.workflowService.RetryWorkflow(c.ctx, &workflow.WorkflowRetryRequest{
  143. Namespace: params.Namespace,
  144. Name: params.Name,
  145. RestartSuccessful: params.RetrySuccessfulWorkflow,
  146. NodeFieldSelector: params.RetryWorkflowNodeFieldSelector,
  147. Parameters: params.RetryParameters,
  148. })
  149. if err != nil {
  150. return errors.New(err.Error())
  151. }
  152. return nil
  153. }
  154. type StopWorkflowParams struct {
  155. Namespace string
  156. Name string
  157. NodeFieldSelector string
  158. Message string
  159. }
  160. // StopWorkflow 终止工作流运行,会调用所有的退出处理器
  161. func (c *Client) StopWorkflow(params StopWorkflowParams) error {
  162. _, err := c.workflowService.StopWorkflow(c.ctx, &workflow.WorkflowStopRequest{
  163. Namespace: params.Namespace,
  164. Name: params.Name,
  165. NodeFieldSelector: params.NodeFieldSelector,
  166. Message: params.Message,
  167. })
  168. if err != nil {
  169. return errors.New(err.Error())
  170. }
  171. return nil
  172. }
  173. type TerminateWorkflowParams struct {
  174. Namespace string
  175. Name string
  176. }
  177. // TerminateWorkflow 终止工作流运行,不调用所有的退出处理器
  178. func (c *Client) TerminateWorkflow(params TerminateWorkflowParams) error {
  179. _, err := c.workflowService.TerminateWorkflow(c.ctx, &workflow.WorkflowTerminateRequest{
  180. Namespace: params.Namespace,
  181. Name: params.Name,
  182. })
  183. if err != nil {
  184. return errors.New(err.Error())
  185. }
  186. return nil
  187. }
  188. type SetWorkflowParams struct {
  189. Namespace string
  190. Name string
  191. NodeFieldSelector string
  192. OutputParameters string
  193. Message string
  194. Phase string
  195. }
  196. // SetWorkflow 设置工作流的参数
  197. func (c *Client) SetWorkflow(params SetWorkflowParams) error {
  198. _, err := c.workflowService.SetWorkflow(c.ctx, &workflow.WorkflowSetRequest{
  199. Namespace: params.Namespace,
  200. Name: params.Name,
  201. NodeFieldSelector: params.NodeFieldSelector,
  202. Message: params.Message,
  203. Phase: params.Phase,
  204. OutputParameters: params.OutputParameters,
  205. })
  206. if err != nil {
  207. return errors.New(err.Error())
  208. }
  209. return nil
  210. }
  211. type SuspendWorkflowParams struct {
  212. Namespace string
  213. Name string
  214. }
  215. // SuspendWorkflow 挂起工作流
  216. func (c *Client) SuspendWorkflow(params SuspendWorkflowParams) error {
  217. _, err := c.workflowService.SuspendWorkflow(c.ctx, &workflow.WorkflowSuspendRequest{
  218. Namespace: params.Namespace,
  219. Name: params.Name,
  220. })
  221. if err != nil {
  222. return errors.New(err.Error())
  223. }
  224. return nil
  225. }
  226. type ResumeWorkflowParams struct {
  227. Namespace string
  228. Name string
  229. NodeFieldSelector string
  230. }
  231. // ResumeWorkflow 恢复被挂起的工作流
  232. func (c *Client) ResumeWorkflow(params ResumeWorkflowParams) error {
  233. _, err := c.workflowService.ResumeWorkflow(c.ctx, &workflow.WorkflowResumeRequest{
  234. Namespace: params.Namespace,
  235. Name: params.Name,
  236. NodeFieldSelector: params.NodeFieldSelector,
  237. })
  238. if err != nil {
  239. return errors.New(err.Error())
  240. }
  241. return nil
  242. }