watch_workflow.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. package client
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow"
  6. "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
  7. "github.com/argoproj/argo-workflows/v3/util"
  8. "github.com/pkg/errors"
  9. "io"
  10. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  11. "sync"
  12. "time"
  13. )
  14. type WatchWorkflowDoneToken struct {
  15. wg *sync.WaitGroup
  16. doneChan chan any
  17. streamReceiveCancel context.CancelFunc
  18. }
  19. func (doneToken *WatchWorkflowDoneToken) Done() {
  20. go func() {
  21. doneToken.streamReceiveCancel()
  22. doneToken.doneChan <- nil
  23. close(doneToken.doneChan)
  24. doneToken.doneChan = nil
  25. doneToken.wg.Done()
  26. }()
  27. }
  28. type WatchWorkflowCallback func(doneToken *WatchWorkflowDoneToken, wf *v1alpha1.Workflow)
  29. type WatchWorkflowParams struct {
  30. Namespace string
  31. Name string
  32. }
  33. // WatchWorkflow 监听工作流
  34. func (c *Client) WatchWorkflow(params WatchWorkflowParams, callback WatchWorkflowCallback) error {
  35. req := &workflow.WatchWorkflowsRequest{
  36. Namespace: params.Namespace,
  37. ListOptions: &metav1.ListOptions{
  38. FieldSelector: util.GenerateFieldSelectorFromWorkflowName(params.Name),
  39. ResourceVersion: "0",
  40. },
  41. }
  42. cancelCtx, cancel := context.WithCancel(c.ctx)
  43. stream, err := c.workflowService.WatchWorkflows(cancelCtx, req)
  44. if err != nil {
  45. cancel()
  46. return errors.New(err.Error())
  47. }
  48. workflowChan := make(chan *v1alpha1.Workflow)
  49. wg := &sync.WaitGroup{}
  50. wg.Add(1)
  51. doneToken := &WatchWorkflowDoneToken{
  52. wg: wg,
  53. doneChan: make(chan any),
  54. streamReceiveCancel: cancel,
  55. }
  56. go func() {
  57. for {
  58. select {
  59. case <-doneToken.doneChan:
  60. return
  61. case wf := <-workflowChan:
  62. if wf == nil {
  63. continue
  64. }
  65. if callback != nil {
  66. callback(doneToken, wf)
  67. }
  68. }
  69. }
  70. }()
  71. go func() {
  72. for {
  73. event, err := stream.Recv()
  74. if err != nil {
  75. if err == io.EOF {
  76. if stream.Context().Err().Error() == "context canceled" {
  77. return
  78. }
  79. cancelCtx, cancel := context.WithCancel(c.ctx)
  80. stream, err = c.workflowService.WatchWorkflows(cancelCtx, req)
  81. if err != nil {
  82. cancel()
  83. fmt.Printf("%v\n", errors.New(err.Error()))
  84. time.Sleep(5 * time.Second)
  85. continue
  86. }
  87. doneToken.streamReceiveCancel = cancel
  88. continue
  89. }
  90. fmt.Printf("%v\n", errors.New(err.Error()))
  91. return
  92. }
  93. if event == nil {
  94. continue
  95. }
  96. workflowChan <- event.Object
  97. }
  98. }()
  99. doneToken.wg.Wait()
  100. close(workflowChan)
  101. workflowChan = nil
  102. return nil
  103. }