watch_workflow.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. package client
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow"
  6. "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
  7. "github.com/argoproj/argo-workflows/v3/util"
  8. "github.com/pkg/errors"
  9. "io"
  10. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  11. "time"
  12. )
  13. type WatchWorkflowCallback func(wf *v1alpha1.Workflow)
  14. type WatchWorkflowToken struct {
  15. workflowChan chan *v1alpha1.Workflow
  16. doneChan chan any
  17. streamReceiveCancel context.CancelFunc
  18. }
  19. func (token *WatchWorkflowToken) Done() {
  20. token.streamReceiveCancel()
  21. token.doneChan <- nil
  22. close(token.doneChan)
  23. token.doneChan = nil
  24. close(token.workflowChan)
  25. token.workflowChan = nil
  26. }
  // WatchWorkflowParams selects the workflow that WatchWorkflow observes.
  type WatchWorkflowParams struct {
  	// Namespace is copied into the Namespace field of the watch request.
  	Namespace string
  	// Name is the workflow name the watch request's field selector is built from.
  	Name string
  }
  31. // WatchWorkflow 监听工作流
  32. func (c *Client) WatchWorkflow(params WatchWorkflowParams, callback WatchWorkflowCallback) (*WatchWorkflowToken, error) {
  33. req := &workflow.WatchWorkflowsRequest{
  34. Namespace: params.Namespace,
  35. ListOptions: &metav1.ListOptions{
  36. FieldSelector: util.GenerateFieldSelectorFromWorkflowName(params.Name),
  37. ResourceVersion: "0",
  38. },
  39. }
  40. cancelCtx, cancel := context.WithCancel(c.ctx)
  41. stream, err := c.workflowService.WatchWorkflows(cancelCtx, req)
  42. if err != nil {
  43. cancel()
  44. return nil, errors.New(err.Error())
  45. }
  46. token := &WatchWorkflowToken{
  47. workflowChan: make(chan *v1alpha1.Workflow),
  48. doneChan: make(chan any),
  49. streamReceiveCancel: cancel,
  50. }
  51. go func() {
  52. for {
  53. select {
  54. case <-token.doneChan:
  55. return
  56. case wf := <-token.workflowChan:
  57. if wf == nil {
  58. continue
  59. }
  60. if callback != nil {
  61. callback(wf)
  62. }
  63. }
  64. }
  65. }()
  66. go func() {
  67. for {
  68. event, err := stream.Recv()
  69. if err != nil {
  70. if err == io.EOF {
  71. if stream.Context().Err().Error() == "context canceled" {
  72. return
  73. }
  74. cancelCtx, cancel := context.WithCancel(c.ctx)
  75. stream, err = c.workflowService.WatchWorkflows(cancelCtx, req)
  76. if err != nil {
  77. cancel()
  78. fmt.Printf("%v\n", errors.New(err.Error()))
  79. time.Sleep(5 * time.Second)
  80. continue
  81. }
  82. token.streamReceiveCancel = cancel
  83. continue
  84. }
  85. fmt.Printf("%v\n", errors.New(err.Error()))
  86. return
  87. }
  88. if event == nil {
  89. continue
  90. }
  91. token.workflowChan <- event.Object
  92. }
  93. }()
  94. return token, nil
  95. }