orchestrator.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. package saga
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. )
  7. var runningCountMutex sync.Mutex
  8. var runningCount uint64
  9. type StepFunc func() error
  10. type StepRollbackFunc func() error
  11. type OutputFunc func() (interface{}, error)
  12. type Step struct {
  13. StepFunc StepFunc
  14. StepRollbackFunc StepRollbackFunc
  15. RollbackContextData string
  16. rollbackDone chan interface{}
  17. }
  18. type Orchestrator struct {
  19. stateStoreName string
  20. sagaName string
  21. name string
  22. rollbackRetryPeriodSec time.Duration
  23. steps []*Step
  24. OutputFunc OutputFunc
  25. }
  26. func (orchestrator *Orchestrator) AddStep(step *Step) *Orchestrator {
  27. orchestrator.steps = append(orchestrator.steps, step)
  28. return orchestrator
  29. }
  30. func (orchestrator *Orchestrator) Output(outputFunc OutputFunc) *Orchestrator {
  31. orchestrator.OutputFunc = outputFunc
  32. return orchestrator
  33. }
  34. func (orchestrator *Orchestrator) Run() (interface{}, error) {
  35. runningCountMutex.Lock()
  36. runningCount++
  37. runningCountMutex.Unlock()
  38. for index, step := range orchestrator.steps {
  39. if step.StepFunc == nil {
  40. continue
  41. }
  42. err := step.StepFunc()
  43. if err == nil {
  44. continue
  45. }
  46. if err != nil {
  47. err = saveOrchestratorState(orchestrator.stateStoreName, orchestrator.sagaName,
  48. orchestrator.name, step.RollbackContextData, index)
  49. if err != nil {
  50. return nil, err
  51. }
  52. go orchestrator.Rollback(index)
  53. runningCountMutex.Lock()
  54. runningCount--
  55. runningCountMutex.Unlock()
  56. return nil, err
  57. }
  58. }
  59. if orchestrator.OutputFunc != nil {
  60. result, err := orchestrator.OutputFunc()
  61. runningCountMutex.Lock()
  62. runningCount--
  63. runningCountMutex.Unlock()
  64. return result, err
  65. }
  66. runningCountMutex.Lock()
  67. runningCount--
  68. runningCountMutex.Unlock()
  69. return nil, nil
  70. }
  71. func (orchestrator *Orchestrator) Rollback(startIndex int) {
  72. for i := startIndex; i >= 0; i-- {
  73. rollbackStep := orchestrator.steps[i]
  74. err := orchestrator.rollbackStep(rollbackStep)
  75. if err != nil {
  76. fmt.Println("Rollback", "orchestrator.rollbackStep", err)
  77. }
  78. err = saveOrchestratorState(orchestrator.stateStoreName, orchestrator.sagaName, orchestrator.name,
  79. rollbackStep.RollbackContextData, i-1)
  80. if err != nil {
  81. fmt.Println("Rollback", "saveOrchestratorState", err)
  82. return
  83. }
  84. }
  85. err := deleteOrchestratorState(orchestrator.stateStoreName, orchestrator.sagaName, orchestrator.name)
  86. if err != nil {
  87. fmt.Println("Rollback", "deleteOrchestratorState", err)
  88. return
  89. }
  90. }
  91. func (orchestrator *Orchestrator) rollbackStep(rollbackStep *Step) error {
  92. if rollbackStep.StepRollbackFunc == nil {
  93. return nil
  94. }
  95. err := rollbackStep.StepRollbackFunc()
  96. if err == nil {
  97. return nil
  98. }
  99. orchestrator.rollbackRetry(rollbackStep)
  100. return nil
  101. }
  102. func (orchestrator *Orchestrator) rollbackRetry(step *Step) {
  103. ticker := time.NewTicker(orchestrator.rollbackRetryPeriodSec)
  104. for {
  105. select {
  106. case <-step.rollbackDone:
  107. ticker.Stop()
  108. ticker = nil
  109. case <-ticker.C:
  110. err := step.StepRollbackFunc()
  111. if err == nil {
  112. return
  113. }
  114. }
  115. }
  116. }
  117. func (orchestrator *Orchestrator) stop() {
  118. runningCountMutex.Lock()
  119. for runningCount != 0 {
  120. runningCountMutex.Unlock()
  121. time.Sleep(500 * time.Millisecond)
  122. runningCountMutex.Lock()
  123. continue
  124. }
  125. runningCountMutex.Unlock()
  126. for _, step := range orchestrator.steps {
  127. if step.rollbackDone != nil {
  128. step.rollbackDone <- true
  129. close(step.rollbackDone)
  130. step.rollbackDone = nil
  131. }
  132. }
  133. }