workflow.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650
  1. package client
  2. import (
  3. "github.com/pkg/errors"
  4. "net/http"
  5. )
  6. const (
  7. createWorkflowRelativeUrl = "/api/v1/workflows/{namespace}"
  8. deleteWorkflowRelativeUrl = "/api/v1/workflows/{namespace}/{name}"
  9. getWorkflowsInNamespaceRelativeUrl = "/api/v1/workflows/{namespace}"
  10. getWorkflowRelativeUrl = "/api/v1/workflows/{namespace}/{name}"
  11. lintWorkflowRelativeUrl = "/api/v1/workflows/{namespace}/lint"
  12. submitWorkflowRelativeUrl = "/api/v1/workflows/{namespace}/submit"
  13. resubmitWorkflowRelativeUrl = "/api/v1/workflows/{namespace}/{name}/resubmit"
  14. retryWorkflowRelativeUrl = "/api/v1/workflows/{namespace}/{name}/retry"
  15. stopWorkflowRelativeUrl = "/api/v1/workflows/{namespace}/{name}/stop"
  16. terminateWorkflowRelativeUrl = "/api/v1/workflows/{namespace}/{name}/terminate"
  17. setWorkflowRelativeUrl = "/api/v1/workflows/{namespace}/{name}/set"
  18. suspendWorkflowRelativeUrl = "/api/v1/workflows/{namespace}/{name}/suspend"
  19. resumeWorkflowRelativeUrl = "/api/v1/workflows/{namespace}/{name}/resume"
  20. getWorkflowEventsStreamRelativeUrl = "/api/v1/workflow-events/{namespace}"
  21. )
  22. type CreateWorkflowParams struct {
  23. Namespace string
  24. WorkflowDefinition map[string]any
  25. }
  26. // CreateWorkflow 创建工作流
  27. func (c *Client) CreateWorkflow(params CreateWorkflowParams) (string, error) {
  28. responseMap := make(map[string]any)
  29. resp, err := c.restyClient.R().
  30. SetHeader("Content-Type", "application/json").
  31. SetAuthToken(c.token).
  32. SetPathParams(map[string]string{
  33. "namespace": params.Namespace,
  34. }).
  35. SetBody(map[string]any{
  36. "namespace": params.Namespace,
  37. "workflow": params.WorkflowDefinition,
  38. }).
  39. SetResult(&responseMap).
  40. SetError(&responseMap).
  41. Post(createWorkflowRelativeUrl)
  42. if err != nil {
  43. return "", errors.New(err.Error())
  44. }
  45. switch resp.StatusCode() {
  46. case http.StatusOK:
  47. metadata, ok := responseMap["metadata"]
  48. if !ok {
  49. return "", errors.New("metadata为空")
  50. }
  51. metadataMap, ok := metadata.(map[string]any)
  52. if !ok {
  53. return "", errors.New("metadata不是map")
  54. }
  55. workflowName, ok := metadataMap["name"]
  56. if !ok {
  57. return "", errors.New("metadata中没有工作流名称")
  58. }
  59. workflowNameStr, ok := workflowName.(string)
  60. if !ok {
  61. return "", errors.New("工作流名称不是字符串")
  62. }
  63. return workflowNameStr, nil
  64. case http.StatusConflict:
  65. return "", errors.New("工作流已存在")
  66. default:
  67. message, ok := responseMap["message"]
  68. if !ok {
  69. return "", errors.Errorf("%v", resp.Status())
  70. }
  71. return "", errors.Errorf("%v, %v", resp.Status(), message)
  72. }
  73. }
  74. type DeleteWorkflowParams struct {
  75. Namespace string
  76. Name string
  77. }
  78. // DeleteWorkflow 删除工作流
  79. func (c *Client) DeleteWorkflow(params DeleteWorkflowParams) error {
  80. responseMap := make(map[string]any)
  81. resp, err := c.restyClient.R().
  82. SetHeader("Content-Type", "application/json").
  83. SetAuthToken(c.token).
  84. SetPathParams(map[string]string{
  85. "namespace": params.Namespace,
  86. "name": params.Name,
  87. }).
  88. SetResult(&responseMap).
  89. SetError(&responseMap).
  90. Delete(deleteWorkflowRelativeUrl)
  91. if err != nil {
  92. return errors.New(err.Error())
  93. }
  94. switch resp.StatusCode() {
  95. case http.StatusOK:
  96. return nil
  97. default:
  98. message, ok := responseMap["message"]
  99. if !ok {
  100. return errors.Errorf("%v", resp.Status())
  101. }
  102. return errors.Errorf("%v, %v", resp.Status(), message)
  103. }
  104. }
  105. type GetWorkflowsInNamespaceParams struct {
  106. Namespace string
  107. }
  108. // GetWorkflowsInNamespace 获取命名空间下的工作流
  109. func (c *Client) GetWorkflowsInNamespace(params GetWorkflowsInNamespaceParams) ([]map[string]any, error) {
  110. responseMap := make(map[string]any)
  111. resp, err := c.restyClient.R().
  112. SetHeader("Content-Type", "application/json").
  113. SetAuthToken(c.token).
  114. SetPathParams(map[string]string{
  115. "namespace": params.Namespace,
  116. }).
  117. SetResult(&responseMap).
  118. SetError(&responseMap).
  119. Get(getWorkflowsInNamespaceRelativeUrl)
  120. if err != nil {
  121. return nil, errors.New(err.Error())
  122. }
  123. switch resp.StatusCode() {
  124. case http.StatusOK:
  125. itemsValue, ok := responseMap["items"]
  126. if !ok {
  127. return nil, errors.New("没有获取到items参数")
  128. }
  129. items, ok := itemsValue.([]any)
  130. if !ok {
  131. return nil, errors.New("items不是slice")
  132. }
  133. templateDefinitions := make([]map[string]any, len(items))
  134. for i, item := range items {
  135. templateDefinition, ok := item.(map[string]any)
  136. if !ok {
  137. return nil, errors.New("item无法转换为map[string]any")
  138. }
  139. templateDefinitions[i] = templateDefinition
  140. }
  141. return templateDefinitions, nil
  142. default:
  143. message, ok := responseMap["message"]
  144. if !ok {
  145. return nil, errors.Errorf("%v", resp.Status())
  146. }
  147. return nil, errors.Errorf("%v, %v", resp.Status(), message)
  148. }
  149. }
  150. type GetWorkflowParams struct {
  151. Namespace string
  152. Name string
  153. }
  154. // GetWorkflow 获取指定的工作流
  155. func (c *Client) GetWorkflow(params GetWorkflowParams) (map[string]any, error) {
  156. responseMap := make(map[string]any)
  157. resp, err := c.restyClient.R().
  158. SetHeader("Content-Type", "application/json").
  159. SetAuthToken(c.token).
  160. SetPathParams(map[string]string{
  161. "namespace": params.Namespace,
  162. "name": params.Name,
  163. }).
  164. SetResult(&responseMap).
  165. SetError(&responseMap).
  166. Get(getWorkflowRelativeUrl)
  167. if err != nil {
  168. return nil, errors.New(err.Error())
  169. }
  170. switch resp.StatusCode() {
  171. case http.StatusOK:
  172. return responseMap, nil
  173. default:
  174. message, ok := responseMap["message"]
  175. if !ok {
  176. return nil, errors.Errorf("%v", resp.Status())
  177. }
  178. return nil, errors.Errorf("%v, %v", resp.Status(), message)
  179. }
  180. }
  181. type LintWorkflowParams struct {
  182. Namespace string
  183. WorkflowDefinition map[string]any
  184. }
  185. // LintWorkflow 检查工作流定义语法
  186. func (c *Client) LintWorkflow(params LintWorkflowParams) error {
  187. responseMap := make(map[string]any)
  188. resp, err := c.restyClient.R().
  189. SetHeader("Content-Type", "application/json").
  190. SetAuthToken(c.token).
  191. SetPathParams(map[string]string{
  192. "namespace": params.Namespace,
  193. }).
  194. SetBody(map[string]any{
  195. "namespace": params.Namespace,
  196. "workflow": params.WorkflowDefinition,
  197. }).
  198. SetResult(&responseMap).
  199. SetError(&responseMap).
  200. Post(lintWorkflowRelativeUrl)
  201. if err != nil {
  202. return errors.New(err.Error())
  203. }
  204. switch resp.StatusCode() {
  205. case http.StatusOK:
  206. return nil
  207. default:
  208. message, ok := responseMap["message"]
  209. if !ok {
  210. return errors.Errorf("%v", resp.Status())
  211. }
  212. return errors.Errorf("%v, %v", resp.Status(), message)
  213. }
  214. }
  215. type SubmitWorkflowFromWorkflowTemplateParams struct {
  216. Namespace string
  217. TemplateName string
  218. Parameters []string
  219. }
  220. // SubmitWorkflowFromWorkflowTemplate 基于工作流模板提交工作流
  221. func (c *Client) SubmitWorkflowFromWorkflowTemplate(params SubmitWorkflowFromWorkflowTemplateParams) (string, error) {
  222. responseMap := make(map[string]any)
  223. resp, err := c.restyClient.R().
  224. SetHeader("Content-Type", "application/json").
  225. SetAuthToken(c.token).
  226. SetPathParams(map[string]string{
  227. "namespace": params.Namespace,
  228. }).
  229. SetBody(map[string]any{
  230. "namespace": params.Namespace,
  231. "resourceKind": "WorkflowTemplate",
  232. "resourceName": params.TemplateName,
  233. "submitOptions": map[string]any{
  234. "parameters": params.Parameters,
  235. },
  236. }).
  237. SetResult(&responseMap).
  238. SetError(&responseMap).
  239. Post(submitWorkflowRelativeUrl)
  240. if err != nil {
  241. return "", errors.New(err.Error())
  242. }
  243. switch resp.StatusCode() {
  244. case http.StatusOK:
  245. metadata, ok := responseMap["metadata"]
  246. if !ok {
  247. return "", errors.New("metadata为空")
  248. }
  249. metadataMap, ok := metadata.(map[string]any)
  250. if !ok {
  251. return "", errors.New("metadata不是map")
  252. }
  253. workflowName, ok := metadataMap["name"]
  254. if !ok {
  255. return "", errors.New("metadata中没有工作流名称")
  256. }
  257. workflowNameStr, ok := workflowName.(string)
  258. if !ok {
  259. return "", errors.New("工作流名称不是字符串")
  260. }
  261. return workflowNameStr, nil
  262. default:
  263. message, ok := responseMap["message"]
  264. if !ok {
  265. return "", errors.Errorf("%v", resp.Status())
  266. }
  267. return "", errors.Errorf("%v, %v", resp.Status(), message)
  268. }
  269. }
  270. type ResubmitWorkflowParams struct {
  271. Namespace string
  272. Name string
  273. Memoized bool
  274. ResubmitParameters []string
  275. }
  276. // ResubmitWorkflow 重提交工作流
  277. // 有三种方式可以用来重复提交(可以结合使用):重新运行,基于缓存的和传递重提交参数
  278. // 基于缓存的必须在Error和Failed的工作流上才可以使用
  279. func (c *Client) ResubmitWorkflow(params ResubmitWorkflowParams) (string, error) {
  280. responseMap := make(map[string]any)
  281. resp, err := c.restyClient.R().
  282. SetHeader("Content-Type", "application/json").
  283. SetAuthToken(c.token).
  284. SetPathParams(map[string]string{
  285. "namespace": params.Namespace,
  286. "name": params.Name,
  287. }).
  288. SetBody(map[string]any{
  289. "namespace": params.Namespace,
  290. "name": params.Name,
  291. "memoized": params.Memoized,
  292. "parameters": params.ResubmitParameters,
  293. }).
  294. SetResult(&responseMap).
  295. SetError(&responseMap).
  296. Put(resubmitWorkflowRelativeUrl)
  297. if err != nil {
  298. return "", errors.New(err.Error())
  299. }
  300. switch resp.StatusCode() {
  301. case http.StatusOK:
  302. metadata, ok := responseMap["metadata"]
  303. if !ok {
  304. return "", errors.New("metadata为空")
  305. }
  306. metadataMap, ok := metadata.(map[string]any)
  307. if !ok {
  308. return "", errors.New("metadata不是map")
  309. }
  310. workflowName, ok := metadataMap["name"]
  311. if !ok {
  312. return "", errors.New("metadata中没有工作流名称")
  313. }
  314. workflowNameStr, ok := workflowName.(string)
  315. if !ok {
  316. return "", errors.New("工作流名称不是字符串")
  317. }
  318. return workflowNameStr, nil
  319. default:
  320. message, ok := responseMap["message"]
  321. if !ok {
  322. return "", errors.Errorf("%v", resp.Status())
  323. }
  324. return "", errors.Errorf("%v, %v", resp.Status(), message)
  325. }
  326. }
  327. type RetryWorkflowParams struct {
  328. Namespace string
  329. Name string
  330. RetryParameters []string
  331. RetryOnSuccessfulWorkflow bool
  332. RetryWorkflowNodeFieldSelector string
  333. }
  334. // RetryWorkflow 重新运行工作流(默认只能失败的工作流上重新运行)
  335. func (c *Client) RetryWorkflow(params RetryWorkflowParams) error {
  336. responseMap := make(map[string]any)
  337. resp, err := c.restyClient.R().
  338. SetHeader("Content-Type", "application/json").
  339. SetAuthToken(c.token).
  340. SetPathParams(map[string]string{
  341. "namespace": params.Namespace,
  342. "name": params.Name,
  343. }).
  344. SetBody(map[string]any{
  345. "namespace": params.Namespace,
  346. "name": params.Name,
  347. "parameters": params.RetryParameters,
  348. "restartSuccessful": params.RetryOnSuccessfulWorkflow,
  349. "nodeFieldSelector": params.RetryWorkflowNodeFieldSelector,
  350. }).
  351. SetResult(&responseMap).
  352. SetError(&responseMap).
  353. Put(retryWorkflowRelativeUrl)
  354. if err != nil {
  355. return errors.New(err.Error())
  356. }
  357. switch resp.StatusCode() {
  358. case http.StatusOK:
  359. return nil
  360. default:
  361. message, ok := responseMap["message"]
  362. if !ok {
  363. return errors.Errorf("%v", resp.Status())
  364. }
  365. return errors.Errorf("%v, %v", resp.Status(), message)
  366. }
  367. }
  368. type StopWorkflowParams struct {
  369. Namespace string
  370. Name string
  371. NodeFieldSelector string
  372. Message string
  373. }
  374. // StopWorkflow 终止工作流运行,会调用所有的退出处理器
  375. func (c *Client) StopWorkflow(params StopWorkflowParams) error {
  376. responseMap := make(map[string]any)
  377. resp, err := c.restyClient.R().
  378. SetHeader("Content-Type", "application/json").
  379. SetAuthToken(c.token).
  380. SetPathParams(map[string]string{
  381. "namespace": params.Namespace,
  382. "name": params.Name,
  383. }).
  384. SetBody(map[string]any{
  385. "namespace": params.Namespace,
  386. "name": params.Name,
  387. "nodeFieldSelector": params.NodeFieldSelector,
  388. "message": params.Message,
  389. }).
  390. SetResult(&responseMap).
  391. SetError(&responseMap).
  392. Put(stopWorkflowRelativeUrl)
  393. if err != nil {
  394. return errors.New(err.Error())
  395. }
  396. switch resp.StatusCode() {
  397. case http.StatusOK:
  398. return nil
  399. default:
  400. message, ok := responseMap["message"]
  401. if !ok {
  402. return errors.Errorf("%v", resp.Status())
  403. }
  404. return errors.Errorf("%v, %v", resp.Status(), message)
  405. }
  406. }
  407. type TerminateWorkflowParams struct {
  408. Namespace string
  409. Name string
  410. }
  411. // TerminateWorkflow 终止工作流运行,不调用所有的退出处理器
  412. func (c *Client) TerminateWorkflow(params TerminateWorkflowParams) error {
  413. responseMap := make(map[string]any)
  414. resp, err := c.restyClient.R().
  415. SetHeader("Content-Type", "application/json").
  416. SetAuthToken(c.token).
  417. SetPathParams(map[string]string{
  418. "namespace": params.Namespace,
  419. "name": params.Name,
  420. }).
  421. SetBody(map[string]any{
  422. "namespace": params.Namespace,
  423. "name": params.Name,
  424. }).
  425. SetResult(&responseMap).
  426. SetError(&responseMap).
  427. Put(terminateWorkflowRelativeUrl)
  428. if err != nil {
  429. return errors.New(err.Error())
  430. }
  431. switch resp.StatusCode() {
  432. case http.StatusOK:
  433. return nil
  434. default:
  435. message, ok := responseMap["message"]
  436. if !ok {
  437. return errors.Errorf("%v", resp.Status())
  438. }
  439. return errors.Errorf("%v, %v", resp.Status(), message)
  440. }
  441. }
  442. type SetWorkflowParams struct {
  443. Namespace string
  444. Name string
  445. NodeFieldSelector string
  446. OutputParameters string
  447. Message string
  448. Phase string
  449. }
  450. // SetWorkflow 设置工作流的参数
  451. func (c *Client) SetWorkflow(params SetWorkflowParams) error {
  452. responseMap := make(map[string]any)
  453. resp, err := c.restyClient.R().
  454. SetHeader("Content-Type", "application/json").
  455. SetAuthToken(c.token).
  456. SetPathParams(map[string]string{
  457. "namespace": params.Namespace,
  458. "name": params.Name,
  459. }).
  460. SetBody(map[string]any{
  461. "namespace": params.Namespace,
  462. "name": params.Name,
  463. "nodeFieldSelector": params.NodeFieldSelector,
  464. "outputParameters": params.OutputParameters,
  465. "message": params.Message,
  466. "phase": params.Phase,
  467. }).
  468. SetResult(&responseMap).
  469. SetError(&responseMap).
  470. Put(setWorkflowRelativeUrl)
  471. if err != nil {
  472. return errors.New(err.Error())
  473. }
  474. switch resp.StatusCode() {
  475. case http.StatusOK:
  476. return nil
  477. default:
  478. message, ok := responseMap["message"]
  479. if !ok {
  480. return errors.Errorf("%v", resp.Status())
  481. }
  482. return errors.Errorf("%v, %v", resp.Status(), message)
  483. }
  484. }
  485. type SuspendWorkflowParams struct {
  486. Namespace string
  487. Name string
  488. }
  489. // SuspendWorkflow 挂起工作流
  490. func (c *Client) SuspendWorkflow(params SuspendWorkflowParams) error {
  491. responseMap := make(map[string]any)
  492. resp, err := c.restyClient.R().
  493. SetHeader("Content-Type", "application/json").
  494. SetAuthToken(c.token).
  495. SetPathParams(map[string]string{
  496. "namespace": params.Namespace,
  497. "name": params.Name,
  498. }).
  499. SetBody(map[string]any{
  500. "namespace": params.Namespace,
  501. "name": params.Name,
  502. }).
  503. SetResult(&responseMap).
  504. SetError(&responseMap).
  505. Put(suspendWorkflowRelativeUrl)
  506. if err != nil {
  507. return errors.New(err.Error())
  508. }
  509. switch resp.StatusCode() {
  510. case http.StatusOK:
  511. return nil
  512. default:
  513. message, ok := responseMap["message"]
  514. if !ok {
  515. return errors.Errorf("%v", resp.Status())
  516. }
  517. return errors.Errorf("%v, %v", resp.Status(), message)
  518. }
  519. }
  520. type ResumeWorkflowParams struct {
  521. Namespace string
  522. Name string
  523. NodeFieldSelector string
  524. }
  525. // ResumeWorkflow 恢复被挂起的工作流
  526. func (c *Client) ResumeWorkflow(params ResumeWorkflowParams) error {
  527. responseMap := make(map[string]any)
  528. resp, err := c.restyClient.R().
  529. SetHeader("Content-Type", "application/json").
  530. SetAuthToken(c.token).
  531. SetPathParams(map[string]string{
  532. "namespace": params.Namespace,
  533. "name": params.Name,
  534. }).
  535. SetBody(map[string]any{
  536. "namespace": params.Namespace,
  537. "name": params.Name,
  538. }).
  539. SetResult(&responseMap).
  540. SetError(&responseMap).
  541. Put(resumeWorkflowRelativeUrl)
  542. if err != nil {
  543. return errors.New(err.Error())
  544. }
  545. switch resp.StatusCode() {
  546. case http.StatusOK:
  547. return nil
  548. default:
  549. message, ok := responseMap["message"]
  550. if !ok {
  551. return errors.Errorf("%v", resp.Status())
  552. }
  553. return errors.Errorf("%v, %v", resp.Status(), message)
  554. }
  555. }
  556. type EventCallback func(event map[string]any, eventErr error) error
  557. type GetWorkflowEventsStreamParams struct {
  558. Namespace string
  559. Name string
  560. }
  561. // GetWorkflowEventsStream 监听工作流事件
  562. func (c *Client) GetWorkflowEventsStream(params GetWorkflowEventsStreamParams, callback EventCallback) error {
  563. return nil
  564. }