123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- package client
- import (
- "context"
- "fmt"
- "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow"
- "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
- "github.com/argoproj/argo-workflows/v3/util"
- "github.com/pkg/errors"
- "io"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "strings"
- "sync"
- "time"
- )
- type WatcherCallback func(wf *v1alpha1.Workflow)
- type workflowWatcherManager struct {
- watcherMapMutex *sync.Mutex
- watcherMap map[string]*workflowWatcher
- }
- func newWorkflowWatcherManager() *workflowWatcherManager {
- return &workflowWatcherManager{
- watcherMapMutex: &sync.Mutex{},
- watcherMap: make(map[string]*workflowWatcher),
- }
- }
- func (manager *workflowWatcherManager) addWatcher(namespace string, name string, watcher *workflowWatcher) {
- manager.watcherMapMutex.Lock()
- defer manager.watcherMapMutex.Unlock()
- manager.watcherMap[manager.formKey(namespace, name)] = watcher
- }
- func (manager *workflowWatcherManager) removeWatcher(namespace string, name string) (*workflowWatcher, bool) {
- manager.watcherMapMutex.Lock()
- defer manager.watcherMapMutex.Unlock()
- key := manager.formKey(namespace, name)
- value, ok := manager.watcherMap[key]
- if ok {
- delete(manager.watcherMap, key)
- }
- return value, ok
- }
- func (manager *workflowWatcherManager) clearWatchers(f func(namespace string, name string, watcher *workflowWatcher) bool) {
- manager.watcherMapMutex.Lock()
- defer manager.watcherMapMutex.Unlock()
- for key, value := range manager.watcherMap {
- namespace, name := manager.splitKey(key)
- notStop := f(namespace, name, value)
- if !notStop {
- break
- }
- }
- manager.watcherMap = make(map[string]*workflowWatcher)
- }
- func (manager *workflowWatcherManager) formKey(namespace string, name string) string {
- return strings.Join([]string{namespace, name}, "::")
- }
- func (manager *workflowWatcherManager) splitKey(key string) (namespace string, name string) {
- keyParts := strings.Split(key, "::")
- namespace = keyParts[0]
- name = keyParts[1]
- return
- }
- type workflowWatcher struct {
- workflowChan chan *v1alpha1.Workflow
- doneChan chan any
- streamReceiveCancel context.CancelFunc
- }
- func (watcher *workflowWatcher) close() {
- // 停止接收数据协程
- watcher.streamReceiveCancel()
- // 停止处理数据协程
- watcher.doneChan <- nil
- close(watcher.doneChan)
- watcher.doneChan = nil
- // 关闭数据通道
- close(watcher.workflowChan)
- watcher.workflowChan = nil
- }
- func (c *Client) RegisterWorkflowWatcher(namespace string, name string, callback WatcherCallback) error {
- req := &workflow.WatchWorkflowsRequest{
- Namespace: namespace,
- ListOptions: &metav1.ListOptions{
- FieldSelector: util.GenerateFieldSelectorFromWorkflowName(name),
- ResourceVersion: "0",
- },
- }
- cancelCtx, cancel := context.WithCancel(c.ctx)
- stream, err := c.workflowService.WatchWorkflows(cancelCtx, req)
- if err != nil {
- cancel()
- return errors.New(err.Error())
- }
- watcher := &workflowWatcher{
- workflowChan: make(chan *v1alpha1.Workflow),
- doneChan: make(chan any),
- streamReceiveCancel: cancel,
- }
- go func() {
- for {
- select {
- case <-watcher.doneChan:
- return
- case wf := <-watcher.workflowChan:
- if wf == nil {
- continue
- }
- if callback != nil {
- callback(wf)
- }
- }
- }
- }()
- go func() {
- for {
- event, err := stream.Recv()
- if err != nil {
- if err == io.EOF {
- if stream.Context().Err().Error() == "context canceled" {
- return
- }
- cancelCtx, cancel := context.WithCancel(c.ctx)
- stream, err = c.workflowService.WatchWorkflows(cancelCtx, req)
- if err != nil {
- cancel()
- fmt.Printf("%v\n", errors.New(err.Error()))
- time.Sleep(5 * time.Second)
- continue
- }
- watcher.streamReceiveCancel = cancel
- continue
- }
- fmt.Printf("%v\n", errors.New(err.Error()))
- return
- }
- if event == nil {
- continue
- }
- watcher.workflowChan <- event.Object
- }
- }()
- c.workflowWatcherManager.addWatcher(namespace, name, watcher)
- return nil
- }
- func (c *Client) UnregisterWorkflowWatcher(namespace string, name string) {
- watcher, loaded := c.workflowWatcherManager.removeWatcher(namespace, name)
- if !loaded {
- return
- }
- watcher.close()
- }
- func (c *Client) unregisterAllWorkflowWatchers() {
- c.workflowWatcherManager.clearWatchers(func(namespace string, name string, watcher *workflowWatcher) bool {
- watcher.close()
- return true
- })
- }
|