|
@@ -3,9 +3,13 @@ package saga
|
|
|
import (
|
|
import (
|
|
|
"fmt"
|
|
"fmt"
|
|
|
"git.sxidc.com/service-supports/dapr_api/state"
|
|
"git.sxidc.com/service-supports/dapr_api/state"
|
|
|
|
|
+ "sync"
|
|
|
"time"
|
|
"time"
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
|
|
+var runningCountMutex sync.Mutex
|
|
|
|
|
+var runningCount uint64
|
|
|
|
|
+
|
|
|
type StepFunc func() error
|
|
type StepFunc func() error
|
|
|
type StepRollbackFunc func() error
|
|
type StepRollbackFunc func() error
|
|
|
type OutputFunc func() (interface{}, error)
|
|
type OutputFunc func() (interface{}, error)
|
|
@@ -40,6 +44,10 @@ func (orchestrator *Orchestrator) Output(outputFunc OutputFunc) *Orchestrator {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (orchestrator *Orchestrator) Run() (interface{}, error) {
|
|
func (orchestrator *Orchestrator) Run() (interface{}, error) {
|
|
|
|
|
+ runningCountMutex.Lock()
|
|
|
|
|
+ runningCount++
|
|
|
|
|
+ runningCountMutex.Unlock()
|
|
|
|
|
+
|
|
|
for index, step := range orchestrator.steps {
|
|
for index, step := range orchestrator.steps {
|
|
|
if step.StepFunc == nil {
|
|
if step.StepFunc == nil {
|
|
|
continue
|
|
continue
|
|
@@ -58,14 +66,29 @@ func (orchestrator *Orchestrator) Run() (interface{}, error) {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
go orchestrator.Rollback(index)
|
|
go orchestrator.Rollback(index)
|
|
|
|
|
+
|
|
|
|
|
+ runningCountMutex.Lock()
|
|
|
|
|
+ runningCount--
|
|
|
|
|
+ runningCountMutex.Unlock()
|
|
|
|
|
+
|
|
|
return nil, err
|
|
return nil, err
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if orchestrator.OutputFunc != nil {
|
|
if orchestrator.OutputFunc != nil {
|
|
|
- return orchestrator.OutputFunc()
|
|
|
|
|
|
|
+ result, err := orchestrator.OutputFunc()
|
|
|
|
|
+
|
|
|
|
|
+ runningCountMutex.Lock()
|
|
|
|
|
+ runningCount--
|
|
|
|
|
+ runningCountMutex.Unlock()
|
|
|
|
|
+
|
|
|
|
|
+ return result, err
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ runningCountMutex.Lock()
|
|
|
|
|
+ runningCount--
|
|
|
|
|
+ runningCountMutex.Unlock()
|
|
|
|
|
+
|
|
|
return nil, nil
|
|
return nil, nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -127,6 +150,15 @@ func (orchestrator *Orchestrator) rollbackRetry(step *Step) {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (orchestrator *Orchestrator) stop() {
|
|
func (orchestrator *Orchestrator) stop() {
|
|
|
|
|
+ runningCountMutex.Lock()
|
|
|
|
|
+ for runningCount != 0 {
|
|
|
|
|
+ runningCountMutex.Unlock()
|
|
|
|
|
+ time.Sleep(500 * time.Millisecond)
|
|
|
|
|
+ runningCountMutex.Lock()
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+ runningCountMutex.Unlock()
|
|
|
|
|
+
|
|
|
for _, step := range orchestrator.steps {
|
|
for _, step := range orchestrator.steps {
|
|
|
if step.rollbackDone != nil {
|
|
if step.rollbackDone != nil {
|
|
|
step.rollbackDone <- true
|
|
step.rollbackDone <- true
|