saga.go 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. package saga
  2. import (
  3. "sync"
  4. "time"
  5. )
  6. var sagaInstance *Saga
  7. var sagaInstanceMutex sync.Mutex
  8. type Saga struct {
  9. stateStoreName string
  10. name string
  11. orchestrators []*Orchestrator
  12. }
  13. func Init(stateStoreName string, sagaName string) {
  14. sagaInstanceMutex.Lock()
  15. defer sagaInstanceMutex.Unlock()
  16. if sagaInstance != nil {
  17. return
  18. }
  19. sagaInstance = new(Saga)
  20. sagaInstance.stateStoreName = stateStoreName
  21. sagaInstance.name = sagaName
  22. sagaInstance.orchestrators = make([]*Orchestrator, 0)
  23. }
  24. func Destroy() {
  25. sagaInstanceMutex.Lock()
  26. defer sagaInstanceMutex.Unlock()
  27. if sagaInstance == nil {
  28. return
  29. }
  30. for _, orchestrator := range sagaInstance.orchestrators {
  31. orchestrator.stop()
  32. }
  33. sagaInstance.orchestrators = nil
  34. sagaInstance.name = ""
  35. sagaInstance.stateStoreName = ""
  36. sagaInstance = nil
  37. }
  38. func GetInstance() *Saga {
  39. return sagaInstance
  40. }
  41. func (s *Saga) BuildOrchestrator(orchestratorName string, rollbackRetryPeriodSec time.Duration) *Orchestrator {
  42. sagaInstanceMutex.Lock()
  43. defer sagaInstanceMutex.Unlock()
  44. orchestrator := &Orchestrator{
  45. stateStoreName: s.stateStoreName,
  46. sagaName: s.name,
  47. name: orchestratorName,
  48. rollbackRetryPeriodSec: rollbackRetryPeriodSec,
  49. }
  50. sagaInstance.orchestrators = append(sagaInstance.orchestrators, orchestrator)
  51. return orchestrator
  52. }
  53. func (s *Saga) GetOrchestratorNeedRollback() ([]OrchestratorState, error) {
  54. return getSagaOrchestratorStates(s.stateStoreName, s.name)
  55. }