123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528 |
- package test
- import (
- "git.sxidc.com/go-tools/argo-api"
- "git.sxidc.com/go-tools/argo-api/client"
- "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
- "github.com/pkg/errors"
- "sync"
- "testing"
- "time"
- )
- func TestWorkflowBase(t *testing.T) {
- err := argo.Init(kubeConfig)
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- defer argo.Destroy()
- err = argo.GetInstance().LintWorkflow(client.LintWorkflowParams{
- Namespace: namespace,
- WorkflowDefinition: workflowYamlStr,
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
- Namespace: namespace,
- WorkflowDefinition: workflowYamlStr,
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- defer func() {
- err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
- Namespace: namespace,
- Name: workflowName,
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- }()
- wfs, err := argo.GetInstance().GetWorkflowsInNamespace(client.GetWorkflowsInNamespaceParams{
- Namespace: namespace,
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- if len(wfs) != 1 {
- t.Fatalf("%+v\n", errors.Errorf("数量不一致: %v", len(wfs)))
- }
- wf, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
- Namespace: namespace,
- Name: workflowName,
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- if wf.Namespace != wfs[0].Namespace {
- t.Fatalf("%+v\n", errors.Errorf("命名空间不一致: one: %s other: %s", wf.Namespace, wfs[0].Namespace))
- }
- if wf.Name != wfs[0].Name {
- t.Fatalf("%+v\n", errors.Errorf("名称不一致: one: %s other: %s", wf.Name, wfs[0].Name))
- }
- }
- func TestSubmitWorkflow(t *testing.T) {
- err := argo.Init(kubeConfig)
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- defer argo.Destroy()
- err = argo.GetInstance().CreateWorkflowTemplate(client.CreateWorkflowTemplateParams{
- Namespace: namespace,
- TemplateDefinition: workflowTemplateWithParamsYamlStr,
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- defer func() {
- err := argo.GetInstance().DeleteWorkflowTemplate(client.DeleteWorkflowTemplateParams{
- Namespace: namespace,
- Name: workflowTemplateWithParamsName,
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- }()
- submitWorkflowName, err := argo.GetInstance().SubmitWorkflowFromWorkflowTemplate(client.SubmitWorkflowFromWorkflowTemplateParams{
- Namespace: namespace,
- TemplateName: workflowTemplateWithParamsName,
- Parameters: []string{"message=Hello Submit"},
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- defer func() {
- err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
- Namespace: namespace,
- Name: submitWorkflowName,
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- }()
- wf, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
- Namespace: namespace,
- Name: submitWorkflowName,
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- if wf.Spec.Arguments.Parameters[0].Name != "message" {
- t.Fatalf("%+v\n", errors.Errorf("参数名称错误: %v", wf.Spec.Arguments.Parameters[0].Name))
- }
- if wf.Spec.Arguments.Parameters[0].Value.String() != "Hello Submit" {
- t.Fatalf("%+v\n", errors.Errorf("参数值错误: %v", wf.Spec.Arguments.Parameters[0].Value.String()))
- }
- }
- func TestResubmitWorkflow(t *testing.T) {
- err := argo.Init(kubeConfig)
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- defer argo.Destroy()
- workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
- Namespace: namespace,
- WorkflowDefinition: workflowYamlStr,
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- defer func() {
- err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
- Namespace: namespace,
- Name: workflowName,
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- }()
- resubmittedWorkflowName, err := argo.GetInstance().ResubmitWorkflow(client.ResubmitWorkflowParams{
- Namespace: namespace,
- Name: workflowName,
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- defer func() {
- err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
- Namespace: namespace,
- Name: resubmittedWorkflowName,
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- }()
- }
- func TestRetryWorkflow(t *testing.T) {
- err := argo.Init(kubeConfig)
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- defer argo.Destroy()
- workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
- Namespace: namespace,
- WorkflowDefinition: workflowYamlStr,
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- defer func() {
- err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
- Namespace: namespace,
- Name: workflowName,
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- }()
- terminalCalled := false
- wg := sync.WaitGroup{}
- wg.Add(1)
- err = argo.GetInstance().RegisterWorkflowWatcher(namespace, workflowName, func(wf *v1alpha1.Workflow) {
- if terminalCalled {
- if wf.Status.Phase == "Running" {
- return
- }
- wg.Done()
- }
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- defer argo.GetInstance().UnregisterWorkflowWatcher(namespace, workflowName)
- err = argo.GetInstance().TerminateWorkflow(client.TerminateWorkflowParams{
- Namespace: namespace,
- Name: workflowName,
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- terminalCalled = true
- wg.Wait()
- err = argo.GetInstance().RetryWorkflow(client.RetryWorkflowParams{
- Namespace: namespace,
- Name: workflowName,
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- }
- func TestStopWorkflow(t *testing.T) {
- err := argo.Init(kubeConfig)
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- defer argo.Destroy()
- workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
- Namespace: namespace,
- WorkflowDefinition: workflowYamlStr,
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- defer func() {
- err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
- Namespace: namespace,
- Name: workflowName,
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- }()
- stopCalled := false
- wg := sync.WaitGroup{}
- wg.Add(1)
- err = argo.GetInstance().RegisterWorkflowWatcher(namespace, workflowName, func(wf *v1alpha1.Workflow) {
- if stopCalled {
- if wf.Status.Phase == "Running" {
- return
- }
- wg.Done()
- }
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- defer argo.GetInstance().UnregisterWorkflowWatcher(namespace, workflowName)
- err = argo.GetInstance().StopWorkflow(client.StopWorkflowParams{
- Namespace: namespace,
- Name: workflowName,
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- stopCalled = true
- wg.Wait()
- }
- func TestTerminateWorkflow(t *testing.T) {
- err := argo.Init(kubeConfig)
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- defer argo.Destroy()
- workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
- Namespace: namespace,
- WorkflowDefinition: workflowYamlStr,
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- defer func() {
- err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
- Namespace: namespace,
- Name: workflowName,
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- }()
- terminalCalled := false
- wg := sync.WaitGroup{}
- wg.Add(1)
- err = argo.GetInstance().RegisterWorkflowWatcher(namespace, workflowName, func(wf *v1alpha1.Workflow) {
- if terminalCalled {
- if wf.Status.Phase == "Running" {
- return
- }
- wg.Done()
- }
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- defer argo.GetInstance().UnregisterWorkflowWatcher(namespace, workflowName)
- err = argo.GetInstance().TerminateWorkflow(client.TerminateWorkflowParams{
- Namespace: namespace,
- Name: workflowName,
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- terminalCalled = true
- wg.Wait()
- }
- func TestSetWorkflow(t *testing.T) {
- err := argo.Init(kubeConfig)
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- defer argo.Destroy()
- workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
- Namespace: namespace,
- WorkflowDefinition: workflowYamlStr,
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- defer func() {
- err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
- Namespace: namespace,
- Name: workflowName,
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- }()
- for {
- wf, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
- Namespace: namespace,
- Name: workflowName,
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- findA := false
- for _, node := range wf.Status.Nodes {
- if node.DisplayName == "A" {
- findA = true
- }
- }
- if findA {
- break
- }
- }
- err = argo.GetInstance().SetWorkflow(client.SetWorkflowParams{
- Namespace: namespace,
- Name: workflowName,
- NodeFieldSelector: "displayName=A",
- OutputParameters: `{"front_json": "pass"}`,
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- wf, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
- Namespace: namespace,
- Name: workflowName,
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- for _, node := range wf.Status.Nodes {
- if node.DisplayName == "A" {
- if node.Outputs.Parameters[0].Name == "front_json" && node.Outputs.Parameters[0].Value.String() == "pass" {
- return
- }
- t.Fatalf("%+v\n", errors.Errorf("输出参数不正确: %v", node.Outputs.Parameters[0].Value.String()))
- }
- }
- t.Fatalf("%+v\n", errors.New("找不到对应的节点"))
- }
- func TestSuspendAndResumeWorkflow(t *testing.T) {
- err := argo.Init(kubeConfig)
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- defer argo.Destroy()
- workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
- Namespace: namespace,
- WorkflowDefinition: workflowWithParamsYamlStr,
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- defer func() {
- err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
- Namespace: namespace,
- Name: workflowName,
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- }()
- suspendCalled := false
- wg1 := sync.WaitGroup{}
- wg1.Add(1)
- err = argo.GetInstance().RegisterWorkflowWatcher(namespace, workflowName, func(wf *v1alpha1.Workflow) {
- if suspendCalled {
- if wf.Status.Phase != "Running" {
- return
- }
- if wf.Spec.Suspend == nil || *wf.Spec.Suspend != true {
- return
- }
- wg1.Done()
- }
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- defer argo.GetInstance().UnregisterWorkflowWatcher(namespace, workflowName)
- err = argo.GetInstance().SuspendWorkflow(client.SuspendWorkflowParams{
- Namespace: namespace,
- Name: workflowName,
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- suspendCalled = true
- wg1.Wait()
- err = argo.GetInstance().ResumeWorkflow(client.ResumeWorkflowParams{
- Namespace: namespace,
- Name: workflowName,
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- // resume后不会推送消息
- for {
- wf, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
- Namespace: namespace,
- Name: workflowName,
- })
- if err != nil {
- t.Fatalf("%+v\n", err)
- }
- if wf.Spec.Suspend == nil || *wf.Spec.Suspend == false {
- break
- }
- time.Sleep(1 * time.Second)
- }
- }
|