workflow_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540
  1. package test
  2. import (
  3. "git.sxidc.com/go-tools/argo-api"
  4. "git.sxidc.com/go-tools/argo-api/client"
  5. "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
  6. "github.com/pkg/errors"
  7. "sync"
  8. "testing"
  9. "time"
  10. )
  11. func TestWorkflowBase(t *testing.T) {
  12. err := argo.Init(kubeConfig)
  13. if err != nil {
  14. t.Fatalf("%+v\n", err)
  15. }
  16. defer argo.Destroy()
  17. err = argo.GetInstance().LintWorkflow(client.LintWorkflowParams{
  18. Namespace: namespace,
  19. WorkflowDefinition: workflowYamlStr,
  20. })
  21. if err != nil {
  22. t.Fatalf("%+v\n", err)
  23. }
  24. workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
  25. Namespace: namespace,
  26. WorkflowDefinition: workflowYamlStr,
  27. })
  28. if err != nil {
  29. t.Fatalf("%+v\n", err)
  30. }
  31. defer func() {
  32. err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
  33. Namespace: namespace,
  34. Name: workflowName,
  35. })
  36. if err != nil {
  37. t.Fatalf("%+v\n", err)
  38. }
  39. }()
  40. wfs, err := argo.GetInstance().GetWorkflowsInNamespace(client.GetWorkflowsInNamespaceParams{
  41. Namespace: namespace,
  42. })
  43. if err != nil {
  44. t.Fatalf("%+v\n", err)
  45. }
  46. if len(wfs) != 1 {
  47. t.Fatalf("%+v\n", errors.Errorf("数量不一致: %v", len(wfs)))
  48. }
  49. wf, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
  50. Namespace: namespace,
  51. Name: workflowName,
  52. })
  53. if err != nil {
  54. t.Fatalf("%+v\n", err)
  55. }
  56. if wf.Namespace != wfs[0].Namespace {
  57. t.Fatalf("%+v\n", errors.Errorf("命名空间不一致: one: %s other: %s", wf.Namespace, wfs[0].Namespace))
  58. }
  59. if wf.Name != wfs[0].Name {
  60. t.Fatalf("%+v\n", errors.Errorf("名称不一致: one: %s other: %s", wf.Name, wfs[0].Name))
  61. }
  62. }
  63. func TestSubmitWorkflow(t *testing.T) {
  64. err := argo.Init(kubeConfig)
  65. if err != nil {
  66. t.Fatalf("%+v\n", err)
  67. }
  68. defer argo.Destroy()
  69. err = argo.GetInstance().CreateWorkflowTemplate(client.CreateWorkflowTemplateParams{
  70. Namespace: namespace,
  71. TemplateDefinition: workflowTemplateWithParamsYamlStr,
  72. })
  73. if err != nil {
  74. t.Fatalf("%+v\n", err)
  75. }
  76. defer func() {
  77. err := argo.GetInstance().DeleteWorkflowTemplate(client.DeleteWorkflowTemplateParams{
  78. Namespace: namespace,
  79. Name: workflowTemplateWithParamsName,
  80. })
  81. if err != nil {
  82. t.Fatalf("%+v\n", err)
  83. }
  84. }()
  85. submitWorkflowName, err := argo.GetInstance().SubmitWorkflowFromWorkflowTemplate(client.SubmitWorkflowFromWorkflowTemplateParams{
  86. Namespace: namespace,
  87. TemplateName: workflowTemplateWithParamsName,
  88. Parameters: []string{"message=Hello Submit"},
  89. })
  90. if err != nil {
  91. t.Fatalf("%+v\n", err)
  92. }
  93. defer func() {
  94. err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
  95. Namespace: namespace,
  96. Name: submitWorkflowName,
  97. })
  98. if err != nil {
  99. t.Fatalf("%+v\n", err)
  100. }
  101. }()
  102. wf, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
  103. Namespace: namespace,
  104. Name: submitWorkflowName,
  105. })
  106. if err != nil {
  107. t.Fatalf("%+v\n", err)
  108. }
  109. if wf.Spec.Arguments.Parameters[0].Name != "message" {
  110. t.Fatalf("%+v\n", errors.Errorf("参数名称错误: %v", wf.Spec.Arguments.Parameters[0].Name))
  111. }
  112. if wf.Spec.Arguments.Parameters[0].Value.String() != "Hello Submit" {
  113. t.Fatalf("%+v\n", errors.Errorf("参数值错误: %v", wf.Spec.Arguments.Parameters[0].Value.String()))
  114. }
  115. }
  116. func TestResubmitWorkflow(t *testing.T) {
  117. err := argo.Init(kubeConfig)
  118. if err != nil {
  119. t.Fatalf("%+v\n", err)
  120. }
  121. defer argo.Destroy()
  122. workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
  123. Namespace: namespace,
  124. WorkflowDefinition: workflowYamlStr,
  125. })
  126. if err != nil {
  127. t.Fatalf("%+v\n", err)
  128. }
  129. defer func() {
  130. err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
  131. Namespace: namespace,
  132. Name: workflowName,
  133. })
  134. if err != nil {
  135. t.Fatalf("%+v\n", err)
  136. }
  137. }()
  138. resubmittedWorkflowName, err := argo.GetInstance().ResubmitWorkflow(client.ResubmitWorkflowParams{
  139. Namespace: namespace,
  140. Name: workflowName,
  141. })
  142. if err != nil {
  143. t.Fatalf("%+v\n", err)
  144. }
  145. defer func() {
  146. err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
  147. Namespace: namespace,
  148. Name: resubmittedWorkflowName,
  149. })
  150. if err != nil {
  151. t.Fatalf("%+v\n", err)
  152. }
  153. }()
  154. }
  155. func TestRetryWorkflow(t *testing.T) {
  156. err := argo.Init(kubeConfig)
  157. if err != nil {
  158. t.Fatalf("%+v\n", err)
  159. }
  160. defer argo.Destroy()
  161. workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
  162. Namespace: namespace,
  163. WorkflowDefinition: workflowYamlStr,
  164. })
  165. if err != nil {
  166. t.Fatalf("%+v\n", err)
  167. }
  168. defer func() {
  169. err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
  170. Namespace: namespace,
  171. Name: workflowName,
  172. })
  173. if err != nil {
  174. t.Fatalf("%+v\n", err)
  175. }
  176. }()
  177. terminalCalled := false
  178. wg := sync.WaitGroup{}
  179. wg.Add(1)
  180. token, err := argo.GetInstance().WatchWorkflow(client.WatchWorkflowParams{
  181. Namespace: namespace,
  182. Name: workflowName,
  183. }, func(wf *v1alpha1.Workflow) {
  184. if terminalCalled {
  185. if wf.Status.Phase == "Running" {
  186. return
  187. }
  188. wg.Done()
  189. }
  190. })
  191. if err != nil {
  192. t.Fatalf("%+v\n", err)
  193. }
  194. defer token.Done()
  195. err = argo.GetInstance().TerminateWorkflow(client.TerminateWorkflowParams{
  196. Namespace: namespace,
  197. Name: workflowName,
  198. })
  199. if err != nil {
  200. t.Fatalf("%+v\n", err)
  201. }
  202. terminalCalled = true
  203. wg.Wait()
  204. err = argo.GetInstance().RetryWorkflow(client.RetryWorkflowParams{
  205. Namespace: namespace,
  206. Name: workflowName,
  207. })
  208. if err != nil {
  209. t.Fatalf("%+v\n", err)
  210. }
  211. }
  212. func TestStopWorkflow(t *testing.T) {
  213. err := argo.Init(kubeConfig)
  214. if err != nil {
  215. t.Fatalf("%+v\n", err)
  216. }
  217. defer argo.Destroy()
  218. workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
  219. Namespace: namespace,
  220. WorkflowDefinition: workflowYamlStr,
  221. })
  222. if err != nil {
  223. t.Fatalf("%+v\n", err)
  224. }
  225. defer func() {
  226. err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
  227. Namespace: namespace,
  228. Name: workflowName,
  229. })
  230. if err != nil {
  231. t.Fatalf("%+v\n", err)
  232. }
  233. }()
  234. stopCalled := false
  235. wg := sync.WaitGroup{}
  236. wg.Add(1)
  237. token, err := argo.GetInstance().WatchWorkflow(client.WatchWorkflowParams{
  238. Namespace: namespace,
  239. Name: workflowName,
  240. }, func(wf *v1alpha1.Workflow) {
  241. if stopCalled {
  242. if wf.Status.Phase == "Running" {
  243. return
  244. }
  245. wg.Done()
  246. }
  247. })
  248. if err != nil {
  249. t.Fatalf("%+v\n", err)
  250. }
  251. defer token.Done()
  252. err = argo.GetInstance().StopWorkflow(client.StopWorkflowParams{
  253. Namespace: namespace,
  254. Name: workflowName,
  255. })
  256. if err != nil {
  257. t.Fatalf("%+v\n", err)
  258. }
  259. stopCalled = true
  260. wg.Wait()
  261. }
  262. func TestTerminateWorkflow(t *testing.T) {
  263. err := argo.Init(kubeConfig)
  264. if err != nil {
  265. t.Fatalf("%+v\n", err)
  266. }
  267. defer argo.Destroy()
  268. workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
  269. Namespace: namespace,
  270. WorkflowDefinition: workflowYamlStr,
  271. })
  272. if err != nil {
  273. t.Fatalf("%+v\n", err)
  274. }
  275. defer func() {
  276. err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
  277. Namespace: namespace,
  278. Name: workflowName,
  279. })
  280. if err != nil {
  281. t.Fatalf("%+v\n", err)
  282. }
  283. }()
  284. terminalCalled := false
  285. wg := sync.WaitGroup{}
  286. wg.Add(1)
  287. token, err := argo.GetInstance().WatchWorkflow(client.WatchWorkflowParams{
  288. Namespace: namespace,
  289. Name: workflowName,
  290. }, func(wf *v1alpha1.Workflow) {
  291. if terminalCalled {
  292. if wf.Status.Phase == "Running" {
  293. return
  294. }
  295. wg.Done()
  296. }
  297. })
  298. if err != nil {
  299. t.Fatalf("%+v\n", err)
  300. }
  301. defer token.Done()
  302. err = argo.GetInstance().TerminateWorkflow(client.TerminateWorkflowParams{
  303. Namespace: namespace,
  304. Name: workflowName,
  305. })
  306. if err != nil {
  307. t.Fatalf("%+v\n", err)
  308. }
  309. terminalCalled = true
  310. wg.Wait()
  311. }
  312. func TestSetWorkflow(t *testing.T) {
  313. err := argo.Init(kubeConfig)
  314. if err != nil {
  315. t.Fatalf("%+v\n", err)
  316. }
  317. defer argo.Destroy()
  318. workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
  319. Namespace: namespace,
  320. WorkflowDefinition: workflowYamlStr,
  321. })
  322. if err != nil {
  323. t.Fatalf("%+v\n", err)
  324. }
  325. defer func() {
  326. err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
  327. Namespace: namespace,
  328. Name: workflowName,
  329. })
  330. if err != nil {
  331. t.Fatalf("%+v\n", err)
  332. }
  333. }()
  334. for {
  335. wf, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
  336. Namespace: namespace,
  337. Name: workflowName,
  338. })
  339. if err != nil {
  340. t.Fatalf("%+v\n", err)
  341. }
  342. findA := false
  343. for _, node := range wf.Status.Nodes {
  344. if node.DisplayName == "A" {
  345. findA = true
  346. }
  347. }
  348. if findA {
  349. break
  350. }
  351. }
  352. err = argo.GetInstance().SetWorkflow(client.SetWorkflowParams{
  353. Namespace: namespace,
  354. Name: workflowName,
  355. NodeFieldSelector: "displayName=A",
  356. OutputParameters: `{"approve": "pass"}`,
  357. })
  358. if err != nil {
  359. t.Fatalf("%+v\n", err)
  360. }
  361. wf, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
  362. Namespace: namespace,
  363. Name: workflowName,
  364. })
  365. if err != nil {
  366. t.Fatalf("%+v\n", err)
  367. }
  368. for _, node := range wf.Status.Nodes {
  369. if node.DisplayName == "A" {
  370. if node.Outputs.Parameters[0].Name == "approve" && node.Outputs.Parameters[0].Value.String() == "pass" {
  371. return
  372. }
  373. t.Fatalf("%+v\n", errors.Errorf("输出参数不正确: %v", node.Outputs.Parameters[0].Value.String()))
  374. }
  375. }
  376. t.Fatalf("%+v\n", errors.New("找不到对应的节点"))
  377. }
  378. func TestSuspendAndResumeWorkflow(t *testing.T) {
  379. err := argo.Init(kubeConfig)
  380. if err != nil {
  381. t.Fatalf("%+v\n", err)
  382. }
  383. defer argo.Destroy()
  384. workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
  385. Namespace: namespace,
  386. WorkflowDefinition: workflowWithParamsYamlStr,
  387. })
  388. if err != nil {
  389. t.Fatalf("%+v\n", err)
  390. }
  391. defer func() {
  392. err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
  393. Namespace: namespace,
  394. Name: workflowName,
  395. })
  396. if err != nil {
  397. t.Fatalf("%+v\n", err)
  398. }
  399. }()
  400. suspendCalled := false
  401. wg1 := sync.WaitGroup{}
  402. wg1.Add(1)
  403. token, err := argo.GetInstance().WatchWorkflow(client.WatchWorkflowParams{
  404. Namespace: namespace,
  405. Name: workflowName,
  406. }, func(wf *v1alpha1.Workflow) {
  407. if suspendCalled {
  408. if wf.Status.Phase != "Running" {
  409. return
  410. }
  411. if wf.Spec.Suspend == nil || *wf.Spec.Suspend != true {
  412. return
  413. }
  414. wg1.Done()
  415. }
  416. })
  417. if err != nil {
  418. t.Fatalf("%+v\n", err)
  419. }
  420. defer token.Done()
  421. err = argo.GetInstance().SuspendWorkflow(client.SuspendWorkflowParams{
  422. Namespace: namespace,
  423. Name: workflowName,
  424. })
  425. if err != nil {
  426. t.Fatalf("%+v\n", err)
  427. }
  428. suspendCalled = true
  429. wg1.Wait()
  430. err = argo.GetInstance().ResumeWorkflow(client.ResumeWorkflowParams{
  431. Namespace: namespace,
  432. Name: workflowName,
  433. })
  434. if err != nil {
  435. t.Fatalf("%+v\n", err)
  436. }
  437. // resume后不会推送消息
  438. for {
  439. wf, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
  440. Namespace: namespace,
  441. Name: workflowName,
  442. })
  443. if err != nil {
  444. t.Fatalf("%+v\n", err)
  445. }
  446. if wf.Spec.Suspend == nil || *wf.Spec.Suspend == false {
  447. break
  448. }
  449. time.Sleep(1 * time.Second)
  450. }
  451. }