workflow_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445
  1. package test
  2. import (
  3. "git.sxidc.com/go-tools/argo-api"
  4. "git.sxidc.com/go-tools/argo-api/client"
  5. "github.com/pkg/errors"
  6. "gopkg.in/yaml.v3"
  7. "testing"
  8. "time"
  9. )
  10. func TestWorkflowBase(t *testing.T) {
  11. argo.Init(baseUrl, token, client.WithTimeoutSec(10))
  12. defer argo.Destroy()
  13. workflowDefinition := make(map[string]any)
  14. err := yaml.Unmarshal(workflowYamlStr, &workflowDefinition)
  15. if err != nil {
  16. panic(err)
  17. }
  18. err = argo.GetInstance().LintWorkflow(client.LintWorkflowParams{
  19. Namespace: namespace,
  20. WorkflowDefinition: workflowDefinition,
  21. })
  22. if err != nil {
  23. t.Fatalf("%+v\n", err)
  24. }
  25. workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
  26. Namespace: namespace,
  27. WorkflowDefinition: workflowDefinition,
  28. })
  29. if err != nil {
  30. t.Fatalf("%+v\n", err)
  31. }
  32. defer func() {
  33. err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
  34. Namespace: namespace,
  35. Name: workflowName,
  36. })
  37. if err != nil {
  38. t.Fatalf("%+v\n", err)
  39. }
  40. }()
  41. createdWorkflowDefinitions, err := argo.GetInstance().GetWorkflowsInNamespace(client.GetWorkflowsInNamespaceParams{
  42. Namespace: namespace,
  43. })
  44. if err != nil {
  45. t.Fatalf("%+v\n", err)
  46. }
  47. if len(createdWorkflowDefinitions) != 1 {
  48. t.Fatalf("%+v\n", errors.Errorf("数量不一致: %v", len(createdWorkflowDefinitions)))
  49. }
  50. createdWorkflowDefinition, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
  51. Namespace: namespace,
  52. Name: workflowName,
  53. })
  54. if err != nil {
  55. t.Fatalf("%+v\n", err)
  56. }
  57. compareDefinitionMap(t, createdWorkflowDefinition, createdWorkflowDefinitions[0])
  58. compareDefinitionMap(t, createdWorkflowDefinitions[0], createdWorkflowDefinition)
  59. }
  60. func TestSubmitWorkflow(t *testing.T) {
  61. argo.Init(baseUrl, token, client.WithTimeoutSec(10))
  62. defer argo.Destroy()
  63. templateDefinition := make(map[string]any)
  64. err := yaml.Unmarshal(workflowTemplateWithParamsYamlStr, &templateDefinition)
  65. if err != nil {
  66. panic(err)
  67. }
  68. err = argo.GetInstance().CreateWorkflowTemplate(client.CreateWorkflowTemplateParams{
  69. Namespace: namespace,
  70. TemplateDefinition: templateDefinition,
  71. })
  72. if err != nil {
  73. t.Fatalf("%+v\n", err)
  74. }
  75. defer func() {
  76. err := argo.GetInstance().DeleteWorkflowTemplate(client.DeleteWorkflowTemplateParams{
  77. Namespace: namespace,
  78. Name: workflowTemplateWithParamsName,
  79. })
  80. if err != nil {
  81. t.Fatalf("%+v\n", err)
  82. }
  83. }()
  84. submitWorkflowName, err := argo.GetInstance().SubmitWorkflowFromWorkflowTemplate(client.SubmitWorkflowFromWorkflowTemplateParams{
  85. Namespace: namespace,
  86. TemplateName: workflowTemplateWithParamsName,
  87. Parameters: []string{"message=Hello Submit"},
  88. })
  89. if err != nil {
  90. t.Fatalf("%+v\n", err)
  91. }
  92. defer func() {
  93. err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
  94. Namespace: namespace,
  95. Name: submitWorkflowName,
  96. })
  97. if err != nil {
  98. t.Fatalf("%+v\n", err)
  99. }
  100. }()
  101. submittedWorkflowDefinition, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
  102. Namespace: namespace,
  103. Name: submitWorkflowName,
  104. })
  105. if err != nil {
  106. t.Fatalf("%+v\n", err)
  107. }
  108. specMap := submittedWorkflowDefinition["spec"].(map[string]any)
  109. argumentMap := specMap["arguments"].(map[string]any)
  110. parametersMap := argumentMap["parameters"].([]any)
  111. if parametersMap[0].(map[string]any)["name"] != "message" {
  112. t.Fatalf("%+v\n", errors.Errorf("参数名称错误: %v", parametersMap[0].(map[string]any)["name"]))
  113. }
  114. if parametersMap[0].(map[string]any)["value"] != "Hello Submit" {
  115. t.Fatalf("%+v\n", errors.Errorf("参数值错误: %v", parametersMap[0].(map[string]any)["value"]))
  116. }
  117. }
  118. func TestResubmitWorkflow(t *testing.T) {
  119. argo.Init(baseUrl, token, client.WithTimeoutSec(10))
  120. defer argo.Destroy()
  121. workflowDefinition := make(map[string]any)
  122. err := yaml.Unmarshal(workflowYamlStr, &workflowDefinition)
  123. if err != nil {
  124. panic(err)
  125. }
  126. workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
  127. Namespace: namespace,
  128. WorkflowDefinition: workflowDefinition,
  129. })
  130. if err != nil {
  131. t.Fatalf("%+v\n", err)
  132. }
  133. defer func() {
  134. err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
  135. Namespace: namespace,
  136. Name: workflowName,
  137. })
  138. if err != nil {
  139. t.Fatalf("%+v\n", err)
  140. }
  141. }()
  142. resubmittedWorkflowName, err := argo.GetInstance().ResubmitWorkflow(client.ResubmitWorkflowParams{
  143. Namespace: namespace,
  144. Name: workflowName,
  145. })
  146. if err != nil {
  147. t.Fatalf("%+v\n", err)
  148. }
  149. defer func() {
  150. err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
  151. Namespace: namespace,
  152. Name: resubmittedWorkflowName,
  153. })
  154. if err != nil {
  155. t.Fatalf("%+v\n", err)
  156. }
  157. }()
  158. }
  159. func TestRetryWorkflow(t *testing.T) {
  160. argo.Init(baseUrl, token, client.WithTimeoutSec(10))
  161. defer argo.Destroy()
  162. workflowDefinition := make(map[string]any)
  163. err := yaml.Unmarshal(workflowYamlStr, &workflowDefinition)
  164. if err != nil {
  165. panic(err)
  166. }
  167. workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
  168. Namespace: namespace,
  169. WorkflowDefinition: workflowDefinition,
  170. })
  171. if err != nil {
  172. t.Fatalf("%+v\n", err)
  173. }
  174. defer func() {
  175. err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
  176. Namespace: namespace,
  177. Name: workflowName,
  178. })
  179. if err != nil {
  180. t.Fatalf("%+v\n", err)
  181. }
  182. }()
  183. err = argo.GetInstance().TerminateWorkflow(client.TerminateWorkflowParams{
  184. Namespace: namespace,
  185. Name: workflowName,
  186. })
  187. if err != nil {
  188. t.Fatalf("%+v\n", err)
  189. }
  190. for {
  191. currentWorkflowDefinitions, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
  192. Namespace: namespace,
  193. Name: workflowName,
  194. })
  195. if err != nil {
  196. t.Fatalf("%+v\n", err)
  197. }
  198. if currentWorkflowDefinitions["status"].(map[string]any)["phase"] != "Running" {
  199. break
  200. }
  201. time.Sleep(1 * time.Second)
  202. }
  203. err = argo.GetInstance().RetryWorkflow(client.RetryWorkflowParams{
  204. Namespace: namespace,
  205. Name: workflowName,
  206. })
  207. if err != nil {
  208. t.Fatalf("%+v\n", err)
  209. }
  210. }
  211. func TestStopWorkflow(t *testing.T) {
  212. argo.Init(baseUrl, token, client.WithTimeoutSec(10))
  213. defer argo.Destroy()
  214. workflowDefinition := make(map[string]any)
  215. err := yaml.Unmarshal(workflowYamlStr, &workflowDefinition)
  216. if err != nil {
  217. panic(err)
  218. }
  219. workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
  220. Namespace: namespace,
  221. WorkflowDefinition: workflowDefinition,
  222. })
  223. if err != nil {
  224. t.Fatalf("%+v\n", err)
  225. }
  226. defer func() {
  227. err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
  228. Namespace: namespace,
  229. Name: workflowName,
  230. })
  231. if err != nil {
  232. t.Fatalf("%+v\n", err)
  233. }
  234. }()
  235. err = argo.GetInstance().StopWorkflow(client.StopWorkflowParams{
  236. Namespace: namespace,
  237. Name: workflowName,
  238. })
  239. if err != nil {
  240. t.Fatalf("%+v\n", err)
  241. }
  242. for {
  243. currentWorkflowDefinitions, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
  244. Namespace: namespace,
  245. Name: workflowName,
  246. })
  247. if err != nil {
  248. t.Fatalf("%+v\n", err)
  249. }
  250. if currentWorkflowDefinitions["status"].(map[string]any)["phase"] != "Running" {
  251. break
  252. }
  253. time.Sleep(1 * time.Second)
  254. }
  255. }
  256. func TestTerminateWorkflow(t *testing.T) {
  257. argo.Init(baseUrl, token, client.WithTimeoutSec(10))
  258. defer argo.Destroy()
  259. workflowDefinition := make(map[string]any)
  260. err := yaml.Unmarshal(workflowYamlStr, &workflowDefinition)
  261. if err != nil {
  262. panic(err)
  263. }
  264. workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
  265. Namespace: namespace,
  266. WorkflowDefinition: workflowDefinition,
  267. })
  268. if err != nil {
  269. t.Fatalf("%+v\n", err)
  270. }
  271. defer func() {
  272. err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
  273. Namespace: namespace,
  274. Name: workflowName,
  275. })
  276. if err != nil {
  277. t.Fatalf("%+v\n", err)
  278. }
  279. }()
  280. err = argo.GetInstance().TerminateWorkflow(client.TerminateWorkflowParams{
  281. Namespace: namespace,
  282. Name: workflowName,
  283. })
  284. if err != nil {
  285. t.Fatalf("%+v\n", err)
  286. }
  287. for {
  288. currentWorkflowDefinitions, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
  289. Namespace: namespace,
  290. Name: workflowName,
  291. })
  292. if err != nil {
  293. t.Fatalf("%+v\n", err)
  294. }
  295. if currentWorkflowDefinitions["status"].(map[string]any)["phase"] != "Running" {
  296. break
  297. }
  298. time.Sleep(1 * time.Second)
  299. }
  300. }
  301. func TestSetWorkflow(t *testing.T) {
  302. argo.Init(baseUrl, token, client.WithTimeoutSec(10))
  303. defer argo.Destroy()
  304. workflowDefinition := make(map[string]any)
  305. err := yaml.Unmarshal(workflowYamlStr, &workflowDefinition)
  306. if err != nil {
  307. panic(err)
  308. }
  309. workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
  310. Namespace: namespace,
  311. WorkflowDefinition: workflowDefinition,
  312. })
  313. if err != nil {
  314. t.Fatalf("%+v\n", err)
  315. }
  316. defer func() {
  317. err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
  318. Namespace: namespace,
  319. Name: workflowName,
  320. })
  321. if err != nil {
  322. t.Fatalf("%+v\n", err)
  323. }
  324. }()
  325. for {
  326. currentWorkflowDefinitions, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
  327. Namespace: namespace,
  328. Name: workflowName,
  329. })
  330. if err != nil {
  331. t.Fatalf("%+v\n", err)
  332. }
  333. nodes, ok := currentWorkflowDefinitions["status"].(map[string]any)["nodes"]
  334. if !ok {
  335. time.Sleep(1 * time.Second)
  336. continue
  337. }
  338. findA := false
  339. nodesMap := nodes.(map[string]any)
  340. for _, node := range nodesMap {
  341. nodeMap := node.(map[string]any)
  342. if nodeMap["displayName"].(string) == "A" {
  343. findA = true
  344. }
  345. }
  346. if findA {
  347. break
  348. }
  349. time.Sleep(1 * time.Second)
  350. }
  351. err = argo.GetInstance().SetWorkflow(client.SetWorkflowParams{
  352. Namespace: namespace,
  353. Name: workflowName,
  354. NodeFieldSelector: "displayName=A",
  355. OutputParameters: `{"approve": "pass"}`,
  356. })
  357. if err != nil {
  358. t.Fatalf("%+v\n", err)
  359. }
  360. currentWorkflowDefinitions, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
  361. Namespace: namespace,
  362. Name: workflowName,
  363. })
  364. if err != nil {
  365. t.Fatalf("%+v\n", err)
  366. }
  367. nodesMap := currentWorkflowDefinitions["status"].(map[string]any)["nodes"].(map[string]any)
  368. for _, node := range nodesMap {
  369. nodeMap := node.(map[string]any)
  370. if nodeMap["displayName"].(string) == "A" {
  371. outputParams := nodeMap["outputs"].(map[string]any)["parameters"].([]any)
  372. approveParamsMap := outputParams[0].(map[string]any)
  373. if approveParamsMap["name"] == "approve" && approveParamsMap["value"] == "pass" {
  374. return
  375. }
  376. t.Fatalf("%+v\n", errors.Errorf("输出参数不正确: %v", approveParamsMap["value"]))
  377. }
  378. }
  379. t.Fatalf("%+v\n", errors.New("找不到对应的节点"))
  380. }