package client import ( "context" "fmt" "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow" "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo-workflows/v3/util" "github.com/pkg/errors" "io" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sync" "time" ) type WatchWorkflowDoneToken struct { wg *sync.WaitGroup } func (doneToken *WatchWorkflowDoneToken) Done() { doneToken.wg.Done() } type WatchWorkflowCallback func(doneToken *WatchWorkflowDoneToken, wf *v1alpha1.Workflow) type WatchWorkflowParams struct { Namespace string Name string } // WatchWorkflow 监听工作流 func (c *Client) WatchWorkflow(params WatchWorkflowParams, callback WatchWorkflowCallback) error { var finalCancel context.CancelFunc req := &workflow.WatchWorkflowsRequest{ Namespace: params.Namespace, ListOptions: &metav1.ListOptions{ FieldSelector: util.GenerateFieldSelectorFromWorkflowName(params.Name), ResourceVersion: "0", }, } cancelCtx, cancel := context.WithCancel(c.ctx) stream, err := c.workflowService.WatchWorkflows(cancelCtx, req) if err != nil { cancel() return errors.New(err.Error()) } finalCancel = cancel workflowChan := make(chan *v1alpha1.Workflow) doneChan := make(chan any) wg := &sync.WaitGroup{} wg.Add(1) doneToken := &WatchWorkflowDoneToken{ wg: wg, } go func() { for { select { case <-doneChan: return case wf := <-workflowChan: if wf == nil { continue } if callback != nil { callback(doneToken, wf) } } } }() go func() { for { event, err := stream.Recv() if err != nil { if err == io.EOF { if stream.Context().Err().Error() == "context canceled" { return } cancelCtx, cancel := context.WithCancel(c.ctx) stream, err = c.workflowService.WatchWorkflows(cancelCtx, req) if err != nil { cancel() fmt.Printf("%v\n", errors.New(err.Error())) time.Sleep(5 * time.Second) continue } finalCancel = cancel continue } fmt.Printf("%v\n", errors.New(err.Error())) return } if event == nil { continue } workflowChan <- event.Object } }() doneToken.wg.Wait() doneChan <- nil finalCancel() close(doneChan) doneChan = nil close(workflowChan) workflowChan = nil return nil }