Browse Source

完成suspend和resume并测试

yjp 1 year ago
parent
commit
70c063d3af
4 changed files with 163 additions and 2 deletions
  1. 62 2
      client/workflow.go
  2. 3 0
      test/common.go
  3. 20 0
      test/definitions/workflow_with_params.yaml
  4. 78 0
      test/workflow_test.go

+ 62 - 2
client/workflow.go

@@ -563,7 +563,37 @@ type SuspendWorkflowParams struct {
 
 // SuspendWorkflow 挂起工作流
 func (c *Client) SuspendWorkflow(params SuspendWorkflowParams) error {
-	return nil
+	responseMap := make(map[string]any)
+
+	resp, err := c.restyClient.R().
+		SetHeader("Content-Type", "application/json").
+		SetAuthToken(c.token).
+		SetPathParams(map[string]string{
+			"namespace": params.Namespace,
+			"name":      params.Name,
+		}).
+		SetBody(map[string]any{
+			"namespace": params.Namespace,
+			"name":      params.Name,
+		}).
+		SetResult(&responseMap).
+		SetError(&responseMap).
+		Put(suspendWorkflowRelativeUrl)
+	if err != nil {
+		return errors.New(err.Error())
+	}
+
+	switch resp.StatusCode() {
+	case http.StatusOK:
+		return nil
+	default:
+		message, ok := responseMap["message"]
+		if !ok {
+			return errors.Errorf("%v", resp.Status())
+		}
+
+		return errors.Errorf("%v, %v", resp.Status(), message)
+	}
 }
 
 type ResumeWorkflowParams struct {
@@ -574,7 +604,37 @@ type ResumeWorkflowParams struct {
 
 // ResumeWorkflow 恢复被挂起的工作流
 func (c *Client) ResumeWorkflow(params ResumeWorkflowParams) error {
-	return nil
+	responseMap := make(map[string]any)
+
+	resp, err := c.restyClient.R().
+		SetHeader("Content-Type", "application/json").
+		SetAuthToken(c.token).
+		SetPathParams(map[string]string{
+			"namespace": params.Namespace,
+			"name":      params.Name,
+		}).
+		SetBody(map[string]any{
+			"namespace": params.Namespace,
+			"name":      params.Name,
+		}).
+		SetResult(&responseMap).
+		SetError(&responseMap).
+		Put(resumeWorkflowRelativeUrl)
+	if err != nil {
+		return errors.New(err.Error())
+	}
+
+	switch resp.StatusCode() {
+	case http.StatusOK:
+		return nil
+	default:
+		message, ok := responseMap["message"]
+		if !ok {
+			return errors.Errorf("%v", resp.Status())
+		}
+
+		return errors.Errorf("%v, %v", resp.Status(), message)
+	}
 }
 
 type EventCallback func(event map[string]any, eventErr error) error

+ 3 - 0
test/common.go

@@ -24,6 +24,9 @@ var workflowYamlStr []byte
 //go:embed definitions/workflow_template_with_params.yaml
 var workflowTemplateWithParamsYamlStr []byte
 
+//go:embed definitions/workflow_with_params.yaml
+var workflowWithParamsYamlStr []byte
+
 func compareDefinitionMap(t *testing.T, definitionMap map[string]any, checkDefinitionMaps map[string]any) {
 	keys := make([]string, 0)
 	for key, _ := range definitionMap {

+ 20 - 0
test/definitions/workflow_with_params.yaml

@@ -0,0 +1,20 @@
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+  generateName: hello
+spec:
+  entrypoint: whalesay
+  arguments:
+    parameters:
+      - name: message
+        value: hello world
+
+  templates:
+    - name: whalesay
+      inputs:
+        parameters:
+          - name: message
+      container:
+        image: docker/whalesay
+        command: [cowsay]
+        args: ["{{inputs.parameters.message}}"]

+ 78 - 0
test/workflow_test.go

@@ -443,3 +443,81 @@ func TestSetWorkflow(t *testing.T) {
 
 	t.Fatalf("%+v\n", errors.New("找不到对应的节点"))
 }
+
+func TestSuspendAndResumeWorkflow(t *testing.T) {
+	argo.Init(baseUrl, token, client.WithTimeoutSec(10))
+	defer argo.Destroy()
+
+	workflowDefinition := make(map[string]any)
+	err := yaml.Unmarshal(workflowWithParamsYamlStr, &workflowDefinition)
+	if err != nil {
+		panic(err)
+	}
+
+	workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
+		Namespace:          namespace,
+		WorkflowDefinition: workflowDefinition,
+	})
+	if err != nil {
+		t.Fatalf("%+v\n", err)
+	}
+
+	defer func() {
+		err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
+			Namespace: namespace,
+			Name:      workflowName,
+		})
+		if err != nil {
+			t.Fatalf("%+v\n", err)
+		}
+	}()
+
+	err = argo.GetInstance().SuspendWorkflow(client.SuspendWorkflowParams{
+		Namespace: namespace,
+		Name:      workflowName,
+	})
+	if err != nil {
+		t.Fatalf("%+v\n", err)
+	}
+
+	for {
+		currentWorkflowDefinitions, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
+			Namespace: namespace,
+			Name:      workflowName,
+		})
+		if err != nil {
+			t.Fatalf("%+v\n", err)
+		}
+
+		if currentWorkflowDefinitions["spec"].(map[string]any)["suspend"] == true {
+			break
+		}
+
+		time.Sleep(1 * time.Second)
+	}
+
+	err = argo.GetInstance().ResumeWorkflow(client.ResumeWorkflowParams{
+		Namespace: namespace,
+		Name:      workflowName,
+	})
+	if err != nil {
+		t.Fatalf("%+v\n", err)
+	}
+
+	for {
+		currentWorkflowDefinitions, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
+			Namespace: namespace,
+			Name:      workflowName,
+		})
+		if err != nil {
+			t.Fatalf("%+v\n", err)
+		}
+
+		_, ok := currentWorkflowDefinitions["spec"].(map[string]any)["suspend"]
+		if !ok {
+			break
+		}
+
+		time.Sleep(1 * time.Second)
+	}
+}