watch_workflow.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. package client
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow"
  6. "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
  7. "github.com/argoproj/argo-workflows/v3/util"
  8. "github.com/pkg/errors"
  9. "io"
  10. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  11. "sync"
  12. "time"
  13. )
  14. type WatchWorkflowDoneToken struct {
  15. wg *sync.WaitGroup
  16. }
  17. func (doneToken *WatchWorkflowDoneToken) Done() {
  18. doneToken.wg.Done()
  19. }
// WatchWorkflowCallback is the function WatchWorkflow invokes for each
// non-nil workflow object received from the watch stream. wf is the received
// workflow; doneToken is the token whose Done method the callback calls to
// let WatchWorkflow return.
type WatchWorkflowCallback func(doneToken *WatchWorkflowDoneToken, wf *v1alpha1.Workflow)
  21. type WatchWorkflowParams struct {
  22. Namespace string
  23. Name string
  24. }
  25. // WatchWorkflow 监听工作流
  26. func (c *Client) WatchWorkflow(params WatchWorkflowParams, callback WatchWorkflowCallback) error {
  27. var finalCancel context.CancelFunc
  28. req := &workflow.WatchWorkflowsRequest{
  29. Namespace: params.Namespace,
  30. ListOptions: &metav1.ListOptions{
  31. FieldSelector: util.GenerateFieldSelectorFromWorkflowName(params.Name),
  32. ResourceVersion: "0",
  33. },
  34. }
  35. cancelCtx, cancel := context.WithCancel(c.ctx)
  36. stream, err := c.workflowService.WatchWorkflows(cancelCtx, req)
  37. if err != nil {
  38. cancel()
  39. return errors.New(err.Error())
  40. }
  41. finalCancel = cancel
  42. workflowChan := make(chan *v1alpha1.Workflow)
  43. doneChan := make(chan any)
  44. wg := &sync.WaitGroup{}
  45. wg.Add(1)
  46. doneToken := &WatchWorkflowDoneToken{
  47. wg: wg,
  48. }
  49. go func() {
  50. for {
  51. select {
  52. case <-doneChan:
  53. return
  54. case wf := <-workflowChan:
  55. if wf == nil {
  56. continue
  57. }
  58. if callback != nil {
  59. callback(doneToken, wf)
  60. }
  61. }
  62. }
  63. }()
  64. go func() {
  65. for {
  66. event, err := stream.Recv()
  67. if err != nil {
  68. if err == io.EOF {
  69. if stream.Context().Err().Error() == "context canceled" {
  70. return
  71. }
  72. cancelCtx, cancel := context.WithCancel(c.ctx)
  73. stream, err = c.workflowService.WatchWorkflows(cancelCtx, req)
  74. if err != nil {
  75. cancel()
  76. fmt.Printf("%v\n", errors.New(err.Error()))
  77. time.Sleep(5 * time.Second)
  78. continue
  79. }
  80. finalCancel = cancel
  81. continue
  82. }
  83. fmt.Printf("%v\n", errors.New(err.Error()))
  84. return
  85. }
  86. if event == nil {
  87. continue
  88. }
  89. workflowChan <- event.Object
  90. }
  91. }()
  92. doneToken.wg.Wait()
  93. doneChan <- nil
  94. finalCancel()
  95. close(doneChan)
  96. doneChan = nil
  97. close(workflowChan)
  98. workflowChan = nil
  99. return nil
  100. }