Browse Source

添加停止校验

yjp 2 years ago
parent
commit
9fdc64156e
1 changed files with 33 additions and 1 deletions
  1. 33 1
      saga/orchestrator.go

+ 33 - 1
saga/orchestrator.go

@@ -2,9 +2,13 @@ package saga
 
 import (
 	"fmt"
+	"sync"
 	"time"
 )
 
+var runningCountMutex sync.Mutex
+var runningCount uint64
+
 type StepFunc func() error
 type StepRollbackFunc func() error
 type OutputFunc func() (interface{}, error)
@@ -38,6 +42,10 @@ func (orchestrator *Orchestrator) Output(outputFunc OutputFunc) *Orchestrator {
 }
 
 func (orchestrator *Orchestrator) Run() (interface{}, error) {
+	runningCountMutex.Lock()
+	runningCount++
+	runningCountMutex.Unlock()
+
 	for index, step := range orchestrator.steps {
 		if step.StepFunc == nil {
 			continue
@@ -56,14 +64,29 @@ func (orchestrator *Orchestrator) Run() (interface{}, error) {
 			}
 
 			go orchestrator.Rollback(index)
+
+			runningCountMutex.Lock()
+			runningCount--
+			runningCountMutex.Unlock()
+
 			return nil, err
 		}
 	}
 
 	if orchestrator.OutputFunc != nil {
-		return orchestrator.OutputFunc()
+		result, err := orchestrator.OutputFunc()
+
+		runningCountMutex.Lock()
+		runningCount--
+		runningCountMutex.Unlock()
+
+		return result, err
 	}
 
+	runningCountMutex.Lock()
+	runningCount--
+	runningCountMutex.Unlock()
+
 	return nil, nil
 }
 
@@ -123,6 +146,15 @@ func (orchestrator *Orchestrator) rollbackRetry(step *Step) {
 }
 
 func (orchestrator *Orchestrator) stop() {
+	runningCountMutex.Lock()
+	for runningCount != 0 {
+		runningCountMutex.Unlock()
+		time.Sleep(500 * time.Millisecond)
+		runningCountMutex.Lock()
+		continue
+	}
+	runningCountMutex.Unlock()
+
 	for _, step := range orchestrator.steps {
 		if step.rollbackDone != nil {
 			step.rollbackDone <- true