package test

import (
	"sync"
	"testing"
	"time"

	argo "git.sxidc.com/go-tools/argo-api"
	"git.sxidc.com/go-tools/argo-api/client"
	"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
	"github.com/pkg/errors"
)

// workflowWaitTimeout bounds every wait on workflow state in this file so a
// missed event fails the test (and lets deferred cleanup run) instead of
// hanging until the global `go test` timeout.
const workflowWaitTimeout = 5 * time.Minute

// workflowWatchGate synchronizes a workflow watcher callback with the test
// goroutine. The callback ignores events until the gate is armed; after that,
// the first call to signal releases wait. Further signal calls are no-ops, so
// repeated watcher events cannot panic the way a second WaitGroup.Done would.
type workflowWatchGate struct {
	mu    sync.Mutex
	armed bool
	once  sync.Once
	done  chan struct{}
}

// newWorkflowWatchGate returns an unarmed gate.
func newWorkflowWatchGate() *workflowWatchGate {
	return &workflowWatchGate{done: make(chan struct{})}
}

// arm marks the gate as armed; it is safe to call concurrently with isArmed.
func (g *workflowWatchGate) arm() {
	g.mu.Lock()
	g.armed = true
	g.mu.Unlock()
}

// isArmed reports whether arm has been called.
func (g *workflowWatchGate) isArmed() bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	return g.armed
}

// signal releases wait. Only the first call has any effect.
func (g *workflowWatchGate) signal() {
	g.once.Do(func() { close(g.done) })
}

// wait blocks until signal is called, failing the test after
// workflowWaitTimeout.
func (g *workflowWatchGate) wait(t *testing.T) {
	t.Helper()
	select {
	case <-g.done:
	case <-time.After(workflowWaitTimeout):
		t.Fatalf("timed out after %v waiting for workflow watcher event", workflowWaitTimeout)
	}
}

// pollWorkflowUntil calls ready every interval until it returns true, failing
// the test once workflowWaitTimeout has elapsed. ready runs on the test
// goroutine, so it may call t.Fatalf itself.
func pollWorkflowUntil(t *testing.T, interval time.Duration, ready func() bool) {
	t.Helper()
	deadline := time.Now().Add(workflowWaitTimeout)
	for !ready() {
		if time.Now().After(deadline) {
			t.Fatalf("timed out after %v polling workflow state", workflowWaitTimeout)
		}
		time.Sleep(interval)
	}
}

// TestWorkflowBase lints and creates a workflow, then checks that listing the
// namespace returns exactly one workflow whose namespace and name match the
// one fetched by name. The workflow is deleted on exit.
func TestWorkflowBase(t *testing.T) {
	err := argo.Init(kubeConfig)
	if err != nil {
		t.Fatalf("%+v\n", err)
	}
	defer argo.Destroy()

	err = argo.GetInstance().LintWorkflow(client.LintWorkflowParams{
		Namespace:          namespace,
		WorkflowDefinition: workflowYamlStr,
	})
	if err != nil {
		t.Fatalf("%+v\n", err)
	}

	workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
		Namespace:          namespace,
		WorkflowDefinition: workflowYamlStr,
	})
	if err != nil {
		t.Fatalf("%+v\n", err)
	}
	defer func() {
		err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
			Namespace: namespace,
			Name:      workflowName,
		})
		if err != nil {
			t.Fatalf("%+v\n", err)
		}
	}()

	wfs, err := argo.GetInstance().GetWorkflowsInNamespace(client.GetWorkflowsInNamespaceParams{
		Namespace: namespace,
	})
	if err != nil {
		t.Fatalf("%+v\n", err)
	}
	if len(wfs) != 1 {
		t.Fatalf("%+v\n", errors.Errorf("数量不一致: %v", len(wfs)))
	}

	wf, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
		Namespace: namespace,
		Name:      workflowName,
	})
	if err != nil {
		t.Fatalf("%+v\n", err)
	}
	if wf.Namespace != wfs[0].Namespace {
		t.Fatalf("%+v\n", errors.Errorf("命名空间不一致: one: %s other: %s", wf.Namespace, wfs[0].Namespace))
	}
	if wf.Name != wfs[0].Name {
		t.Fatalf("%+v\n", errors.Errorf("名称不一致: one: %s other: %s", wf.Name, wfs[0].Name))
	}
}

