package client import ( "context" "fmt" "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow" "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo-workflows/v3/util" "github.com/pkg/errors" "io" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sync" "time" ) type WatchWorkflowDoneToken struct { wg *sync.WaitGroup doneChan chan any streamReceiveCancel context.CancelFunc } func (doneToken *WatchWorkflowDoneToken) Done() { go func() { doneToken.streamReceiveCancel() doneToken.doneChan <- nil close(doneToken.doneChan) doneToken.doneChan = nil doneToken.wg.Done() }() } type WatchWorkflowCallback func(doneToken *WatchWorkflowDoneToken, wf *v1alpha1.Workflow) type WatchWorkflowParams struct { Namespace string Name string } // WatchWorkflow 监听工作流 func (c *Client) WatchWorkflow(params WatchWorkflowParams, callback WatchWorkflowCallback) error { req := &workflow.WatchWorkflowsRequest{ Namespace: params.Namespace, ListOptions: &metav1.ListOptions{ FieldSelector: util.GenerateFieldSelectorFromWorkflowName(params.Name), ResourceVersion: "0", }, } cancelCtx, cancel := context.WithCancel(c.ctx) stream, err := c.workflowService.WatchWorkflows(cancelCtx, req) if err != nil { cancel() return errors.New(err.Error()) } workflowChan := make(chan *v1alpha1.Workflow) wg := &sync.WaitGroup{} wg.Add(1) doneToken := &WatchWorkflowDoneToken{ wg: wg, doneChan: make(chan any), streamReceiveCancel: cancel, } go func() { for { select { case <-doneToken.doneChan: return case wf := <-workflowChan: if wf == nil { continue } if callback != nil { callback(doneToken, wf) } } } }() go func() { for { event, err := stream.Recv() if err != nil { if err == io.EOF { if stream.Context().Err().Error() == "context canceled" { return } cancelCtx, cancel := context.WithCancel(c.ctx) stream, err = c.workflowService.WatchWorkflows(cancelCtx, req) if err != nil { cancel() fmt.Printf("%v\n", errors.New(err.Error())) time.Sleep(5 * time.Second) continue } doneToken.streamReceiveCancel = cancel continue } fmt.Printf("%v\n", errors.New(err.Error())) return } if event == nil { continue } workflowChan <- event.Object } }() doneToken.wg.Wait() close(workflowChan) workflowChan = nil return nil }