yjp 1 жил өмнө
parent
commit
e58ff48a16
2 өөрчлөгдсөн 179 нэмэгдсэн , 15 устгасан
  1. 70 15
      client/workflow.go
  2. 109 0
      test/workflow_test.go

+ 70 - 15
client/workflow.go

@@ -14,12 +14,11 @@ const (
 	submitWorkflowRelativeUrl          = "/api/v1/workflows/{namespace}/submit"
 	resubmitWorkflowRelativeUrl        = "/api/v1/workflows/{namespace}/{name}/resubmit"
 	retryWorkflowRelativeUrl           = "/api/v1/workflows/{namespace}/{name}/retry"
-	setWorkflowRelativeUrl             = "/api/v1/workflows/{namespace}/{name}/set"
 	stopWorkflowRelativeUrl            = "/api/v1/workflows/{namespace}/{name}/stop"
 	terminateWorkflowRelativeUrl       = "/api/v1/workflows/{namespace}/{name}/terminate"
+	setWorkflowRelativeUrl             = "/api/v1/workflows/{namespace}/{name}/set"
 	suspendWorkflowRelativeUrl         = "/api/v1/workflows/{namespace}/{name}/suspend"
 	resumeWorkflowRelativeUrl          = "/api/v1/workflows/{namespace}/{name}/resume"
-	getEventsStreamRelativeUrl         = "/api/v1/stream/events/{namespace}"
 	getWorkflowEventsStreamRelativeUrl = "/api/v1/workflow-events/{namespace}"
 )
 
