Browse Source

完成监听测试

yjp 9 months ago
parent
commit
d755ad5c26
5 changed files with 152 additions and 24 deletions
  1. 1 0
      argo.go
  2. 14 0
      client/client.go
  3. 126 0
      client/watch_workflow.go
  4. 0 12
      client/workflow.go
  5. 11 12
      test/workflow_test.go

+ 1 - 0
argo.go

@@ -24,6 +24,7 @@ func Destroy() {
 		return
 	}
 
+	client.Destroy(clientInstance)
 	clientInstance = nil
 }
 

+ 14 - 0
client/client.go

@@ -8,8 +8,11 @@ import (
 	"github.com/pkg/errors"
 	"k8s.io/client-go/tools/clientcmd"
 	"os"
+	"sync"
 )
 
+var watchWorkflowTokenMap sync.Map
+
 type Client struct {
 	ctx                     context.Context
 	client                  apiclient.Client
@@ -46,3 +49,14 @@ func NewClient(kubeConfigEnv string) (*Client, error) {
 		workflowService:         apiClient.NewWorkflowServiceClient(),
 	}, nil
 }
+
+func Destroy(c *Client) {
+	if c == nil {
+		return
+	}
+
+	c.ctx.Done()
+	c.client = nil
+	c.workflowTemplateService = nil
+	c.workflowService = nil
+}

+ 126 - 0
client/watch_workflow.go

@@ -0,0 +1,126 @@
+package client
+
+import (
+	"context"
+	"fmt"
+	"github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow"
+	"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
+	"github.com/argoproj/argo-workflows/v3/util"
+	"github.com/pkg/errors"
+	"io"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"sync"
+	"time"
+)
+
+type WatchWorkflowDoneToken struct {
+	wg *sync.WaitGroup
+}
+
+func (doneToken *WatchWorkflowDoneToken) Done() {
+	doneToken.wg.Done()
+}
+
+type WatchWorkflowCallback func(doneToken *WatchWorkflowDoneToken, wf *v1alpha1.Workflow)
+
+type WatchWorkflowParams struct {
+	Namespace string
+	Name      string
+}
+
+// WatchWorkflow 监听工作流
+func (c *Client) WatchWorkflow(params WatchWorkflowParams, callback WatchWorkflowCallback) error {
+	var finalCancel context.CancelFunc
+
+	req := &workflow.WatchWorkflowsRequest{
+		Namespace: params.Namespace,
+		ListOptions: &metav1.ListOptions{
+			FieldSelector:   util.GenerateFieldSelectorFromWorkflowName(params.Name),
+			ResourceVersion: "0",
+		},
+	}
+
+	cancelCtx, cancel := context.WithCancel(c.ctx)
+	stream, err := c.workflowService.WatchWorkflows(cancelCtx, req)
+	if err != nil {
+		cancel()
+		return errors.New(err.Error())
+	}
+
+	finalCancel = cancel
+
+	workflowChan := make(chan *v1alpha1.Workflow)
+	doneChan := make(chan any)
+
+	wg := &sync.WaitGroup{}
+	wg.Add(1)
+
+	doneToken := &WatchWorkflowDoneToken{
+		wg: wg,
+	}
+
+	go func() {
+		for {
+			select {
+			case <-doneChan:
+				return
+			case wf := <-workflowChan:
+				if wf == nil {
+					continue
+				}
+
+				if callback != nil {
+					callback(doneToken, wf)
+				}
+			}
+		}
+	}()
+
+	go func() {
+		for {
+			event, err := stream.Recv()
+			if err != nil {
+				if err == io.EOF {
+					if stream.Context().Err().Error() == "context canceled" {
+						return
+					}
+
+					cancelCtx, cancel := context.WithCancel(c.ctx)
+					stream, err = c.workflowService.WatchWorkflows(cancelCtx, req)
+					if err != nil {
+						cancel()
+						fmt.Printf("%v\n", errors.New(err.Error()))
+						time.Sleep(5 * time.Second)
+						continue
+					}
+
+					finalCancel = cancel
+
+					continue
+				}
+
+				fmt.Printf("%v\n", errors.New(err.Error()))
+				return
+			}
+
+			if event == nil {
+				continue
+			}
+
+			workflowChan <- event.Object
+		}
+	}()
+
+	doneToken.wg.Wait()
+	doneChan <- nil
+
+	finalCancel()
+
+	close(doneChan)
+	doneChan = nil
+
+	close(workflowChan)
+	workflowChan = nil
+
+	return nil
+}

+ 0 - 12
client/workflow.go

@@ -280,15 +280,3 @@ func (c *Client) ResumeWorkflow(params ResumeWorkflowParams) error {
 
 	return nil
 }
-
-type EventCallback func(event map[string]any, eventErr error) error
-
-type GetWorkflowEventsStreamParams struct {
-	Namespace string
-	Name      string
-}
-
-// GetWorkflowEventsStream 监听工作流事件
-func (c *Client) GetWorkflowEventsStream(params GetWorkflowEventsStreamParams, callback EventCallback) error {
-	return nil
-}

+ 11 - 12
test/workflow_test.go

@@ -3,6 +3,7 @@ package test
 import (
 	"git.sxidc.com/go-tools/argo-api"
 	"git.sxidc.com/go-tools/argo-api/client"
+	"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
 	"github.com/pkg/errors"
 	"testing"
 	"time"
@@ -211,20 +212,18 @@ func TestRetryWorkflow(t *testing.T) {
 		t.Fatalf("%+v\n", err)
 	}
 
-	for {
-		wf, err := argo.GetInstance().GetWorkflow(client.GetWorkflowParams{
-			Namespace: namespace,
-			Name:      workflowName,
-		})
-		if err != nil {
-			t.Fatalf("%+v\n", err)
-		}
-
-		if wf.Status.Phase != "Running" {
-			break
+	err = argo.GetInstance().WatchWorkflow(client.WatchWorkflowParams{
+		Namespace: namespace,
+		Name:      workflowName,
+	}, func(doneToken *client.WatchWorkflowDoneToken, wf *v1alpha1.Workflow) {
+		if wf.Status.Phase == "Running" {
+			return
 		}
 
-		time.Sleep(1 * time.Second)
+		doneToken.Done()
+	})
+	if err != nil {
+		t.Fatalf("%+v\n", err)
 	}
 
 	err = argo.GetInstance().RetryWorkflow(client.RetryWorkflowParams{