// TestSubmitWorkflow creates a workflow template, submits a workflow from it
// with the parameter "message=Hello Submit", and verifies the first argument
// parameter of the resulting workflow. Template and workflow are deleted on
// exit.
func TestSubmitWorkflow(t *testing.T) {
	err := argo.Init(kubeConfig)
	if err != nil {
		t.Fatalf("%+v\n", err)
	}
	defer argo.Destroy()

	err = argo.GetInstance().CreateWorkflowTemplate(client.CreateWorkflowTemplateParams{
		Namespace:          namespace,
		TemplateDefinition: workflowTemplateWithParamsYamlStr,
	})
	if err != nil {
		t.Fatalf("%+v\n", err)
	}
	defer func() {
		err := argo.GetInstance().DeleteWorkflowTemplate(client.DeleteWorkflowTemplateParams{
			Namespace: namespace,
			Name:      workflowTemplateWithParamsName,
		})
		if err != nil {
			t.Fatalf("%+v\n", err)
		}
	}()

	submitWorkflowName, err := argo.GetInstance().SubmitWorkflowFromWorkflowTemplate(client.SubmitWorkflowFromWorkflowTemplateParams{
		Namespace:    namespace,
		TemplateName: workflowTemplateWithParamsName,
		Parameters:   []string{"message=Hello Submit"},
	})
	if err != nil {
		t.Fatalf("%+v\n", err)
	}
	defer func() {
		err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
			Namespace: namespace,
			Name:      submitWorkflowName,
		})
		if err != nil {
			t.Fatalf("%+v\n", err)
		}
	}()

	wf, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
		Namespace: namespace,
		Name:      submitWorkflowName,
	})
	if err != nil {
		t.Fatalf("%+v\n", err)
	}

	// Fail cleanly instead of panicking on an index out of range.
	params := wf.Spec.Arguments.Parameters
	if len(params) == 0 {
		t.Fatalf("submitted workflow %v has no argument parameters", submitWorkflowName)
	}
	if params[0].Name != "message" {
		t.Fatalf("%+v\n", errors.Errorf("参数名称错误: %v", params[0].Name))
	}
	if params[0].Value.String() != "Hello Submit" {
		t.Fatalf("%+v\n", errors.Errorf("参数值错误: %v", params[0].Value.String()))
	}
}

// TestResubmitWorkflow creates a workflow and resubmits it, checking only that
// both calls succeed. Both workflows are deleted on exit.
func TestResubmitWorkflow(t *testing.T) {
	err := argo.Init(kubeConfig)
	if err != nil {
		t.Fatalf("%+v\n", err)
	}
	defer argo.Destroy()

	workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
		Namespace:          namespace,
		WorkflowDefinition: workflowYamlStr,
	})
	if err != nil {
		t.Fatalf("%+v\n", err)
	}
	defer func() {
		err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
			Namespace: namespace,
			Name:      workflowName,
		})
		if err != nil {
			t.Fatalf("%+v\n", err)
		}
	}()

	resubmittedWorkflowName, err := argo.GetInstance().ResubmitWorkflow(client.ResubmitWorkflowParams{
		Namespace: namespace,
		Name:      workflowName,
	})
	if err != nil {
		t.Fatalf("%+v\n", err)
	}
	defer func() {
		err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
			Namespace: namespace,
			Name:      resubmittedWorkflowName,
		})
		if err != nil {
			t.Fatalf("%+v\n", err)
		}
	}()
}

