package client

import (
	"context"
	"fmt"
	"io"
	"strings"
	"sync"
	"time"

	"github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow"
	"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
	"github.com/argoproj/argo-workflows/v3/util"
	"github.com/pkg/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
	// watcherKeySeparator joins namespace and name into a watcher map key.
	watcherKeySeparator = "::"
	// watchReconnectDelay is the pause between failed attempts to reopen a
	// watch stream.
	watchReconnectDelay = 5 * time.Second
)

// WatcherCallback is invoked with every non-nil workflow object received from
// a watch stream registered through RegisterWorkflowWatcher.
type WatcherCallback func(wf *v1alpha1.Workflow)

// workflowWatcherManager is a mutex-guarded registry of active watchers keyed
// by "namespace::name".
type workflowWatcherManager struct {
	watcherMapMutex *sync.Mutex
	watcherMap      map[string]*workflowWatcher
}

// newWorkflowWatcherManager returns an empty, ready-to-use manager.
func newWorkflowWatcherManager() *workflowWatcherManager {
	return &workflowWatcherManager{
		watcherMapMutex: &sync.Mutex{},
		watcherMap:      make(map[string]*workflowWatcher),
	}
}

// addWatcher registers watcher under (namespace, name). If a different watcher
// is already registered under the same key it is closed first; otherwise its
// goroutines and stream would become unreachable and leak.
func (manager *workflowWatcherManager) addWatcher(namespace string, name string, watcher *workflowWatcher) {
	manager.watcherMapMutex.Lock()
	defer manager.watcherMapMutex.Unlock()

	key := manager.formKey(namespace, name)
	if previous, ok := manager.watcherMap[key]; ok && previous != watcher {
		// close is non-blocking, so calling it under the lock is safe.
		previous.close()
	}
	manager.watcherMap[key] = watcher
}

// removeWatcher deletes and returns the watcher registered under
// (namespace, name). The boolean reports whether an entry existed.
func (manager *workflowWatcherManager) removeWatcher(namespace string, name string) (*workflowWatcher, bool) {
	manager.watcherMapMutex.Lock()
	defer manager.watcherMapMutex.Unlock()

	key := manager.formKey(namespace, name)
	value, ok := manager.watcherMap[key]
	if ok {
		delete(manager.watcherMap, key)
	}
	return value, ok
}

// clearWatchers calls f for each registered watcher while holding the manager
// lock and removes every visited entry. Iteration stops as soon as f returns
// false; watchers that were not visited stay registered so they can still be
// closed later instead of being dropped while their goroutines keep running.
func (manager *workflowWatcherManager) clearWatchers(f func(namespace string, name string, watcher *workflowWatcher) bool) {
	manager.watcherMapMutex.Lock()
	defer manager.watcherMapMutex.Unlock()

	for key, value := range manager.watcherMap {
		namespace, name := manager.splitKey(key)
		keepGoing := f(namespace, name, value)
		// Deleting the current key while ranging over a map is permitted in Go.
		delete(manager.watcherMap, key)
		if !keepGoing {
			break
		}
	}
}

// formKey builds the map key for (namespace, name).
func (manager *workflowWatcherManager) formKey(namespace string, name string) string {
	return namespace + watcherKeySeparator + name
}

// splitKey is the inverse of formKey. It splits on the first separator only,
// and a key without a separator yields (key, "") rather than panicking.
func (manager *workflowWatcherManager) splitKey(key string) (namespace string, name string) {
	namespace, name, _ = strings.Cut(key, watcherKeySeparator)
	return namespace, name
}

// workflowWatcher holds the shutdown handles for one registered watch.
type workflowWatcher struct {
	// workflowChan carries received workflows from the receive goroutine to
	// the dispatch goroutine. It is closed only by the receive goroutine.
	workflowChan chan *v1alpha1.Workflow
	// doneChan is closed by close() to stop the dispatch goroutine.
	doneChan chan any
	// streamReceiveCancel cancels the context shared by the watch stream and
	// all of its reconnects.
	streamReceiveCancel context.CancelFunc
	// closeOnce makes close() idempotent.
	closeOnce sync.Once
}

// close stops both goroutines of the watcher. It never blocks and may be
// called more than once, from any goroutine, including from inside the
// watcher's own callback. A callback that is already executing is not
// interrupted and may finish after close returns.
func (watcher *workflowWatcher) close() {
	if watcher == nil {
		return
	}
	watcher.closeOnce.Do(func() {
		// Stop the receive goroutine: cancels the stream context, which also
		// releases it if it is blocked handing a workflow to the dispatcher.
		if watcher.streamReceiveCancel != nil {
			watcher.streamReceiveCancel()
		}
		// Stop the dispatch goroutine. The data channel is deliberately not
		// closed here; only its sender (the receive goroutine) closes it.
		if watcher.doneChan != nil {
			close(watcher.doneChan)
		}
	})
}

