workflow.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. package client
  2. import (
  3. "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow"
  4. "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
  5. "github.com/pkg/errors"
  6. )
  7. type CreateWorkflowParams struct {
  8. Namespace string
  9. WorkflowDefinition string
  10. }
  11. // CreateWorkflow 创建工作流
  12. func (c *Client) CreateWorkflow(params CreateWorkflowParams) (string, error) {
  13. wf := new(v1alpha1.Workflow)
  14. err := unmarshalYamlToJsonStruct([]byte(params.WorkflowDefinition), wf)
  15. if err != nil {
  16. return "", err
  17. }
  18. response, err := c.workflowService.CreateWorkflow(c.ctx, &workflow.WorkflowCreateRequest{
  19. Namespace: params.Namespace,
  20. Workflow: wf,
  21. })
  22. if err != nil {
  23. return "", errors.New(err.Error())
  24. }
  25. return response.Name, nil
  26. }
  27. type DeleteWorkflowParams struct {
  28. Namespace string
  29. Name string
  30. }
  31. // DeleteWorkflow 删除工作流
  32. func (c *Client) DeleteWorkflow(params DeleteWorkflowParams) error {
  33. _, err := c.workflowService.DeleteWorkflow(c.ctx, &workflow.WorkflowDeleteRequest{
  34. Namespace: params.Namespace,
  35. Name: params.Name,
  36. })
  37. if err != nil {
  38. return errors.New(err.Error())
  39. }
  40. return nil
  41. }
  42. type GetWorkflowsInNamespaceParams struct {
  43. Namespace string
  44. }
  45. // GetWorkflowsInNamespace 获取命名空间下的工作流
  46. func (c *Client) GetWorkflowsInNamespace(params GetWorkflowsInNamespaceParams) (v1alpha1.Workflows, error) {
  47. response, err := c.workflowService.ListWorkflows(c.ctx, &workflow.WorkflowListRequest{
  48. Namespace: params.Namespace,
  49. })
  50. if err != nil {
  51. return nil, errors.New(err.Error())
  52. }
  53. return response.Items, nil
  54. }
  55. type GetWorkflowParams struct {
  56. Namespace string
  57. Name string
  58. }
  59. // GetWorkflow 获取指定的工作流
  60. func (c *Client) GetWorkflow(params GetWorkflowParams) (*v1alpha1.Workflow, error) {
  61. response, err := c.workflowService.GetWorkflow(c.ctx, &workflow.WorkflowGetRequest{
  62. Namespace: params.Namespace,
  63. Name: params.Name,
  64. })
  65. if err != nil {
  66. return nil, errors.New(err.Error())
  67. }
  68. return response, nil
  69. }
  70. type LintWorkflowParams struct {
  71. Namespace string
  72. WorkflowDefinition string
  73. }
  74. // LintWorkflow 检查工作流定义语法
  75. func (c *Client) LintWorkflow(params LintWorkflowParams) error {
  76. wf := new(v1alpha1.Workflow)
  77. err := unmarshalYamlToJsonStruct([]byte(params.WorkflowDefinition), wf)
  78. if err != nil {
  79. return err
  80. }
  81. _, err = c.workflowService.LintWorkflow(c.ctx, &workflow.WorkflowLintRequest{
  82. Namespace: params.Namespace,
  83. Workflow: wf,
  84. })
  85. if err != nil {
  86. return errors.New(err.Error())
  87. }
  88. return nil
  89. }
  90. type SubmitWorkflowFromWorkflowTemplateParams struct {
  91. Namespace string
  92. TemplateName string
  93. Parameters []string
  94. }
  95. // SubmitWorkflowFromWorkflowTemplate 基于工作流模板提交工作流
  96. func (c *Client) SubmitWorkflowFromWorkflowTemplate(params SubmitWorkflowFromWorkflowTemplateParams) (string, error) {
  97. response, err := c.workflowService.SubmitWorkflow(c.ctx, &workflow.WorkflowSubmitRequest{
  98. Namespace: params.Namespace,
  99. ResourceKind: "WorkflowTemplate",
  100. ResourceName: params.TemplateName,
  101. SubmitOptions: &v1alpha1.SubmitOpts{
  102. Parameters: params.Parameters,
  103. },
  104. })
  105. if err != nil {
  106. return "", errors.New(err.Error())
  107. }
  108. return response.Name, nil
  109. }
  110. type ResubmitWorkflowParams struct {
  111. Namespace string
  112. Name string
  113. Memoized bool
  114. ResubmitParameters []string
  115. }
  116. // ResubmitWorkflow 重提交工作流
  117. // 有三种方式可以用来重复提交(可以结合使用):重新运行,基于缓存的和传递重提交参数
  118. // 基于缓存的必须在Error和Failed的工作流上才可以使用
  119. func (c *Client) ResubmitWorkflow(params ResubmitWorkflowParams) (string, error) {
  120. response, err := c.workflowService.ResubmitWorkflow(c.ctx, &workflow.WorkflowResubmitRequest{
  121. Namespace: params.Namespace,
  122. Name: params.Name,
  123. Memoized: params.Memoized,
  124. Parameters: params.ResubmitParameters,
  125. })
  126. if err != nil {
  127. return "", errors.New(err.Error())
  128. }
  129. return response.Name, nil
  130. }
  131. type RetryWorkflowParams struct {
  132. Namespace string
  133. Name string
  134. RetryParameters []string
  135. RetrySuccessfulWorkflow bool
  136. RetryWorkflowNodeFieldSelector string
  137. }
  138. // RetryWorkflow 重新运行工作流(默认只能失败的工作流上重新运行)
  139. func (c *Client) RetryWorkflow(params RetryWorkflowParams) error {
  140. _, err := c.workflowService.RetryWorkflow(c.ctx, &workflow.WorkflowRetryRequest{
  141. Namespace: params.Namespace,
  142. Name: params.Name,
  143. RestartSuccessful: params.RetrySuccessfulWorkflow,
  144. NodeFieldSelector: params.RetryWorkflowNodeFieldSelector,
  145. Parameters: params.RetryParameters,
  146. })
  147. if err != nil {
  148. return errors.New(err.Error())
  149. }
  150. return nil
  151. }
  152. type StopWorkflowParams struct {
  153. Namespace string
  154. Name string
  155. NodeFieldSelector string
  156. Message string
  157. }
  158. // StopWorkflow 终止工作流运行,会调用所有的退出处理器
  159. func (c *Client) StopWorkflow(params StopWorkflowParams) error {
  160. _, err := c.workflowService.StopWorkflow(c.ctx, &workflow.WorkflowStopRequest{
  161. Namespace: params.Namespace,
  162. Name: params.Name,
  163. NodeFieldSelector: params.NodeFieldSelector,
  164. Message: params.Message,
  165. })
  166. if err != nil {
  167. return errors.New(err.Error())
  168. }
  169. return nil
  170. }
  171. type TerminateWorkflowParams struct {
  172. Namespace string
  173. Name string
  174. }
  175. // TerminateWorkflow 终止工作流运行,不调用所有的退出处理器
  176. func (c *Client) TerminateWorkflow(params TerminateWorkflowParams) error {
  177. _, err := c.workflowService.TerminateWorkflow(c.ctx, &workflow.WorkflowTerminateRequest{
  178. Namespace: params.Namespace,
  179. Name: params.Name,
  180. })
  181. if err != nil {
  182. return errors.New(err.Error())
  183. }
  184. return nil
  185. }
  186. type SetWorkflowParams struct {
  187. Namespace string
  188. Name string
  189. NodeFieldSelector string
  190. OutputParameters string
  191. Message string
  192. Phase string
  193. }
  194. // SetWorkflow 设置工作流的参数
  195. func (c *Client) SetWorkflow(params SetWorkflowParams) error {
  196. _, err := c.workflowService.SetWorkflow(c.ctx, &workflow.WorkflowSetRequest{
  197. Namespace: params.Namespace,
  198. Name: params.Name,
  199. NodeFieldSelector: params.NodeFieldSelector,
  200. Message: params.Message,
  201. Phase: params.Phase,
  202. OutputParameters: params.OutputParameters,
  203. })
  204. if err != nil {
  205. return errors.New(err.Error())
  206. }
  207. return nil
  208. }
  209. type SuspendWorkflowParams struct {
  210. Namespace string
  211. Name string
  212. }
  213. // SuspendWorkflow 挂起工作流
  214. func (c *Client) SuspendWorkflow(params SuspendWorkflowParams) error {
  215. _, err := c.workflowService.SuspendWorkflow(c.ctx, &workflow.WorkflowSuspendRequest{
  216. Namespace: params.Namespace,
  217. Name: params.Name,
  218. })
  219. if err != nil {
  220. return errors.New(err.Error())
  221. }
  222. return nil
  223. }
  224. type ResumeWorkflowParams struct {
  225. Namespace string
  226. Name string
  227. NodeFieldSelector string
  228. }
  229. // ResumeWorkflow 恢复被挂起的工作流
  230. func (c *Client) ResumeWorkflow(params ResumeWorkflowParams) error {
  231. _, err := c.workflowService.ResumeWorkflow(c.ctx, &workflow.WorkflowResumeRequest{
  232. Namespace: params.Namespace,
  233. Name: params.Name,
  234. NodeFieldSelector: params.NodeFieldSelector,
  235. })
  236. if err != nil {
  237. return errors.New(err.Error())
  238. }
  239. return nil
  240. }