saga.go 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. package saga
  2. import (
  3. "sync"
  4. "time"
  5. )
  6. var sagaInstance *Saga
  7. var sagaInstanceMutex sync.Mutex
  8. var initOnce sync.Once
  9. type Saga struct {
  10. stateStoreName string
  11. name string
  12. orchestrators []*Orchestrator
  13. }
  14. func Init(stateStoreName string, sagaName string) {
  15. sagaInstanceMutex.Lock()
  16. defer sagaInstanceMutex.Unlock()
  17. if sagaInstance != nil {
  18. return
  19. }
  20. initOnce.Do(func() {
  21. sagaInstance = new(Saga)
  22. sagaInstance.stateStoreName = stateStoreName
  23. sagaInstance.name = sagaName
  24. sagaInstance.orchestrators = make([]*Orchestrator, 0)
  25. })
  26. }
  27. func Destroy() {
  28. sagaInstanceMutex.Lock()
  29. defer sagaInstanceMutex.Unlock()
  30. if sagaInstance == nil {
  31. return
  32. }
  33. for _, orchestrator := range sagaInstance.orchestrators {
  34. orchestrator.stop()
  35. }
  36. sagaInstance.orchestrators = nil
  37. sagaInstance.name = ""
  38. sagaInstance.stateStoreName = ""
  39. sagaInstance = nil
  40. }
  41. func GetInstance() *Saga {
  42. return sagaInstance
  43. }
  44. func (s *Saga) BuildOrchestrator(orchestratorName string, rollbackRetryPeriodSec time.Duration) *Orchestrator {
  45. sagaInstanceMutex.Lock()
  46. defer sagaInstanceMutex.Unlock()
  47. orchestrator := &Orchestrator{
  48. stateStoreName: s.stateStoreName,
  49. sagaName: s.name,
  50. name: orchestratorName,
  51. rollbackRetryPeriodSec: rollbackRetryPeriodSec,
  52. }
  53. sagaInstance.orchestrators = append(sagaInstance.orchestrators, orchestrator)
  54. return orchestrator
  55. }
  56. func (s *Saga) GetOrchestratorNeedRollback() ([]OrchestratorState, error) {
  57. return GetSagaOrchestratorStates(s.stateStoreName, s.name)
  58. }