saga.go 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. package saga
  2. import (
  3. "sync"
  4. "time"
  5. )
  6. var sagaInstance *Saga
  7. var sagaInstanceMutex sync.Mutex
  8. var initOnce sync.Once
  9. type Saga struct {
  10. stateStoreName string
  11. name string
  12. orchestrators []*Orchestrator
  13. }
  14. func Init(stateStoreName string, sagaName string) {
  15. sagaInstanceMutex.Lock()
  16. defer sagaInstanceMutex.Unlock()
  17. if sagaInstance != nil {
  18. return
  19. }
  20. sagaInstance = new(Saga)
  21. sagaInstance.stateStoreName = stateStoreName
  22. sagaInstance.name = sagaName
  23. sagaInstance.orchestrators = make([]*Orchestrator, 0)
  24. }
  25. func Destroy() {
  26. sagaInstanceMutex.Lock()
  27. defer sagaInstanceMutex.Unlock()
  28. if sagaInstance == nil {
  29. return
  30. }
  31. for _, orchestrator := range sagaInstance.orchestrators {
  32. orchestrator.stop()
  33. }
  34. sagaInstance.orchestrators = nil
  35. sagaInstance.name = ""
  36. sagaInstance.stateStoreName = ""
  37. sagaInstance = nil
  38. }
  39. func GetInstance() *Saga {
  40. return sagaInstance
  41. }
  42. func (s *Saga) BuildOrchestrator(orchestratorName string, rollbackRetryPeriodSec time.Duration) *Orchestrator {
  43. sagaInstanceMutex.Lock()
  44. defer sagaInstanceMutex.Unlock()
  45. orchestrator := &Orchestrator{
  46. stateStoreName: s.stateStoreName,
  47. sagaName: s.name,
  48. name: orchestratorName,
  49. rollbackRetryPeriodSec: rollbackRetryPeriodSec,
  50. }
  51. sagaInstance.orchestrators = append(sagaInstance.orchestrators, orchestrator)
  52. return orchestrator
  53. }
  54. func (s *Saga) GetOrchestratorNeedRollback() ([]OrchestratorState, error) {
  55. return getSagaOrchestratorStates(s.stateStoreName, s.name)
  56. }