// TestRetryWorkflow creates a workflow, terminates it, waits until the watcher
// reports a phase other than "Running", and then retries the workflow.
func TestRetryWorkflow(t *testing.T) {
	err := argo.Init(kubeConfig)
	if err != nil {
		t.Fatalf("%+v\n", err)
	}
	defer argo.Destroy()

	workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
		Namespace:          namespace,
		WorkflowDefinition: workflowYamlStr,
	})
	if err != nil {
		t.Fatalf("%+v\n", err)
	}
	defer func() {
		err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
			Namespace: namespace,
			Name:      workflowName,
		})
		if err != nil {
			t.Fatalf("%+v\n", err)
		}
	}()

	// Events seen before the terminate call has returned are ignored; the
	// first later event with a non-Running phase releases the gate.
	terminated := newWorkflowWatchGate()
	err = argo.GetInstance().RegisterWorkflowWatcher(namespace, workflowName, func(wf *v1alpha1.Workflow) {
		if !terminated.isArmed() || wf.Status.Phase == "Running" {
			return
		}
		terminated.signal()
	})
	if err != nil {
		t.Fatalf("%+v\n", err)
	}
	defer argo.GetInstance().UnregisterWorkflowWatcher(namespace, workflowName)

	err = argo.GetInstance().TerminateWorkflow(client.TerminateWorkflowParams{
		Namespace: namespace,
		Name:      workflowName,
	})
	if err != nil {
		t.Fatalf("%+v\n", err)
	}
	terminated.arm()
	terminated.wait(t)

	err = argo.GetInstance().RetryWorkflow(client.RetryWorkflowParams{
		Namespace: namespace,
		Name:      workflowName,
	})
	if err != nil {
		t.Fatalf("%+v\n", err)
	}
}

// TestStopWorkflow creates a workflow, stops it, and waits until the watcher
// reports a phase other than "Running".
func TestStopWorkflow(t *testing.T) {
	err := argo.Init(kubeConfig)
	if err != nil {
		t.Fatalf("%+v\n", err)
	}
	defer argo.Destroy()

	workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
		Namespace:          namespace,
		WorkflowDefinition: workflowYamlStr,
	})
	if err != nil {
		t.Fatalf("%+v\n", err)
	}
	defer func() {
		err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
			Namespace: namespace,
			Name:      workflowName,
		})
		if err != nil {
			t.Fatalf("%+v\n", err)
		}
	}()

	// Events seen before the stop call has returned are ignored.
	stopped := newWorkflowWatchGate()
	err = argo.GetInstance().RegisterWorkflowWatcher(namespace, workflowName, func(wf *v1alpha1.Workflow) {
		if !stopped.isArmed() || wf.Status.Phase == "Running" {
			return
		}
		stopped.signal()
	})
	if err != nil {
		t.Fatalf("%+v\n", err)
	}
	defer argo.GetInstance().UnregisterWorkflowWatcher(namespace, workflowName)

	err = argo.GetInstance().StopWorkflow(client.StopWorkflowParams{
		Namespace: namespace,
		Name:      workflowName,
	})
	if err != nil {
		t.Fatalf("%+v\n", err)
	}
	stopped.arm()
	stopped.wait(t)
}

// TestTerminateWorkflow creates a workflow, terminates it, and waits until the
// watcher reports a phase other than "Running".
func TestTerminateWorkflow(t *testing.T) {
	err := argo.Init(kubeConfig)
	if err != nil {
		t.Fatalf("%+v\n", err)
	}
	defer argo.Destroy()

	workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
		Namespace:          namespace,
		WorkflowDefinition: workflowYamlStr,
	})
	if err != nil {
		t.Fatalf("%+v\n", err)
	}
	defer func() {
		err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
			Namespace: namespace,
			Name:      workflowName,
		})
		if err != nil {
			t.Fatalf("%+v\n", err)
		}
	}()

	// Events seen before the terminate call has returned are ignored.
	terminated := newWorkflowWatchGate()
	err = argo.GetInstance().RegisterWorkflowWatcher(namespace, workflowName, func(wf *v1alpha1.Workflow) {
		if !terminated.isArmed() || wf.Status.Phase == "Running" {
			return
		}
		terminated.signal()
	})
	if err != nil {
		t.Fatalf("%+v\n", err)
	}
	defer argo.GetInstance().UnregisterWorkflowWatcher(namespace, workflowName)

	err = argo.GetInstance().TerminateWorkflow(client.TerminateWorkflowParams{
		Namespace: namespace,
		Name:      workflowName,
	})
	if err != nil {
		t.Fatalf("%+v\n", err)
	}
	terminated.arm()
	terminated.wait(t)
}

