// workflow_watcher.go (4.3 KB)

  1. package client
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow"
  6. "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
  7. "github.com/argoproj/argo-workflows/v3/util"
  8. "github.com/pkg/errors"
  9. "io"
  10. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  11. "strings"
  12. "sync"
  13. "time"
  14. )
  15. type WatcherCallback func(wf *v1alpha1.Workflow)
  16. type workflowWatcherManager struct {
  17. watcherMapMutex *sync.Mutex
  18. watcherMap map[string]*workflowWatcher
  19. }
  20. func newWorkflowWatcherManager() *workflowWatcherManager {
  21. return &workflowWatcherManager{
  22. watcherMapMutex: &sync.Mutex{},
  23. watcherMap: make(map[string]*workflowWatcher),
  24. }
  25. }
  26. func (manager *workflowWatcherManager) addWatcher(namespace string, name string, watcher *workflowWatcher) {
  27. manager.watcherMapMutex.Lock()
  28. defer manager.watcherMapMutex.Unlock()
  29. manager.watcherMap[manager.formKey(namespace, name)] = watcher
  30. }
  31. func (manager *workflowWatcherManager) removeWatcher(namespace string, name string) (*workflowWatcher, bool) {
  32. manager.watcherMapMutex.Lock()
  33. defer manager.watcherMapMutex.Unlock()
  34. key := manager.formKey(namespace, name)
  35. value, ok := manager.watcherMap[key]
  36. if ok {
  37. delete(manager.watcherMap, key)
  38. }
  39. return value, ok
  40. }
  41. func (manager *workflowWatcherManager) clearWatchers(f func(namespace string, name string, watcher *workflowWatcher) bool) {
  42. manager.watcherMapMutex.Lock()
  43. defer manager.watcherMapMutex.Unlock()
  44. for key, value := range manager.watcherMap {
  45. namespace, name := manager.splitKey(key)
  46. notStop := f(namespace, name, value)
  47. if !notStop {
  48. break
  49. }
  50. }
  51. manager.watcherMap = make(map[string]*workflowWatcher)
  52. }
  53. func (manager *workflowWatcherManager) formKey(namespace string, name string) string {
  54. return strings.Join([]string{namespace, name}, "::")
  55. }
  56. func (manager *workflowWatcherManager) splitKey(key string) (namespace string, name string) {
  57. keyParts := strings.Split(key, "::")
  58. namespace = keyParts[0]
  59. name = keyParts[1]
  60. return
  61. }
  62. type workflowWatcher struct {
  63. workflowChan chan *v1alpha1.Workflow
  64. doneChan chan any
  65. streamReceiveCancel context.CancelFunc
  66. }
  67. func (watcher *workflowWatcher) close() {
  68. // 停止接收数据协程
  69. watcher.streamReceiveCancel()
  70. // 停止处理数据协程
  71. watcher.doneChan <- nil
  72. close(watcher.doneChan)
  73. watcher.doneChan = nil
  74. // 关闭数据通道
  75. close(watcher.workflowChan)
  76. watcher.workflowChan = nil
  77. }
  78. func (c *Client) RegisterWorkflowWatcher(namespace string, name string, callback WatcherCallback) error {
  79. req := &workflow.WatchWorkflowsRequest{
  80. Namespace: namespace,
  81. ListOptions: &metav1.ListOptions{
  82. FieldSelector: util.GenerateFieldSelectorFromWorkflowName(name),
  83. ResourceVersion: "0",
  84. },
  85. }
  86. cancelCtx, cancel := context.WithCancel(c.ctx)
  87. stream, err := c.workflowService.WatchWorkflows(cancelCtx, req)
  88. if err != nil {
  89. cancel()
  90. return errors.New(err.Error())
  91. }
  92. watcher := &workflowWatcher{
  93. workflowChan: make(chan *v1alpha1.Workflow),
  94. doneChan: make(chan any),
  95. streamReceiveCancel: cancel,
  96. }
  97. go func() {
  98. for {
  99. select {
  100. case <-watcher.doneChan:
  101. return
  102. case wf := <-watcher.workflowChan:
  103. if wf == nil {
  104. continue
  105. }
  106. if callback != nil {
  107. callback(wf)
  108. }
  109. }
  110. }
  111. }()
  112. go func() {
  113. for {
  114. event, err := stream.Recv()
  115. if err != nil {
  116. if err == io.EOF {
  117. if stream.Context().Err().Error() == "context canceled" {
  118. return
  119. }
  120. cancelCtx, cancel := context.WithCancel(c.ctx)
  121. stream, err = c.workflowService.WatchWorkflows(cancelCtx, req)
  122. if err != nil {
  123. cancel()
  124. fmt.Printf("%v\n", errors.New(err.Error()))
  125. time.Sleep(5 * time.Second)
  126. continue
  127. }
  128. watcher.streamReceiveCancel = cancel
  129. continue
  130. }
  131. fmt.Printf("%v\n", errors.New(err.Error()))
  132. return
  133. }
  134. if event == nil {
  135. continue
  136. }
  137. watcher.workflowChan <- event.Object
  138. }
  139. }()
  140. c.workflowWatcherManager.addWatcher(namespace, name, watcher)
  141. return nil
  142. }
  143. func (c *Client) UnregisterWorkflowWatcher(namespace string, name string) {
  144. watcher, loaded := c.workflowWatcherManager.removeWatcher(namespace, name)
  145. if !loaded {
  146. return
  147. }
  148. watcher.close()
  149. }
  150. func (c *Client) unregisterAllWorkflowWatchers() {
  151. c.workflowWatcherManager.clearWatchers(func(namespace string, name string, watcher *workflowWatcher) bool {
  152. watcher.close()
  153. return true
  154. })
  155. }