Browse Source

完成set接口

yjp 1 year ago
parent
commit
c900549453
2 changed files with 130 additions and 2 deletions
  1. 36 2
      client/workflow.go
  2. 94 0
      test/workflow_test.go

+ 36 - 2
client/workflow.go

@@ -512,14 +512,48 @@ type SetWorkflowParams struct {
 	Namespace         string
 	Name              string
 	NodeFieldSelector string
+	OutputParameters  string
 	Message           string
 	Phase             string
-	OutputParameters  []string
 }
 
 // SetWorkflow 设置工作流的参数
 func (c *Client) SetWorkflow(params SetWorkflowParams) error {
-	return nil
+	responseMap := make(map[string]any)
+
+	resp, err := c.restyClient.R().
+		SetHeader("Content-Type", "application/json").
+		SetAuthToken(c.token).
+		SetPathParams(map[string]string{
+			"namespace": params.Namespace,
+			"name":      params.Name,
+		}).
+		SetBody(map[string]any{
+			"namespace":         params.Namespace,
+			"name":              params.Name,
+			"nodeFieldSelector": params.NodeFieldSelector,
+			"outputParameters":  params.OutputParameters,
+			"message":           params.Message,
+			"phase":             params.Phase,
+		}).
+		SetResult(&responseMap).
+		SetError(&responseMap).
+		Put(setWorkflowRelativeUrl)
+	if err != nil {
+		return errors.New(err.Error())
+	}
+
+	switch resp.StatusCode() {
+	case http.StatusOK:
+		return nil
+	default:
+		message, ok := responseMap["message"]
+		if !ok {
+			return errors.Errorf("%v", resp.Status())
+		}
+
+		return errors.Errorf("%v, %v", resp.Status(), message)
+	}
 }
 
 type SuspendWorkflowParams struct {

+ 94 - 0
test/workflow_test.go

@@ -349,3 +349,97 @@ func TestTerminateWorkflow(t *testing.T) {
 		time.Sleep(1 * time.Second)
 	}
 }
+
+func TestSetWorkflow(t *testing.T) {
+	argo.Init(baseUrl, token, client.WithTimeoutSec(10))
+	defer argo.Destroy()
+
+	workflowDefinition := make(map[string]any)
+	err := yaml.Unmarshal(workflowYamlStr, &workflowDefinition)
+	if err != nil {
+		panic(err)
+	}
+
+	workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
+		Namespace:          namespace,
+		WorkflowDefinition: workflowDefinition,
+	})
+	if err != nil {
+		t.Fatalf("%+v\n", err)
+	}
+
+	defer func() {
+		err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
+			Namespace: namespace,
+			Name:      workflowName,
+		})
+		if err != nil {
+			t.Fatalf("%+v\n", err)
+		}
+	}()
+
+	for {
+		currentWorkflowDefinitions, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
+			Namespace: namespace,
+			Name:      workflowName,
+		})
+		if err != nil {
+			t.Fatalf("%+v\n", err)
+		}
+
+		nodes, ok := currentWorkflowDefinitions["status"].(map[string]any)["nodes"]
+		if !ok {
+			time.Sleep(1 * time.Second)
+			continue
+		}
+
+		findA := false
+		nodesMap := nodes.(map[string]any)
+		for _, node := range nodesMap {
+			nodeMap := node.(map[string]any)
+			if nodeMap["displayName"].(string) == "A" {
+				findA = true
+			}
+		}
+
+		if findA {
+			break
+		}
+
+		time.Sleep(1 * time.Second)
+	}
+
+	err = argo.GetInstance().SetWorkflow(client.SetWorkflowParams{
+		Namespace:         namespace,
+		Name:              workflowName,
+		NodeFieldSelector: "displayName=A",
+		OutputParameters:  `{"approve": "pass"}`,
+	})
+	if err != nil {
+		t.Fatalf("%+v\n", err)
+	}
+
+	currentWorkflowDefinitions, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
+		Namespace: namespace,
+		Name:      workflowName,
+	})
+	if err != nil {
+		t.Fatalf("%+v\n", err)
+	}
+
+	nodesMap := currentWorkflowDefinitions["status"].(map[string]any)["nodes"].(map[string]any)
+	for _, node := range nodesMap {
+		nodeMap := node.(map[string]any)
+		if nodeMap["displayName"].(string) == "A" {
+			outputParams := nodeMap["outputs"].(map[string]any)["parameters"].([]any)
+			approveParamsMap := outputParams[0].(map[string]any)
+			if approveParamsMap["name"] == "approve" && approveParamsMap["value"] == "pass" {
+				return
+			}
+
+			t.Fatalf("%+v\n", errors.Errorf("输出参数不正确: %v", approveParamsMap["value"]))
+		}
+	}
+
+	t.Fatalf("%+v\n", errors.New("找不到对应的节点"))
+}