@@ -239,8 +238,6 @@ func (c *Client) LintWorkflow(params LintWorkflowParams) error {
 	switch resp.StatusCode() {
 	case http.StatusOK:
 		return nil
-	case http.StatusConflict:
-		return errors.New("工作流已存在")
 	default:
 		message, ok := responseMap["message"]
 		if !ok {
@@ -305,8 +302,6 @@ func (c *Client) SubmitWorkflowFromWorkflowTemplate(params SubmitWorkflowFromWor
 		}
 
 		return workflowNameStr, nil
-	case http.StatusConflict:
-		return "", errors.New("工作流已存在")
 	default:
 		message, ok := responseMap["message"]
 		if !ok {
@@ -373,8 +368,6 @@ func (c *Client) ResubmitWorkflow(params ResubmitWorkflowParams) (string, error)
 		}
 
 		return workflowNameStr, nil
-	case http.StatusConflict:
-		return "", errors.New("工作流已存在")
 	default:
 		message, ok := responseMap["message"]
 		if !ok {
@@ -419,8 +412,8 @@ func (c *Client) RetryWorkflow(params RetryWorkflowParams) error {
 	}
 
 	switch resp.StatusCode() {
-	case http.StatusConflict:
-		return errors.New("工作流已存在")
+	case http.StatusOK:
+		return nil
 	default:
 		message, ok := responseMap["message"]
 		if !ok {
@@ -440,17 +433,79 @@ type StopWorkflowParams struct {
 
 // StopWorkflow 终止工作流运行,会调用所有的退出处理器
 func (c *Client) StopWorkflow(params StopWorkflowParams) error {
-	return nil
+	responseMap := make(map[string]any)
+
+	resp, err := c.restyClient.R().
+		SetHeader("Content-Type", "application/json").
+		SetAuthToken(c.token).
+		SetPathParams(map[string]string{
+			"namespace": params.Namespace,
+			"name":      params.Name,
+		}).
+		SetBody(map[string]any{
+			"namespace":         params.Namespace,
+			"name":              params.Name,
+			"nodeFieldSelector": params.NodeFieldSelector,
+			"message":           params.Message,
+		}).
+		SetResult(&responseMap).
+		SetError(&responseMap).
+		Put(stopWorkflowRelativeUrl)
+	if err != nil {
+		return errors.New(err.Error())
+	}
+
+	switch resp.StatusCode() {
+	case http.StatusOK:
+		return nil
+	default:
+		message, ok := responseMap["message"]
+		if !ok {
+			return errors.Errorf("%v", resp.Status())
+		}
+
+		return errors.Errorf("%v, %v", resp.Status(), message)
+	}
 }
 
-type TerminalWorkflowParams struct {
+type TerminateWorkflowParams struct {
 	Namespace string
 	Name      string
 }
 
-// TerminalWorkflow 终止工作流运行,不调用所有的退出处理器
-func (c *Client) TerminalWorkflow(params TerminalWorkflowParams) error {
-	return nil
+// TerminateWorkflow 终止工作流运行,不调用所有的退出处理器
+func (c *Client) TerminateWorkflow(params TerminateWorkflowParams) error {
+	responseMap := make(map[string]any)
+
+	resp, err := c.restyClient.R().
+		SetHeader("Content-Type", "application/json").
+		SetAuthToken(c.token).
+		SetPathParams(map[string]string{
+			"namespace": params.Namespace,
+			"name":      params.Name,
+		}).
+		SetBody(map[string]any{
+			"namespace": params.Namespace,
+			"name":      params.Name,
+		}).
+		SetResult(&responseMap).
+		SetError(&responseMap).
+		Put(terminateWorkflowRelativeUrl)
+	if err != nil {
+		return errors.New(err.Error())
+	}
+
+	switch resp.StatusCode() {
+	case http.StatusOK:
+		return nil
+	default:
+		message, ok := responseMap["message"]
+		if !ok {
+			return errors.Errorf("%v", resp.Status())
+		}
+
+		return errors.Errorf("%v, %v", resp.Status(), message)
+	}
 }
 
 type SetWorkflowParams struct {

+ 109 - 0
test/workflow_test.go

@@ -6,6 +6,7 @@ import (
 	"github.com/pkg/errors"
 	"gopkg.in/yaml.v3"
 	"testing"
+	"time"
 )
 
 func TestWorkflowBase(t *testing.T) {
@@ -134,3 +135,111 @@ func TestSubmitWorkflow(t *testing.T) {
 		t.Fatalf("%+v\n", errors.Errorf("参数值错误: %v", parametersMap[0].(map[string]any)["value"]))
 	}
 }
+
+func TestResubmitWorkflow(t *testing.T) {
+	argo.Init(baseUrl, token, client.WithTimeoutSec(10))
+	defer argo.Destroy()
+
+	workflowDefinition := make(map[string]any)
+	err := yaml.Unmarshal(workflowYamlStr, &workflowDefinition)
+	if err != nil {
+		panic(err)
+	}
+
+	workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
+		Namespace:          namespace,
+		WorkflowDefinition: workflowDefinition,
+	})
+	if err != nil {
+		t.Fatalf("%+v\n", err)
+	}
+
+	defer func() {
+		err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
+			Namespace: namespace,
+			Name:      workflowName,
+		})
+		if err != nil {
+			t.Fatalf("%+v\n", err)
+		}
+	}()
+
+	resubmittedWorkflowName, err := argo.GetInstance().ResubmitWorkflow(client.ResubmitWorkflowParams{
+		Namespace: namespace,
+		Name:      workflowName,
+	})
+	if err != nil {
+		t.Fatalf("%+v\n", err)
+	}
+
+	defer func() {
+		err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
+			Namespace: namespace,
+			Name:      resubmittedWorkflowName,
+		})
+		if err != nil {
+			t.Fatalf("%+v\n", err)
+		}
+	}()
+}
+
+func TestRetryWorkflow(t *testing.T) {
+	argo.Init(baseUrl, token, client.WithTimeoutSec(10))
+	defer argo.Destroy()
+
+	workflowDefinition := make(map[string]any)
+	err := yaml.Unmarshal(workflowYamlStr, &workflowDefinition)
+	if err != nil {
+		panic(err)
+	}
+
+	workflowName, err := argo.GetInstance().CreateWorkflow(client.CreateWorkflowParams{
+		Namespace:          namespace,
+		WorkflowDefinition: workflowDefinition,
+	})
+	if err != nil {
+		t.Fatalf("%+v\n", err)
+	}
+
+	defer func() {
+		err := argo.GetInstance().DeleteWorkflow(client.DeleteWorkflowParams{
+			Namespace: namespace,
+			Name:      workflowName,
+		})
+		if err != nil {
+			t.Fatalf("%+v\n", err)
+		}
+	}()
+
+	err = argo.GetInstance().TerminateWorkflow(client.TerminateWorkflowParams{
+		Namespace: namespace,
+		Name:      workflowName,
+	})
+	if err != nil {
+		t.Fatalf("%+v\n", err)
+	}
+
+	for {
+		currentWorkflowDefinitions, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
+			Namespace: namespace,
+			Name:      workflowName,
+		})
+		if err != nil {
+			t.Fatalf("%+v\n", err)
+		}
+
+		if currentWorkflowDefinitions["status"].(map[string]any)["phase"] != "Running" {
+			break
+		}
+
+		time.Sleep(1 * time.Second)
+	}
+
+	err = argo.GetInstance().RetryWorkflow(client.RetryWorkflowParams{
+		Namespace: namespace,
+		Name:      workflowName,
+	})
+	if err != nil {
+		t.Fatalf("%+v\n", err)
+	}
+}