// RegisterWorkflowWatcher opens a watch stream for the workflow called name in
// namespace and invokes callback for every non-nil workflow object received.
// A nil callback is allowed; events are then received and discarded.
//
// When the stream ends with io.EOF it is reopened, retrying every
// watchReconnectDelay until it succeeds or the watcher is closed. Any other
// receive error is printed and ends the watch. Registering the same
// namespace/name again closes the previously registered watcher.
//
// It returns an error only if the initial stream cannot be opened.
func (c *Client) RegisterWorkflowWatcher(namespace string, name string, callback WatcherCallback) error {
	req := &workflow.WatchWorkflowsRequest{
		Namespace: namespace,
		ListOptions: &metav1.ListOptions{
			FieldSelector:   util.GenerateFieldSelectorFromWorkflowName(name),
			ResourceVersion: "0",
		},
	}

	// One context covers the initial stream and every reconnect, so the cancel
	// function stored on the watcher never has to be swapped concurrently.
	// NOTE(review): assumes a stream that has returned io.EOF holds no further
	// resources (this is the documented gRPC client-stream contract) — confirm
	// for the workflowService implementation in use.
	watchCtx, cancel := context.WithCancel(c.ctx)
	stream, err := c.workflowService.WatchWorkflows(watchCtx, req)
	if err != nil {
		cancel()
		return errors.New(err.Error())
	}

	watcher := &workflowWatcher{
		workflowChan:        make(chan *v1alpha1.Workflow),
		doneChan:            make(chan any),
		streamReceiveCancel: cancel,
	}
	// The goroutines use local copies so they never read the struct fields
	// concurrently with other code.
	workflowChan := watcher.workflowChan
	doneChan := watcher.doneChan

	// Dispatch goroutine: hands received workflows to the callback.
	go func() {
		for {
			select {
			case <-doneChan:
				return
			case wf, ok := <-workflowChan:
				if !ok {
					// The receive goroutine has terminated for good.
					return
				}
				// If both channels were ready, select may have picked this
				// case; re-check so a closed watcher does not start a callback.
				select {
				case <-doneChan:
					return
				default:
				}
				if wf == nil || callback == nil {
					continue
				}
				callback(wf)
			}
		}
	}()

	// reconnect reopens the stream, retrying until it succeeds (true) or the
	// watcher context ends (false).
	reconnect := func() bool {
		for {
			next, openErr := c.workflowService.WatchWorkflows(watchCtx, req)
			if openErr == nil {
				stream = next
				return true
			}
			fmt.Printf("%v\n", errors.New(openErr.Error()))
			select {
			case <-watchCtx.Done():
				return false
			case <-time.After(watchReconnectDelay):
			}
		}
	}

	// Receive goroutine: reads events from the stream and forwards them.
	go func() {
		// As the only sender, closing the channel tells the dispatcher to exit.
		defer close(workflowChan)
		// Release the context even when the watch dies without close().
		defer cancel()

		for {
			event, recvErr := stream.Recv()
			if recvErr != nil {
				if recvErr != io.EOF {
					fmt.Printf("%v\n", errors.New(recvErr.Error()))
					return
				}
				// EOF after cancellation is a requested shutdown; otherwise
				// the stream ended on its own and must be reopened.
				if watchCtx.Err() != nil || !reconnect() {
					return
				}
				continue
			}
			if event == nil {
				continue
			}
			select {
			case workflowChan <- event.Object:
			case <-watchCtx.Done():
				// The dispatcher may already be gone; never block on it.
				return
			}
		}
	}()

	c.workflowWatcherManager.addWatcher(namespace, name, watcher)
	return nil
}

// UnregisterWorkflowWatcher stops and removes the watcher registered for
// (namespace, name). It does nothing if no such watcher exists.
func (c *Client) UnregisterWorkflowWatcher(namespace string, name string) {
	watcher, loaded := c.workflowWatcherManager.removeWatcher(namespace, name)
	if !loaded {
		return
	}
	watcher.close()
}

// unregisterAllWorkflowWatchers stops and removes every registered watcher.
func (c *Client) unregisterAllWorkflowWatchers() {
	c.workflowWatcherManager.clearWatchers(func(namespace string, name string, watcher *workflowWatcher) bool {
		watcher.close()
		return true
	})
}