workflow_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529
  1. package test
  2. import (
  3. "git.sxidc.com/go-tools/argo-api"
  4. "git.sxidc.com/go-tools/argo-api/client"
  5. "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
  6. "github.com/pkg/errors"
  7. "sync"
  8. "testing"
  9. "time"
  10. )
  11. func TestWorkflowBase(t *testing.T) {
  12. err := argo.Init(kubeConfig)
  13. if err != nil {
  14. t.Fatalf("%+v\n", err)
  15. }
  16. defer argo.Destroy()
  17. err = argo.GetInstance().LintWorkflow(client.LintWorkflowParams{
  18. Namespace: namespace,
  19. WorkflowDefinition: workflowYamlStr,
  20. })
  21. if err != nil {
  22. t.Fatalf("%+v\n", err)
  23. }
  24. workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
  25. Namespace: namespace,
  26. WorkflowDefinition: workflowYamlStr,
  27. })
  28. if err != nil {
  29. t.Fatalf("%+v\n", err)
  30. }
  31. defer func() {
  32. err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
  33. Namespace: namespace,
  34. Name: workflowName,
  35. })
  36. if err != nil {
  37. t.Fatalf("%+v\n", err)
  38. }
  39. }()
  40. wfs, err := argo.GetInstance().GetWorkflowsInNamespace(client.GetWorkflowsInNamespaceParams{
  41. Namespace: namespace,
  42. })
  43. if err != nil {
  44. t.Fatalf("%+v\n", err)
  45. }
  46. if len(wfs) != 1 {
  47. t.Fatalf("%+v\n", errors.Errorf("数量不一致: %v", len(wfs)))
  48. }
  49. wf, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
  50. Namespace: namespace,
  51. Name: workflowName,
  52. })
  53. if err != nil {
  54. t.Fatalf("%+v\n", err)
  55. }
  56. if wf.Namespace != wfs[0].Namespace {
  57. t.Fatalf("%+v\n", errors.Errorf("命名空间不一致: one: %s other: %s", wf.Namespace, wfs[0].Namespace))
  58. }
  59. if wf.Name != wfs[0].Name {
  60. t.Fatalf("%+v\n", errors.Errorf("名称不一致: one: %s other: %s", wf.Name, wfs[0].Name))
  61. }
  62. }
  63. func TestSubmitWorkflow(t *testing.T) {
  64. err := argo.Init(kubeConfig)
  65. if err != nil {
  66. t.Fatalf("%+v\n", err)
  67. }
  68. defer argo.Destroy()
  69. //err = argo.GetInstance().CreateWorkflowTemplate(client.CreateWorkflowTemplateParams{
  70. // Namespace: namespace,
  71. // TemplateDefinition: workflowTemplateWithParamsYamlStr,
  72. //})
  73. //if err != nil {
  74. // t.Fatalf("%+v\n", err)
  75. //}
  76. //
  77. //defer func() {
  78. // err := argo.GetInstance().DeleteWorkflowTemplate(client.DeleteWorkflowTemplateParams{
  79. // Namespace: namespace,
  80. // Name: workflowTemplateWithParamsName,
  81. // })
  82. // if err != nil {
  83. // t.Fatalf("%+v\n", err)
  84. // }
  85. //}()
  86. submitWorkflowName, err := argo.GetInstance().SubmitWorkflowFromWorkflowTemplate(client.SubmitWorkflowFromWorkflowTemplateParams{
  87. Namespace: "default",
  88. TemplateName: "approval-next-user-2",
  89. Parameters: []string{"message=Hello Submit"},
  90. Labels: "A-user-id=aaa,B-user-id=bbb,C-user-id=ccc",
  91. })
  92. if err != nil {
  93. t.Fatalf("%+v\n", err)
  94. }
  95. defer func() {
  96. err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
  97. Namespace: namespace,
  98. Name: submitWorkflowName,
  99. })
  100. if err != nil {
  101. t.Fatalf("%+v\n", err)
  102. }
  103. }()
  104. //wf, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
  105. // Namespace: namespace,
  106. // Name: submitWorkflowName,
  107. //})
  108. //if err != nil {
  109. // t.Fatalf("%+v\n", err)
  110. //}
  111. //
  112. //if wf.Spec.Arguments.Parameters[0].Name != "message" {
  113. // t.Fatalf("%+v\n", errors.Errorf("参数名称错误: %v", wf.Spec.Arguments.Parameters[0].Name))
  114. //}
  115. //
  116. //if wf.Spec.Arguments.Parameters[0].Value.String() != "Hello Submit" {
  117. // t.Fatalf("%+v\n", errors.Errorf("参数值错误: %v", wf.Spec.Arguments.Parameters[0].Value.String()))
  118. //}
  119. }
  120. func TestResubmitWorkflow(t *testing.T) {
  121. err := argo.Init(kubeConfig)
  122. if err != nil {
  123. t.Fatalf("%+v\n", err)
  124. }
  125. defer argo.Destroy()
  126. workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
  127. Namespace: namespace,
  128. WorkflowDefinition: workflowYamlStr,
  129. })
  130. if err != nil {
  131. t.Fatalf("%+v\n", err)
  132. }
  133. defer func() {
  134. err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
  135. Namespace: namespace,
  136. Name: workflowName,
  137. })
  138. if err != nil {
  139. t.Fatalf("%+v\n", err)
  140. }
  141. }()
  142. resubmittedWorkflowName, err := argo.GetInstance().ResubmitWorkflow(client.ResubmitWorkflowParams{
  143. Namespace: namespace,
  144. Name: workflowName,
  145. })
  146. if err != nil {
  147. t.Fatalf("%+v\n", err)
  148. }
  149. defer func() {
  150. err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
  151. Namespace: namespace,
  152. Name: resubmittedWorkflowName,
  153. })
  154. if err != nil {
  155. t.Fatalf("%+v\n", err)
  156. }
  157. }()
  158. }
  159. func TestRetryWorkflow(t *testing.T) {
  160. err := argo.Init(kubeConfig)
  161. if err != nil {
  162. t.Fatalf("%+v\n", err)
  163. }
  164. defer argo.Destroy()
  165. workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
  166. Namespace: namespace,
  167. WorkflowDefinition: workflowYamlStr,
  168. })
  169. if err != nil {
  170. t.Fatalf("%+v\n", err)
  171. }
  172. defer func() {
  173. err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
  174. Namespace: namespace,
  175. Name: workflowName,
  176. })
  177. if err != nil {
  178. t.Fatalf("%+v\n", err)
  179. }
  180. }()
  181. terminalCalled := false
  182. wg := sync.WaitGroup{}
  183. wg.Add(1)
  184. err = argo.GetInstance().RegisterWorkflowWatcher(namespace, workflowName, func(wf *v1alpha1.Workflow) {
  185. if terminalCalled {
  186. if wf.Status.Phase == "Running" {
  187. return
  188. }
  189. wg.Done()
  190. }
  191. })
  192. if err != nil {
  193. t.Fatalf("%+v\n", err)
  194. }
  195. defer argo.GetInstance().UnregisterWorkflowWatcher(namespace, workflowName)
  196. err = argo.GetInstance().TerminateWorkflow(client.TerminateWorkflowParams{
  197. Namespace: namespace,
  198. Name: workflowName,
  199. })
  200. if err != nil {
  201. t.Fatalf("%+v\n", err)
  202. }
  203. terminalCalled = true
  204. wg.Wait()
  205. err = argo.GetInstance().RetryWorkflow(client.RetryWorkflowParams{
  206. Namespace: namespace,
  207. Name: workflowName,
  208. })
  209. if err != nil {
  210. t.Fatalf("%+v\n", err)
  211. }
  212. }
  213. func TestStopWorkflow(t *testing.T) {
  214. err := argo.Init(kubeConfig)
  215. if err != nil {
  216. t.Fatalf("%+v\n", err)
  217. }
  218. defer argo.Destroy()
  219. workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
  220. Namespace: namespace,
  221. WorkflowDefinition: workflowYamlStr,
  222. })
  223. if err != nil {
  224. t.Fatalf("%+v\n", err)
  225. }
  226. defer func() {
  227. err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
  228. Namespace: namespace,
  229. Name: workflowName,
  230. })
  231. if err != nil {
  232. t.Fatalf("%+v\n", err)
  233. }
  234. }()
  235. stopCalled := false
  236. wg := sync.WaitGroup{}
  237. wg.Add(1)
  238. err = argo.GetInstance().RegisterWorkflowWatcher(namespace, workflowName, func(wf *v1alpha1.Workflow) {
  239. if stopCalled {
  240. if wf.Status.Phase == "Running" {
  241. return
  242. }
  243. wg.Done()
  244. }
  245. })
  246. if err != nil {
  247. t.Fatalf("%+v\n", err)
  248. }
  249. defer argo.GetInstance().UnregisterWorkflowWatcher(namespace, workflowName)
  250. err = argo.GetInstance().StopWorkflow(client.StopWorkflowParams{
  251. Namespace: namespace,
  252. Name: workflowName,
  253. })
  254. if err != nil {
  255. t.Fatalf("%+v\n", err)
  256. }
  257. stopCalled = true
  258. wg.Wait()
  259. }
  260. func TestTerminateWorkflow(t *testing.T) {
  261. err := argo.Init(kubeConfig)
  262. if err != nil {
  263. t.Fatalf("%+v\n", err)
  264. }
  265. defer argo.Destroy()
  266. workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
  267. Namespace: namespace,
  268. WorkflowDefinition: workflowYamlStr,
  269. })
  270. if err != nil {
  271. t.Fatalf("%+v\n", err)
  272. }
  273. defer func() {
  274. err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
  275. Namespace: namespace,
  276. Name: workflowName,
  277. })
  278. if err != nil {
  279. t.Fatalf("%+v\n", err)
  280. }
  281. }()
  282. terminalCalled := false
  283. wg := sync.WaitGroup{}
  284. wg.Add(1)
  285. err = argo.GetInstance().RegisterWorkflowWatcher(namespace, workflowName, func(wf *v1alpha1.Workflow) {
  286. if terminalCalled {
  287. if wf.Status.Phase == "Running" {
  288. return
  289. }
  290. wg.Done()
  291. }
  292. })
  293. if err != nil {
  294. t.Fatalf("%+v\n", err)
  295. }
  296. defer argo.GetInstance().UnregisterWorkflowWatcher(namespace, workflowName)
  297. err = argo.GetInstance().TerminateWorkflow(client.TerminateWorkflowParams{
  298. Namespace: namespace,
  299. Name: workflowName,
  300. })
  301. if err != nil {
  302. t.Fatalf("%+v\n", err)
  303. }
  304. terminalCalled = true
  305. wg.Wait()
  306. }
  307. func TestSetWorkflow(t *testing.T) {
  308. err := argo.Init(kubeConfig)
  309. if err != nil {
  310. t.Fatalf("%+v\n", err)
  311. }
  312. defer argo.Destroy()
  313. workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
  314. Namespace: namespace,
  315. WorkflowDefinition: workflowYamlStr,
  316. })
  317. if err != nil {
  318. t.Fatalf("%+v\n", err)
  319. }
  320. defer func() {
  321. err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
  322. Namespace: namespace,
  323. Name: workflowName,
  324. })
  325. if err != nil {
  326. t.Fatalf("%+v\n", err)
  327. }
  328. }()
  329. for {
  330. wf, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
  331. Namespace: namespace,
  332. Name: workflowName,
  333. })
  334. if err != nil {
  335. t.Fatalf("%+v\n", err)
  336. }
  337. findA := false
  338. for _, node := range wf.Status.Nodes {
  339. if node.DisplayName == "A" {
  340. findA = true
  341. }
  342. }
  343. if findA {
  344. break
  345. }
  346. }
  347. err = argo.GetInstance().SetWorkflow(client.SetWorkflowParams{
  348. Namespace: namespace,
  349. Name: workflowName,
  350. NodeFieldSelector: "displayName=A",
  351. OutputParameters: `{"front_json": "pass"}`,
  352. })
  353. if err != nil {
  354. t.Fatalf("%+v\n", err)
  355. }
  356. wf, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
  357. Namespace: namespace,
  358. Name: workflowName,
  359. })
  360. if err != nil {
  361. t.Fatalf("%+v\n", err)
  362. }
  363. for _, node := range wf.Status.Nodes {
  364. if node.DisplayName == "A" {
  365. if node.Outputs.Parameters[0].Name == "front_json" && node.Outputs.Parameters[0].Value.String() == "pass" {
  366. return
  367. }
  368. t.Fatalf("%+v\n", errors.Errorf("输出参数不正确: %v", node.Outputs.Parameters[0].Value.String()))
  369. }
  370. }
  371. t.Fatalf("%+v\n", errors.New("找不到对应的节点"))
  372. }
  373. func TestSuspendAndResumeWorkflow(t *testing.T) {
  374. err := argo.Init(kubeConfig)
  375. if err != nil {
  376. t.Fatalf("%+v\n", err)
  377. }
  378. defer argo.Destroy()
  379. workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
  380. Namespace: namespace,
  381. WorkflowDefinition: workflowWithParamsYamlStr,
  382. })
  383. if err != nil {
  384. t.Fatalf("%+v\n", err)
  385. }
  386. defer func() {
  387. err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
  388. Namespace: namespace,
  389. Name: workflowName,
  390. })
  391. if err != nil {
  392. t.Fatalf("%+v\n", err)
  393. }
  394. }()
  395. suspendCalled := false
  396. wg1 := sync.WaitGroup{}
  397. wg1.Add(1)
  398. err = argo.GetInstance().RegisterWorkflowWatcher(namespace, workflowName, func(wf *v1alpha1.Workflow) {
  399. if suspendCalled {
  400. if wf.Status.Phase != "Running" {
  401. return
  402. }
  403. if wf.Spec.Suspend == nil || *wf.Spec.Suspend != true {
  404. return
  405. }
  406. wg1.Done()
  407. }
  408. })
  409. if err != nil {
  410. t.Fatalf("%+v\n", err)
  411. }
  412. defer argo.GetInstance().UnregisterWorkflowWatcher(namespace, workflowName)
  413. err = argo.GetInstance().SuspendWorkflow(client.SuspendWorkflowParams{
  414. Namespace: namespace,
  415. Name: workflowName,
  416. })
  417. if err != nil {
  418. t.Fatalf("%+v\n", err)
  419. }
  420. suspendCalled = true
  421. wg1.Wait()
  422. err = argo.GetInstance().ResumeWorkflow(client.ResumeWorkflowParams{
  423. Namespace: namespace,
  424. Name: workflowName,
  425. })
  426. if err != nil {
  427. t.Fatalf("%+v\n", err)
  428. }
  429. // resume后不会推送消息
  430. for {
  431. wf, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
  432. Namespace: namespace,
  433. Name: workflowName,
  434. })
  435. if err != nil {
  436. t.Fatalf("%+v\n", err)
  437. }
  438. if wf.Spec.Suspend == nil || *wf.Spec.Suspend == false {
  439. break
  440. }
  441. time.Sleep(1 * time.Second)
  442. }
  443. }