workflow_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528
  1. package test
  2. import (
  3. "git.sxidc.com/go-tools/argo-api"
  4. "git.sxidc.com/go-tools/argo-api/client"
  5. "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
  6. "github.com/pkg/errors"
  7. "sync"
  8. "testing"
  9. "time"
  10. )
  11. func TestWorkflowBase(t *testing.T) {
  12. err := argo.Init(kubeConfig)
  13. if err != nil {
  14. t.Fatalf("%+v\n", err)
  15. }
  16. defer argo.Destroy()
  17. err = argo.GetInstance().LintWorkflow(client.LintWorkflowParams{
  18. Namespace: namespace,
  19. WorkflowDefinition: workflowYamlStr,
  20. })
  21. if err != nil {
  22. t.Fatalf("%+v\n", err)
  23. }
  24. workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
  25. Namespace: namespace,
  26. WorkflowDefinition: workflowYamlStr,
  27. })
  28. if err != nil {
  29. t.Fatalf("%+v\n", err)
  30. }
  31. defer func() {
  32. err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
  33. Namespace: namespace,
  34. Name: workflowName,
  35. })
  36. if err != nil {
  37. t.Fatalf("%+v\n", err)
  38. }
  39. }()
  40. wfs, err := argo.GetInstance().GetWorkflowsInNamespace(client.GetWorkflowsInNamespaceParams{
  41. Namespace: namespace,
  42. })
  43. if err != nil {
  44. t.Fatalf("%+v\n", err)
  45. }
  46. if len(wfs) != 1 {
  47. t.Fatalf("%+v\n", errors.Errorf("数量不一致: %v", len(wfs)))
  48. }
  49. wf, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
  50. Namespace: namespace,
  51. Name: workflowName,
  52. })
  53. if err != nil {
  54. t.Fatalf("%+v\n", err)
  55. }
  56. if wf.Namespace != wfs[0].Namespace {
  57. t.Fatalf("%+v\n", errors.Errorf("命名空间不一致: one: %s other: %s", wf.Namespace, wfs[0].Namespace))
  58. }
  59. if wf.Name != wfs[0].Name {
  60. t.Fatalf("%+v\n", errors.Errorf("名称不一致: one: %s other: %s", wf.Name, wfs[0].Name))
  61. }
  62. }
  63. func TestSubmitWorkflow(t *testing.T) {
  64. err := argo.Init(kubeConfig)
  65. if err != nil {
  66. t.Fatalf("%+v\n", err)
  67. }
  68. defer argo.Destroy()
  69. err = argo.GetInstance().CreateWorkflowTemplate(client.CreateWorkflowTemplateParams{
  70. Namespace: namespace,
  71. TemplateDefinition: workflowTemplateWithParamsYamlStr,
  72. })
  73. if err != nil {
  74. t.Fatalf("%+v\n", err)
  75. }
  76. defer func() {
  77. err := argo.GetInstance().DeleteWorkflowTemplate(client.DeleteWorkflowTemplateParams{
  78. Namespace: namespace,
  79. Name: workflowTemplateWithParamsName,
  80. })
  81. if err != nil {
  82. t.Fatalf("%+v\n", err)
  83. }
  84. }()
  85. submitWorkflowName, err := argo.GetInstance().SubmitWorkflowFromWorkflowTemplate(client.SubmitWorkflowFromWorkflowTemplateParams{
  86. Namespace: namespace,
  87. TemplateName: workflowTemplateWithParamsName,
  88. Parameters: []string{"message=Hello Submit"},
  89. })
  90. if err != nil {
  91. t.Fatalf("%+v\n", err)
  92. }
  93. defer func() {
  94. err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
  95. Namespace: namespace,
  96. Name: submitWorkflowName,
  97. })
  98. if err != nil {
  99. t.Fatalf("%+v\n", err)
  100. }
  101. }()
  102. wf, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
  103. Namespace: namespace,
  104. Name: submitWorkflowName,
  105. })
  106. if err != nil {
  107. t.Fatalf("%+v\n", err)
  108. }
  109. if wf.Spec.Arguments.Parameters[0].Name != "message" {
  110. t.Fatalf("%+v\n", errors.Errorf("参数名称错误: %v", wf.Spec.Arguments.Parameters[0].Name))
  111. }
  112. if wf.Spec.Arguments.Parameters[0].Value.String() != "Hello Submit" {
  113. t.Fatalf("%+v\n", errors.Errorf("参数值错误: %v", wf.Spec.Arguments.Parameters[0].Value.String()))
  114. }
  115. }
  116. func TestResubmitWorkflow(t *testing.T) {
  117. err := argo.Init(kubeConfig)
  118. if err != nil {
  119. t.Fatalf("%+v\n", err)
  120. }
  121. defer argo.Destroy()
  122. workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
  123. Namespace: namespace,
  124. WorkflowDefinition: workflowYamlStr,
  125. })
  126. if err != nil {
  127. t.Fatalf("%+v\n", err)
  128. }
  129. defer func() {
  130. err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
  131. Namespace: namespace,
  132. Name: workflowName,
  133. })
  134. if err != nil {
  135. t.Fatalf("%+v\n", err)
  136. }
  137. }()
  138. resubmittedWorkflowName, err := argo.GetInstance().ResubmitWorkflow(client.ResubmitWorkflowParams{
  139. Namespace: namespace,
  140. Name: workflowName,
  141. })
  142. if err != nil {
  143. t.Fatalf("%+v\n", err)
  144. }
  145. defer func() {
  146. err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
  147. Namespace: namespace,
  148. Name: resubmittedWorkflowName,
  149. })
  150. if err != nil {
  151. t.Fatalf("%+v\n", err)
  152. }
  153. }()
  154. }
  155. func TestRetryWorkflow(t *testing.T) {
  156. err := argo.Init(kubeConfig)
  157. if err != nil {
  158. t.Fatalf("%+v\n", err)
  159. }
  160. defer argo.Destroy()
  161. workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
  162. Namespace: namespace,
  163. WorkflowDefinition: workflowYamlStr,
  164. })
  165. if err != nil {
  166. t.Fatalf("%+v\n", err)
  167. }
  168. defer func() {
  169. err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
  170. Namespace: namespace,
  171. Name: workflowName,
  172. })
  173. if err != nil {
  174. t.Fatalf("%+v\n", err)
  175. }
  176. }()
  177. terminalCalled := false
  178. wg := sync.WaitGroup{}
  179. wg.Add(1)
  180. err = argo.GetInstance().RegisterWorkflowWatcher(namespace, workflowName, func(wf *v1alpha1.Workflow) {
  181. if terminalCalled {
  182. if wf.Status.Phase == "Running" {
  183. return
  184. }
  185. wg.Done()
  186. }
  187. })
  188. if err != nil {
  189. t.Fatalf("%+v\n", err)
  190. }
  191. defer argo.GetInstance().UnregisterWorkflowWatcher(namespace, workflowName)
  192. err = argo.GetInstance().TerminateWorkflow(client.TerminateWorkflowParams{
  193. Namespace: namespace,
  194. Name: workflowName,
  195. })
  196. if err != nil {
  197. t.Fatalf("%+v\n", err)
  198. }
  199. terminalCalled = true
  200. wg.Wait()
  201. err = argo.GetInstance().RetryWorkflow(client.RetryWorkflowParams{
  202. Namespace: namespace,
  203. Name: workflowName,
  204. })
  205. if err != nil {
  206. t.Fatalf("%+v\n", err)
  207. }
  208. }
  209. func TestStopWorkflow(t *testing.T) {
  210. err := argo.Init(kubeConfig)
  211. if err != nil {
  212. t.Fatalf("%+v\n", err)
  213. }
  214. defer argo.Destroy()
  215. workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
  216. Namespace: namespace,
  217. WorkflowDefinition: workflowYamlStr,
  218. })
  219. if err != nil {
  220. t.Fatalf("%+v\n", err)
  221. }
  222. defer func() {
  223. err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
  224. Namespace: namespace,
  225. Name: workflowName,
  226. })
  227. if err != nil {
  228. t.Fatalf("%+v\n", err)
  229. }
  230. }()
  231. stopCalled := false
  232. wg := sync.WaitGroup{}
  233. wg.Add(1)
  234. err = argo.GetInstance().RegisterWorkflowWatcher(namespace, workflowName, func(wf *v1alpha1.Workflow) {
  235. if stopCalled {
  236. if wf.Status.Phase == "Running" {
  237. return
  238. }
  239. wg.Done()
  240. }
  241. })
  242. if err != nil {
  243. t.Fatalf("%+v\n", err)
  244. }
  245. defer argo.GetInstance().UnregisterWorkflowWatcher(namespace, workflowName)
  246. err = argo.GetInstance().StopWorkflow(client.StopWorkflowParams{
  247. Namespace: namespace,
  248. Name: workflowName,
  249. })
  250. if err != nil {
  251. t.Fatalf("%+v\n", err)
  252. }
  253. stopCalled = true
  254. wg.Wait()
  255. }
  256. func TestTerminateWorkflow(t *testing.T) {
  257. err := argo.Init(kubeConfig)
  258. if err != nil {
  259. t.Fatalf("%+v\n", err)
  260. }
  261. defer argo.Destroy()
  262. workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
  263. Namespace: namespace,
  264. WorkflowDefinition: workflowYamlStr,
  265. })
  266. if err != nil {
  267. t.Fatalf("%+v\n", err)
  268. }
  269. defer func() {
  270. err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
  271. Namespace: namespace,
  272. Name: workflowName,
  273. })
  274. if err != nil {
  275. t.Fatalf("%+v\n", err)
  276. }
  277. }()
  278. terminalCalled := false
  279. wg := sync.WaitGroup{}
  280. wg.Add(1)
  281. err = argo.GetInstance().RegisterWorkflowWatcher(namespace, workflowName, func(wf *v1alpha1.Workflow) {
  282. if terminalCalled {
  283. if wf.Status.Phase == "Running" {
  284. return
  285. }
  286. wg.Done()
  287. }
  288. })
  289. if err != nil {
  290. t.Fatalf("%+v\n", err)
  291. }
  292. defer argo.GetInstance().UnregisterWorkflowWatcher(namespace, workflowName)
  293. err = argo.GetInstance().TerminateWorkflow(client.TerminateWorkflowParams{
  294. Namespace: namespace,
  295. Name: workflowName,
  296. })
  297. if err != nil {
  298. t.Fatalf("%+v\n", err)
  299. }
  300. terminalCalled = true
  301. wg.Wait()
  302. }
  303. func TestSetWorkflow(t *testing.T) {
  304. err := argo.Init(kubeConfig)
  305. if err != nil {
  306. t.Fatalf("%+v\n", err)
  307. }
  308. defer argo.Destroy()
  309. workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
  310. Namespace: namespace,
  311. WorkflowDefinition: workflowYamlStr,
  312. })
  313. if err != nil {
  314. t.Fatalf("%+v\n", err)
  315. }
  316. defer func() {
  317. err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
  318. Namespace: namespace,
  319. Name: workflowName,
  320. })
  321. if err != nil {
  322. t.Fatalf("%+v\n", err)
  323. }
  324. }()
  325. for {
  326. wf, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
  327. Namespace: namespace,
  328. Name: workflowName,
  329. })
  330. if err != nil {
  331. t.Fatalf("%+v\n", err)
  332. }
  333. findA := false
  334. for _, node := range wf.Status.Nodes {
  335. if node.DisplayName == "A" {
  336. findA = true
  337. }
  338. }
  339. if findA {
  340. break
  341. }
  342. }
  343. err = argo.GetInstance().SetWorkflow(client.SetWorkflowParams{
  344. Namespace: namespace,
  345. Name: workflowName,
  346. NodeFieldSelector: "displayName=A",
  347. OutputParameters: `{"front_json": "pass"}`,
  348. })
  349. if err != nil {
  350. t.Fatalf("%+v\n", err)
  351. }
  352. wf, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
  353. Namespace: namespace,
  354. Name: workflowName,
  355. })
  356. if err != nil {
  357. t.Fatalf("%+v\n", err)
  358. }
  359. for _, node := range wf.Status.Nodes {
  360. if node.DisplayName == "A" {
  361. if node.Outputs.Parameters[0].Name == "front_json" && node.Outputs.Parameters[0].Value.String() == "pass" {
  362. return
  363. }
  364. t.Fatalf("%+v\n", errors.Errorf("输出参数不正确: %v", node.Outputs.Parameters[0].Value.String()))
  365. }
  366. }
  367. t.Fatalf("%+v\n", errors.New("找不到对应的节点"))
  368. }
  369. func TestSuspendAndResumeWorkflow(t *testing.T) {
  370. err := argo.Init(kubeConfig)
  371. if err != nil {
  372. t.Fatalf("%+v\n", err)
  373. }
  374. defer argo.Destroy()
  375. workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
  376. Namespace: namespace,
  377. WorkflowDefinition: workflowWithParamsYamlStr,
  378. })
  379. if err != nil {
  380. t.Fatalf("%+v\n", err)
  381. }
  382. defer func() {
  383. err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
  384. Namespace: namespace,
  385. Name: workflowName,
  386. })
  387. if err != nil {
  388. t.Fatalf("%+v\n", err)
  389. }
  390. }()
  391. suspendCalled := false
  392. wg1 := sync.WaitGroup{}
  393. wg1.Add(1)
  394. err = argo.GetInstance().RegisterWorkflowWatcher(namespace, workflowName, func(wf *v1alpha1.Workflow) {
  395. if suspendCalled {
  396. if wf.Status.Phase != "Running" {
  397. return
  398. }
  399. if wf.Spec.Suspend == nil || *wf.Spec.Suspend != true {
  400. return
  401. }
  402. wg1.Done()
  403. }
  404. })
  405. if err != nil {
  406. t.Fatalf("%+v\n", err)
  407. }
  408. defer argo.GetInstance().UnregisterWorkflowWatcher(namespace, workflowName)
  409. err = argo.GetInstance().SuspendWorkflow(client.SuspendWorkflowParams{
  410. Namespace: namespace,
  411. Name: workflowName,
  412. })
  413. if err != nil {
  414. t.Fatalf("%+v\n", err)
  415. }
  416. suspendCalled = true
  417. wg1.Wait()
  418. err = argo.GetInstance().ResumeWorkflow(client.ResumeWorkflowParams{
  419. Namespace: namespace,
  420. Name: workflowName,
  421. })
  422. if err != nil {
  423. t.Fatalf("%+v\n", err)
  424. }
  425. // resume后不会推送消息
  426. for {
  427. wf, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
  428. Namespace: namespace,
  429. Name: workflowName,
  430. })
  431. if err != nil {
  432. t.Fatalf("%+v\n", err)
  433. }
  434. if wf.Spec.Suspend == nil || *wf.Spec.Suspend == false {
  435. break
  436. }
  437. time.Sleep(1 * time.Second)
  438. }
  439. }