// TestSetWorkflow creates a workflow, waits until a node with display name "A"
// shows up in its status, sets the output parameter front_json=pass on that
// node, and verifies the node's first output parameter afterwards.
func TestSetWorkflow(t *testing.T) {
	err := argo.Init(kubeConfig)
	if err != nil {
		t.Fatalf("%+v\n", err)
	}
	defer argo.Destroy()

	workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
		Namespace:          namespace,
		WorkflowDefinition: workflowYamlStr,
	})
	if err != nil {
		t.Fatalf("%+v\n", err)
	}
	defer func() {
		err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
			Namespace: namespace,
			Name:      workflowName,
		})
		if err != nil {
			t.Fatalf("%+v\n", err)
		}
	}()

	// Poll with a short pause rather than spinning on the API without any
	// delay between requests.
	pollWorkflowUntil(t, 200*time.Millisecond, func() bool {
		current, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
			Namespace: namespace,
			Name:      workflowName,
		})
		if err != nil {
			t.Fatalf("%+v\n", err)
		}
		for _, node := range current.Status.Nodes {
			if node.DisplayName == "A" {
				return true
			}
		}
		return false
	})

	err = argo.GetInstance().SetWorkflow(client.SetWorkflowParams{
		Namespace:         namespace,
		Name:              workflowName,
		NodeFieldSelector: "displayName=A",
		OutputParameters:  `{"front_json": "pass"}`,
	})
	if err != nil {
		t.Fatalf("%+v\n", err)
	}

	wf, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
		Namespace: namespace,
		Name:      workflowName,
	})
	if err != nil {
		t.Fatalf("%+v\n", err)
	}

	for _, node := range wf.Status.Nodes {
		if node.DisplayName != "A" {
			continue
		}
		// Fail cleanly instead of panicking on a nil Outputs or an empty
		// parameter list.
		if node.Outputs == nil || len(node.Outputs.Parameters) == 0 {
			t.Fatalf("node A has no output parameters")
		}
		first := node.Outputs.Parameters[0]
		if first.Name == "front_json" && first.Value.String() == "pass" {
			return
		}
		t.Fatalf("%+v\n", errors.Errorf("输出参数不正确: %v", first.Value.String()))
	}

	t.Fatalf("%+v\n", errors.New("找不到对应的节点"))
}

// TestSuspendAndResumeWorkflow creates a workflow, suspends it, waits for a
// watcher event showing phase "Running" with Spec.Suspend set to true, resumes
// it, and then polls until Spec.Suspend is nil or false.
func TestSuspendAndResumeWorkflow(t *testing.T) {
	err := argo.Init(kubeConfig)
	if err != nil {
		t.Fatalf("%+v\n", err)
	}
	defer argo.Destroy()

	workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
		Namespace:          namespace,
		WorkflowDefinition: workflowWithParamsYamlStr,
	})
	if err != nil {
		t.Fatalf("%+v\n", err)
	}
	defer func() {
		err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
			Namespace: namespace,
			Name:      workflowName,
		})
		if err != nil {
			t.Fatalf("%+v\n", err)
		}
	}()

	// Events seen before the suspend call has returned are ignored.
	suspended := newWorkflowWatchGate()
	err = argo.GetInstance().RegisterWorkflowWatcher(namespace, workflowName, func(wf *v1alpha1.Workflow) {
		if !suspended.isArmed() {
			return
		}
		if wf.Status.Phase != "Running" {
			return
		}
		if wf.Spec.Suspend == nil || !*wf.Spec.Suspend {
			return
		}
		suspended.signal()
	})
	if err != nil {
		t.Fatalf("%+v\n", err)
	}
	defer argo.GetInstance().UnregisterWorkflowWatcher(namespace, workflowName)

	err = argo.GetInstance().SuspendWorkflow(client.SuspendWorkflowParams{
		Namespace: namespace,
		Name:      workflowName,
	})
	if err != nil {
		t.Fatalf("%+v\n", err)
	}
	suspended.arm()
	suspended.wait(t)

	err = argo.GetInstance().ResumeWorkflow(client.ResumeWorkflowParams{
		Namespace: namespace,
		Name:      workflowName,
	})
	if err != nil {
		t.Fatalf("%+v\n", err)
	}

	// No watcher event is pushed after resume, so poll the workflow instead.
	pollWorkflowUntil(t, 1*time.Second, func() bool {
		wf, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
			Namespace: namespace,
			Name:      workflowName,
		})
		if err != nil {
			t.Fatalf("%+v\n", err)
		}
		return wf.Spec.Suspend == nil || !*wf.Spec.Suspend
	})
}