Browse Source

修改监控工作流

yjp 1 year ago
parent
commit
3360881f7a
2 changed files with 119 additions and 80 deletions
  1. 21 33
      client/watch_workflow.go
  2. 98 47
      test/workflow_test.go

+ 21 - 33
client/watch_workflow.go

@@ -9,37 +9,35 @@ import (
 	"github.com/pkg/errors"
 	"io"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"sync"
 	"time"
 )
 
-type WatchWorkflowDoneToken struct {
-	wg                  *sync.WaitGroup
+type WatchWorkflowCallback func(wf *v1alpha1.Workflow)
+
+type WatchWorkflowToken struct {
+	workflowChan        chan *v1alpha1.Workflow
 	doneChan            chan any
 	streamReceiveCancel context.CancelFunc
 }
 
-func (doneToken *WatchWorkflowDoneToken) Done() {
-	go func() {
-		doneToken.streamReceiveCancel()
+func (token *WatchWorkflowToken) Done() {
+	token.streamReceiveCancel()
 
-		doneToken.doneChan <- nil
-		close(doneToken.doneChan)
-		doneToken.doneChan = nil
+	token.doneChan <- nil
+	close(token.doneChan)
+	token.doneChan = nil
 
-		doneToken.wg.Done()
-	}()
+	close(token.workflowChan)
+	token.workflowChan = nil
 }
 
-type WatchWorkflowCallback func(doneToken *WatchWorkflowDoneToken, wf *v1alpha1.Workflow)
-
 type WatchWorkflowParams struct {
 	Namespace string
 	Name      string
 }
 
 // WatchWorkflow 监听工作流
-func (c *Client) WatchWorkflow(params WatchWorkflowParams, callback WatchWorkflowCallback) error {
+func (c *Client) WatchWorkflow(params WatchWorkflowParams, callback WatchWorkflowCallback) (*WatchWorkflowToken, error) {
 	req := &workflow.WatchWorkflowsRequest{
 		Namespace: params.Namespace,
 		ListOptions: &metav1.ListOptions{
@@ -52,16 +50,11 @@ func (c *Client) WatchWorkflow(params WatchWorkflowParams, callback WatchWorkflo
 	stream, err := c.workflowService.WatchWorkflows(cancelCtx, req)
 	if err != nil {
 		cancel()
-		return errors.New(err.Error())
+		return nil, errors.New(err.Error())
 	}
 
-	workflowChan := make(chan *v1alpha1.Workflow)
-
-	wg := &sync.WaitGroup{}
-	wg.Add(1)
-
-	doneToken := &WatchWorkflowDoneToken{
-		wg:                  wg,
+	token := &WatchWorkflowToken{
+		workflowChan:        make(chan *v1alpha1.Workflow),
 		doneChan:            make(chan any),
 		streamReceiveCancel: cancel,
 	}
@@ -69,15 +62,15 @@ func (c *Client) WatchWorkflow(params WatchWorkflowParams, callback WatchWorkflo
 	go func() {
 		for {
 			select {
-			case <-doneToken.doneChan:
+			case <-token.doneChan:
 				return
-			case wf := <-workflowChan:
+			case wf := <-token.workflowChan:
 				if wf == nil {
 					continue
 				}
 
 				if callback != nil {
-					callback(doneToken, wf)
+					callback(wf)
 				}
 			}
 		}
@@ -101,7 +94,7 @@ func (c *Client) WatchWorkflow(params WatchWorkflowParams, callback WatchWorkflo
 						continue
 					}
 
-					doneToken.streamReceiveCancel = cancel
+					token.streamReceiveCancel = cancel
 
 					continue
 				}
@@ -114,14 +107,9 @@ func (c *Client) WatchWorkflow(params WatchWorkflowParams, callback WatchWorkflo
 				continue
 			}
 
-			workflowChan <- event.Object
+			token.workflowChan <- event.Object
 		}
 	}()
 
-	doneToken.wg.Wait()
-
-	close(workflowChan)
-	workflowChan = nil
-
-	return nil
+	return token, nil
 }

+ 98 - 47
test/workflow_test.go

@@ -5,6 +5,7 @@ import (
 	"git.sxidc.com/go-tools/argo-api/client"
 	"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
 	"github.com/pkg/errors"
+	"sync"
 	"testing"
 	"time"
 )
@@ -204,28 +205,40 @@ func TestRetryWorkflow(t *testing.T) {
 		}
 	}()
 
-	err = argo.GetInstance().TerminateWorkflow(client.TerminateWorkflowParams{
+	terminalCalled := false
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+
+	token, err := argo.GetInstance().WatchWorkflow(client.WatchWorkflowParams{
 		Namespace: namespace,
 		Name:      workflowName,
+	}, func(wf *v1alpha1.Workflow) {
+		if terminalCalled {
+			if wf.Status.Phase == "Running" {
+				return
+			}
+
+			wg.Done()
+		}
 	})
 	if err != nil {
 		t.Fatalf("%+v\n", err)
 	}
 
-	err = argo.GetInstance().WatchWorkflow(client.WatchWorkflowParams{
+	defer token.Done()
+
+	err = argo.GetInstance().TerminateWorkflow(client.TerminateWorkflowParams{
 		Namespace: namespace,
 		Name:      workflowName,
-	}, func(doneToken *client.WatchWorkflowDoneToken, wf *v1alpha1.Workflow) {
-		if wf.Status.Phase == "Running" {
-			return
-		}
-
-		doneToken.Done()
 	})
 	if err != nil {
 		t.Fatalf("%+v\n", err)
 	}
 
+	terminalCalled = true
+
+	wg.Wait()
+
 	err = argo.GetInstance().RetryWorkflow(client.RetryWorkflowParams{
 		Namespace: namespace,
 		Name:      workflowName,
@@ -261,27 +274,39 @@ func TestStopWorkflow(t *testing.T) {
 		}
 	}()
 
-	err = argo.GetInstance().StopWorkflow(client.StopWorkflowParams{
+	stopCalled := false
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+
+	token, err := argo.GetInstance().WatchWorkflow(client.WatchWorkflowParams{
 		Namespace: namespace,
 		Name:      workflowName,
+	}, func(wf *v1alpha1.Workflow) {
+		if stopCalled {
+			if wf.Status.Phase == "Running" {
+				return
+			}
+
+			wg.Done()
+		}
 	})
 	if err != nil {
 		t.Fatalf("%+v\n", err)
 	}
 
-	err = argo.GetInstance().WatchWorkflow(client.WatchWorkflowParams{
+	defer token.Done()
+
+	err = argo.GetInstance().StopWorkflow(client.StopWorkflowParams{
 		Namespace: namespace,
 		Name:      workflowName,
-	}, func(doneToken *client.WatchWorkflowDoneToken, wf *v1alpha1.Workflow) {
-		if wf.Status.Phase == "Running" {
-			return
-		}
-
-		doneToken.Done()
 	})
 	if err != nil {
 		t.Fatalf("%+v\n", err)
 	}
+
+	stopCalled = true
+
+	wg.Wait()
 }
 
 func TestTerminateWorkflow(t *testing.T) {
@@ -310,27 +335,39 @@ func TestTerminateWorkflow(t *testing.T) {
 		}
 	}()
 
-	err = argo.GetInstance().TerminateWorkflow(client.TerminateWorkflowParams{
+	terminalCalled := false
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+
+	token, err := argo.GetInstance().WatchWorkflow(client.WatchWorkflowParams{
 		Namespace: namespace,
 		Name:      workflowName,
+	}, func(wf *v1alpha1.Workflow) {
+		if terminalCalled {
+			if wf.Status.Phase == "Running" {
+				return
+			}
+
+			wg.Done()
+		}
 	})
 	if err != nil {
 		t.Fatalf("%+v\n", err)
 	}
 
-	err = argo.GetInstance().WatchWorkflow(client.WatchWorkflowParams{
+	defer token.Done()
+
+	err = argo.GetInstance().TerminateWorkflow(client.TerminateWorkflowParams{
 		Namespace: namespace,
 		Name:      workflowName,
-	}, func(doneToken *client.WatchWorkflowDoneToken, wf *v1alpha1.Workflow) {
-		if wf.Status.Phase == "Running" {
-			return
-		}
-
-		doneToken.Done()
 	})
 	if err != nil {
 		t.Fatalf("%+v\n", err)
 	}
+
+	terminalCalled = true
+
+	wg.Wait()
 }
 
 func TestSetWorkflow(t *testing.T) {
@@ -359,10 +396,15 @@ func TestSetWorkflow(t *testing.T) {
 		}
 	}()
 
-	err = argo.GetInstance().WatchWorkflow(client.WatchWorkflowParams{
-		Namespace: namespace,
-		Name:      workflowName,
-	}, func(doneToken *client.WatchWorkflowDoneToken, wf *v1alpha1.Workflow) {
+	for {
+		wf, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
+			Namespace: namespace,
+			Name:      workflowName,
+		})
+		if err != nil {
+			t.Fatalf("%+v\n", err)
+		}
+
 		findA := false
 		for _, node := range wf.Status.Nodes {
 			if node.DisplayName == "A" {
@@ -370,14 +412,9 @@ func TestSetWorkflow(t *testing.T) {
 			}
 		}
 
-		if !findA {
-			return
+		if findA {
+			break
 		}
-
-		doneToken.Done()
-	})
-	if err != nil {
-		t.Fatalf("%+v\n", err)
 	}
 
 	err = argo.GetInstance().SetWorkflow(client.SetWorkflowParams{
@@ -437,32 +474,45 @@ func TestSuspendAndResumeWorkflow(t *testing.T) {
 		}
 	}()
 
-	err = argo.GetInstance().SuspendWorkflow(client.SuspendWorkflowParams{
+	suspendCalled := false
+
+	wg1 := sync.WaitGroup{}
+	wg1.Add(1)
+
+	token, err := argo.GetInstance().WatchWorkflow(client.WatchWorkflowParams{
 		Namespace: namespace,
 		Name:      workflowName,
+	}, func(wf *v1alpha1.Workflow) {
+		if suspendCalled {
+			if wf.Status.Phase != "Running" {
+				return
+			}
+
+			if wf.Spec.Suspend == nil || *wf.Spec.Suspend != true {
+				return
+			}
+
+			wg1.Done()
+		}
 	})
 	if err != nil {
 		t.Fatalf("%+v\n", err)
 	}
 
-	err = argo.GetInstance().WatchWorkflow(client.WatchWorkflowParams{
+	defer token.Done()
+
+	err = argo.GetInstance().SuspendWorkflow(client.SuspendWorkflowParams{
 		Namespace: namespace,
 		Name:      workflowName,
-	}, func(doneToken *client.WatchWorkflowDoneToken, wf *v1alpha1.Workflow) {
-		if wf.Status.Phase != "Running" {
-			return
-		}
-
-		if wf.Spec.Suspend == nil || *wf.Spec.Suspend != true {
-			return
-		}
-
-		doneToken.Done()
 	})
 	if err != nil {
 		t.Fatalf("%+v\n", err)
 	}
 
+	suspendCalled = true
+
+	wg1.Wait()
+
 	err = argo.GetInstance().ResumeWorkflow(client.ResumeWorkflowParams{
 		Namespace: namespace,
 		Name:      workflowName,
@@ -471,6 +521,7 @@ func TestSuspendAndResumeWorkflow(t *testing.T) {
 		t.Fatalf("%+v\n", err)
 	}
 
+	// resume后不会推送消息
 	for {
 		wf, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
 			Namespace: namespace,