|
|
@@ -14,11 +14,21 @@ import (
|
|
|
)
|
|
|
|
|
|
type WatchWorkflowDoneToken struct {
|
|
|
- wg *sync.WaitGroup
|
|
|
+ wg *sync.WaitGroup
|
|
|
+ doneChan chan any
|
|
|
+ streamReceiveCancel context.CancelFunc
|
|
|
}
|
|
|
|
|
|
func (doneToken *WatchWorkflowDoneToken) Done() {
|
|
|
- doneToken.wg.Done()
|
|
|
+ go func() {
|
|
|
+ doneToken.streamReceiveCancel()
|
|
|
+
|
|
|
+ doneToken.doneChan <- nil
|
|
|
+ close(doneToken.doneChan)
|
|
|
+ doneToken.doneChan = nil
|
|
|
+
|
|
|
+ doneToken.wg.Done()
|
|
|
+ }()
|
|
|
}
|
|
|
|
|
|
type WatchWorkflowCallback func(doneToken *WatchWorkflowDoneToken, wf *v1alpha1.Workflow)
|
|
|
@@ -30,8 +40,6 @@ type WatchWorkflowParams struct {
|
|
|
|
|
|
// WatchWorkflow 监听工作流
|
|
|
func (c *Client) WatchWorkflow(params WatchWorkflowParams, callback WatchWorkflowCallback) error {
|
|
|
- var finalCancel context.CancelFunc
|
|
|
-
|
|
|
req := &workflow.WatchWorkflowsRequest{
|
|
|
Namespace: params.Namespace,
|
|
|
ListOptions: &metav1.ListOptions{
|
|
|
@@ -47,22 +55,21 @@ func (c *Client) WatchWorkflow(params WatchWorkflowParams, callback WatchWorkflo
|
|
|
return errors.New(err.Error())
|
|
|
}
|
|
|
|
|
|
- finalCancel = cancel
|
|
|
-
|
|
|
workflowChan := make(chan *v1alpha1.Workflow)
|
|
|
- doneChan := make(chan any)
|
|
|
|
|
|
wg := &sync.WaitGroup{}
|
|
|
wg.Add(1)
|
|
|
|
|
|
doneToken := &WatchWorkflowDoneToken{
|
|
|
- wg: wg,
|
|
|
+ wg: wg,
|
|
|
+ doneChan: make(chan any),
|
|
|
+ streamReceiveCancel: cancel,
|
|
|
}
|
|
|
|
|
|
go func() {
|
|
|
for {
|
|
|
select {
|
|
|
- case <-doneChan:
|
|
|
+ case <-doneToken.doneChan:
|
|
|
return
|
|
|
case wf := <-workflowChan:
|
|
|
if wf == nil {
|
|
|
@@ -94,7 +101,7 @@ func (c *Client) WatchWorkflow(params WatchWorkflowParams, callback WatchWorkflo
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
- finalCancel = cancel
|
|
|
+ doneToken.streamReceiveCancel = cancel
|
|
|
|
|
|
continue
|
|
|
}
|
|
|
@@ -112,12 +119,6 @@ func (c *Client) WatchWorkflow(params WatchWorkflowParams, callback WatchWorkflo
|
|
|
}()
|
|
|
|
|
|
doneToken.wg.Wait()
|
|
|
- doneChan <- nil
|
|
|
-
|
|
|
- finalCancel()
|
|
|
-
|
|
|
- close(doneChan)
|
|
|
- doneChan = nil
|
|
|
|
|
|
close(workflowChan)
|
|
|
workflowChan = nil
|