workflow.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501
  1. package client
  2. import (
  3. "github.com/pkg/errors"
  4. "net/http"
  5. )
  6. const (
  7. createWorkflowRelativeUrl = "/api/v1/workflows/{namespace}"
  8. deleteWorkflowRelativeUrl = "/api/v1/workflows/{namespace}/{name}"
  9. getWorkflowsInNamespaceRelativeUrl = "/api/v1/workflows/{namespace}"
  10. getWorkflowRelativeUrl = "/api/v1/workflows/{namespace}/{name}"
  11. lintWorkflowRelativeUrl = "/api/v1/workflows/{namespace}/lint"
  12. submitWorkflowRelativeUrl = "/api/v1/workflows/{namespace}/submit"
  13. resubmitWorkflowRelativeUrl = "/api/v1/workflows/{namespace}/{name}/resubmit"
  14. retryWorkflowRelativeUrl = "/api/v1/workflows/{namespace}/{name}/retry"
  15. setWorkflowRelativeUrl = "/api/v1/workflows/{namespace}/{name}/set"
  16. stopWorkflowRelativeUrl = "/api/v1/workflows/{namespace}/{name}/stop"
  17. terminateWorkflowRelativeUrl = "/api/v1/workflows/{namespace}/{name}/terminate"
  18. suspendWorkflowRelativeUrl = "/api/v1/workflows/{namespace}/{name}/suspend"
  19. resumeWorkflowRelativeUrl = "/api/v1/workflows/{namespace}/{name}/resume"
  20. getEventsStreamRelativeUrl = "/api/v1/stream/events/{namespace}"
  21. getWorkflowEventsStreamRelativeUrl = "/api/v1/workflow-events/{namespace}"
  22. )
  23. type CreateWorkflowParams struct {
  24. Namespace string
  25. WorkflowDefinition map[string]any
  26. }
  27. // CreateWorkflow 创建工作流
  28. func (c *Client) CreateWorkflow(params CreateWorkflowParams) (string, error) {
  29. responseMap := make(map[string]any)
  30. resp, err := c.restyClient.R().
  31. SetHeader("Content-Type", "application/json").
  32. SetAuthToken(c.token).
  33. SetPathParams(map[string]string{
  34. "namespace": params.Namespace,
  35. }).
  36. SetBody(map[string]any{
  37. "namespace": params.Namespace,
  38. "workflow": params.WorkflowDefinition,
  39. }).
  40. SetResult(&responseMap).
  41. SetError(&responseMap).
  42. Post(createWorkflowRelativeUrl)
  43. if err != nil {
  44. return "", errors.New(err.Error())
  45. }
  46. switch resp.StatusCode() {
  47. case http.StatusOK:
  48. metadata, ok := responseMap["metadata"]
  49. if !ok {
  50. return "", errors.New("metadata为空")
  51. }
  52. metadataMap, ok := metadata.(map[string]any)
  53. if !ok {
  54. return "", errors.New("metadata不是map")
  55. }
  56. workflowName, ok := metadataMap["name"]
  57. if !ok {
  58. return "", errors.New("metadata中没有工作流名称")
  59. }
  60. workflowNameStr, ok := workflowName.(string)
  61. if !ok {
  62. return "", errors.New("工作流名称不是字符串")
  63. }
  64. return workflowNameStr, nil
  65. case http.StatusConflict:
  66. return "", errors.New("工作流已存在")
  67. default:
  68. message, ok := responseMap["message"]
  69. if !ok {
  70. return "", errors.Errorf("%v", resp.Status())
  71. }
  72. return "", errors.Errorf("%v, %v", resp.Status(), message)
  73. }
  74. }
  75. type DeleteWorkflowParams struct {
  76. Namespace string
  77. Name string
  78. }
  79. // DeleteWorkflow 删除工作流
  80. func (c *Client) DeleteWorkflow(params DeleteWorkflowParams) error {
  81. responseMap := make(map[string]any)
  82. resp, err := c.restyClient.R().
  83. SetHeader("Content-Type", "application/json").
  84. SetAuthToken(c.token).
  85. SetPathParams(map[string]string{
  86. "namespace": params.Namespace,
  87. "name": params.Name,
  88. }).
  89. SetResult(&responseMap).
  90. SetError(&responseMap).
  91. Delete(deleteWorkflowRelativeUrl)
  92. if err != nil {
  93. return errors.New(err.Error())
  94. }
  95. switch resp.StatusCode() {
  96. case http.StatusOK:
  97. return nil
  98. default:
  99. message, ok := responseMap["message"]
  100. if !ok {
  101. return errors.Errorf("%v", resp.Status())
  102. }
  103. return errors.Errorf("%v, %v", resp.Status(), message)
  104. }
  105. }
  106. type GetWorkflowsInNamespaceParams struct {
  107. Namespace string
  108. }
  109. // GetWorkflowsInNamespace 获取命名空间下的工作流
  110. func (c *Client) GetWorkflowsInNamespace(params GetWorkflowsInNamespaceParams) ([]map[string]any, error) {
  111. responseMap := make(map[string]any)
  112. resp, err := c.restyClient.R().
  113. SetHeader("Content-Type", "application/json").
  114. SetAuthToken(c.token).
  115. SetPathParams(map[string]string{
  116. "namespace": params.Namespace,
  117. }).
  118. SetResult(&responseMap).
  119. SetError(&responseMap).
  120. Get(getWorkflowsInNamespaceRelativeUrl)
  121. if err != nil {
  122. return nil, errors.New(err.Error())
  123. }
  124. switch resp.StatusCode() {
  125. case http.StatusOK:
  126. itemsValue, ok := responseMap["items"]
  127. if !ok {
  128. return nil, errors.New("没有获取到items参数")
  129. }
  130. items, ok := itemsValue.([]any)
  131. if !ok {
  132. return nil, errors.New("items不是slice")
  133. }
  134. templateDefinitions := make([]map[string]any, len(items))
  135. for i, item := range items {
  136. templateDefinition, ok := item.(map[string]any)
  137. if !ok {
  138. return nil, errors.New("item无法转换为map[string]any")
  139. }
  140. templateDefinitions[i] = templateDefinition
  141. }
  142. return templateDefinitions, nil
  143. default:
  144. message, ok := responseMap["message"]
  145. if !ok {
  146. return nil, errors.Errorf("%v", resp.Status())
  147. }
  148. return nil, errors.Errorf("%v, %v", resp.Status(), message)
  149. }
  150. }
  151. type GetWorkflowParams struct {
  152. Namespace string
  153. Name string
  154. }
  155. // GetWorkflow 获取指定的工作流
  156. func (c *Client) GetWorkflow(params GetWorkflowParams) (map[string]any, error) {
  157. responseMap := make(map[string]any)
  158. resp, err := c.restyClient.R().
  159. SetHeader("Content-Type", "application/json").
  160. SetAuthToken(c.token).
  161. SetPathParams(map[string]string{
  162. "namespace": params.Namespace,
  163. "name": params.Name,
  164. }).
  165. SetResult(&responseMap).
  166. SetError(&responseMap).
  167. Get(getWorkflowRelativeUrl)
  168. if err != nil {
  169. return nil, errors.New(err.Error())
  170. }
  171. switch resp.StatusCode() {
  172. case http.StatusOK:
  173. return responseMap, nil
  174. default:
  175. message, ok := responseMap["message"]
  176. if !ok {
  177. return nil, errors.Errorf("%v", resp.Status())
  178. }
  179. return nil, errors.Errorf("%v, %v", resp.Status(), message)
  180. }
  181. }
  182. type LintWorkflowParams struct {
  183. Namespace string
  184. WorkflowDefinition map[string]any
  185. }
  186. // LintWorkflow 检查工作流定义语法
  187. func (c *Client) LintWorkflow(params LintWorkflowParams) error {
  188. responseMap := make(map[string]any)
  189. resp, err := c.restyClient.R().
  190. SetHeader("Content-Type", "application/json").
  191. SetAuthToken(c.token).
  192. SetPathParams(map[string]string{
  193. "namespace": params.Namespace,
  194. }).
  195. SetBody(map[string]any{
  196. "namespace": params.Namespace,
  197. "workflow": params.WorkflowDefinition,
  198. }).
  199. SetResult(&responseMap).
  200. SetError(&responseMap).
  201. Post(lintWorkflowRelativeUrl)
  202. if err != nil {
  203. return errors.New(err.Error())
  204. }
  205. switch resp.StatusCode() {
  206. case http.StatusOK:
  207. return nil
  208. case http.StatusConflict:
  209. return errors.New("工作流已存在")
  210. default:
  211. message, ok := responseMap["message"]
  212. if !ok {
  213. return errors.Errorf("%v", resp.Status())
  214. }
  215. return errors.Errorf("%v, %v", resp.Status(), message)
  216. }
  217. }
  218. type SubmitWorkflowFromWorkflowTemplateParams struct {
  219. Namespace string
  220. TemplateName string
  221. Parameters []string
  222. }
  223. // SubmitWorkflowFromWorkflowTemplate 基于工作流模板提交工作流
  224. func (c *Client) SubmitWorkflowFromWorkflowTemplate(params SubmitWorkflowFromWorkflowTemplateParams) (string, error) {
  225. responseMap := make(map[string]any)
  226. resp, err := c.restyClient.R().
  227. SetHeader("Content-Type", "application/json").
  228. SetAuthToken(c.token).
  229. SetPathParams(map[string]string{
  230. "namespace": params.Namespace,
  231. }).
  232. SetBody(map[string]any{
  233. "namespace": params.Namespace,
  234. "resourceKind": "WorkflowTemplate",
  235. "resourceName": params.TemplateName,
  236. "submitOptions": map[string]any{
  237. "parameters": params.Parameters,
  238. },
  239. }).
  240. SetResult(&responseMap).
  241. SetError(&responseMap).
  242. Post(submitWorkflowRelativeUrl)
  243. if err != nil {
  244. return "", errors.New(err.Error())
  245. }
  246. switch resp.StatusCode() {
  247. case http.StatusOK:
  248. metadata, ok := responseMap["metadata"]
  249. if !ok {
  250. return "", errors.New("metadata为空")
  251. }
  252. metadataMap, ok := metadata.(map[string]any)
  253. if !ok {
  254. return "", errors.New("metadata不是map")
  255. }
  256. workflowName, ok := metadataMap["name"]
  257. if !ok {
  258. return "", errors.New("metadata中没有工作流名称")
  259. }
  260. workflowNameStr, ok := workflowName.(string)
  261. if !ok {
  262. return "", errors.New("工作流名称不是字符串")
  263. }
  264. return workflowNameStr, nil
  265. case http.StatusConflict:
  266. return "", errors.New("工作流已存在")
  267. default:
  268. message, ok := responseMap["message"]
  269. if !ok {
  270. return "", errors.Errorf("%v", resp.Status())
  271. }
  272. return "", errors.Errorf("%v, %v", resp.Status(), message)
  273. }
  274. }
  275. type ResubmitWorkflowParams struct {
  276. Namespace string
  277. Name string
  278. Memoized bool
  279. ResubmitParameters []string
  280. }
  281. // ResubmitWorkflow 重提交工作流
  282. // 有三种方式可以用来重复提交(可以结合使用):重新运行,基于缓存的和传递重提交参数
  283. // 基于缓存的必须在Error和Failed的工作流上才可以使用
  284. func (c *Client) ResubmitWorkflow(params ResubmitWorkflowParams) (string, error) {
  285. responseMap := make(map[string]any)
  286. resp, err := c.restyClient.R().
  287. SetHeader("Content-Type", "application/json").
  288. SetAuthToken(c.token).
  289. SetPathParams(map[string]string{
  290. "namespace": params.Namespace,
  291. "name": params.Name,
  292. }).
  293. SetBody(map[string]any{
  294. "namespace": params.Namespace,
  295. "name": params.Name,
  296. "memoized": params.Memoized,
  297. "parameters": params.ResubmitParameters,
  298. }).
  299. SetResult(&responseMap).
  300. SetError(&responseMap).
  301. Put(resubmitWorkflowRelativeUrl)
  302. if err != nil {
  303. return "", errors.New(err.Error())
  304. }
  305. switch resp.StatusCode() {
  306. case http.StatusOK:
  307. metadata, ok := responseMap["metadata"]
  308. if !ok {
  309. return "", errors.New("metadata为空")
  310. }
  311. metadataMap, ok := metadata.(map[string]any)
  312. if !ok {
  313. return "", errors.New("metadata不是map")
  314. }
  315. workflowName, ok := metadataMap["name"]
  316. if !ok {
  317. return "", errors.New("metadata中没有工作流名称")
  318. }
  319. workflowNameStr, ok := workflowName.(string)
  320. if !ok {
  321. return "", errors.New("工作流名称不是字符串")
  322. }
  323. return workflowNameStr, nil
  324. case http.StatusConflict:
  325. return "", errors.New("工作流已存在")
  326. default:
  327. message, ok := responseMap["message"]
  328. if !ok {
  329. return "", errors.Errorf("%v", resp.Status())
  330. }
  331. return "", errors.Errorf("%v, %v", resp.Status(), message)
  332. }
  333. }
  334. type RetryWorkflowParams struct {
  335. Namespace string
  336. Name string
  337. RetryParameters []string
  338. RetryOnSuccessfulWorkflow bool
  339. RetryWorkflowNodeFieldSelector string
  340. }
  341. // RetryWorkflow 重新运行工作流(默认只能失败的工作流上重新运行)
  342. func (c *Client) RetryWorkflow(params RetryWorkflowParams) error {
  343. responseMap := make(map[string]any)
  344. resp, err := c.restyClient.R().
  345. SetHeader("Content-Type", "application/json").
  346. SetAuthToken(c.token).
  347. SetPathParams(map[string]string{
  348. "namespace": params.Namespace,
  349. "name": params.Name,
  350. }).
  351. SetBody(map[string]any{
  352. "namespace": params.Namespace,
  353. "name": params.Name,
  354. "parameters": params.RetryParameters,
  355. "restartSuccessful": params.RetryOnSuccessfulWorkflow,
  356. "nodeFieldSelector": params.RetryWorkflowNodeFieldSelector,
  357. }).
  358. SetResult(&responseMap).
  359. SetError(&responseMap).
  360. Put(retryWorkflowRelativeUrl)
  361. if err != nil {
  362. return errors.New(err.Error())
  363. }
  364. switch resp.StatusCode() {
  365. case http.StatusConflict:
  366. return errors.New("工作流已存在")
  367. default:
  368. message, ok := responseMap["message"]
  369. if !ok {
  370. return errors.Errorf("%v", resp.Status())
  371. }
  372. return errors.Errorf("%v, %v", resp.Status(), message)
  373. }
  374. }
  375. type StopWorkflowParams struct {
  376. Namespace string
  377. Name string
  378. NodeFieldSelector string
  379. Message string
  380. }
  381. // StopWorkflow 终止工作流运行,会调用所有的退出处理器
  382. func (c *Client) StopWorkflow(params StopWorkflowParams) error {
  383. return nil
  384. }
  385. type TerminalWorkflowParams struct {
  386. Namespace string
  387. Name string
  388. }
  389. // TerminalWorkflow 终止工作流运行,不调用所有的退出处理器
  390. func (c *Client) TerminalWorkflow(params TerminalWorkflowParams) error {
  391. return nil
  392. }
  393. type SetWorkflowParams struct {
  394. Namespace string
  395. Name string
  396. NodeFieldSelector string
  397. Message string
  398. Phase string
  399. OutputParameters []string
  400. }
  401. // SetWorkflow 设置工作流的参数
  402. func (c *Client) SetWorkflow(params SetWorkflowParams) error {
  403. return nil
  404. }
  405. type SuspendWorkflowParams struct {
  406. Namespace string
  407. Name string
  408. }
  409. // SuspendWorkflow 挂起工作流
  410. func (c *Client) SuspendWorkflow(params SuspendWorkflowParams) error {
  411. return nil
  412. }
  413. type ResumeWorkflowParams struct {
  414. Namespace string
  415. Name string
  416. NodeFieldSelector string
  417. }
  418. // ResumeWorkflow 恢复被挂起的工作流
  419. func (c *Client) ResumeWorkflow(params ResumeWorkflowParams) error {
  420. return nil
  421. }
  422. type EventCallback func(event map[string]any, eventErr error) error
  423. type GetWorkflowEventsStreamParams struct {
  424. Namespace string
  425. Name string
  426. }
  427. // GetWorkflowEventsStream 监听工作流事件
  428. func (c *Client) GetWorkflowEventsStream(params GetWorkflowEventsStreamParams, callback EventCallback) error {
  429. return nil
  430. }