state_store.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. package saga
  2. import (
  3. "encoding/json"
  4. "git.sxidc.com/yjp/dapr_client"
  5. dapr "github.com/dapr/go-sdk/client"
  6. )
  7. const (
  8. sagaIndexes = "indexes"
  9. )
  10. type OrchestratorState struct {
  11. Name string
  12. RollbackContextData string
  13. RollbackStartIndex int
  14. }
  15. func deleteOrchestratorState(stateStoreName string, sagaName string, orchestratorName string) error {
  16. orchestratorNames, err := getSagaIndexState(stateStoreName, sagaName)
  17. if err != nil {
  18. return err
  19. }
  20. for i, savedOrchestratorName := range orchestratorNames {
  21. if savedOrchestratorName == orchestratorName {
  22. if i == len(orchestratorNames)-1 {
  23. orchestratorNames = orchestratorNames[:i]
  24. } else {
  25. orchestratorNames = append(orchestratorNames[0:i], orchestratorNames[i+1:]...)
  26. }
  27. }
  28. }
  29. if len(orchestratorNames) == 0 {
  30. return dapr_client.GetInstance().ExecuteStateTransaction(stateStoreName, nil,
  31. []*dapr.StateOperation{
  32. {
  33. Type: dapr.StateOperationTypeDelete,
  34. Item: &dapr.SetStateItem{
  35. Key: sagaName,
  36. },
  37. },
  38. {
  39. Type: dapr.StateOperationTypeDelete,
  40. Item: &dapr.SetStateItem{
  41. Key: orchestratorName,
  42. },
  43. },
  44. })
  45. }
  46. orchestratorNames = append(orchestratorNames, orchestratorName)
  47. sagaIndexStateJsonData, err := json.Marshal(&map[string]interface{}{sagaIndexes: orchestratorNames})
  48. if err != nil {
  49. return err
  50. }
  51. return dapr_client.GetInstance().ExecuteStateTransaction(stateStoreName, nil,
  52. []*dapr.StateOperation{
  53. {
  54. Type: dapr.StateOperationTypeUpsert,
  55. Item: &dapr.SetStateItem{
  56. Key: sagaName,
  57. Value: sagaIndexStateJsonData,
  58. },
  59. },
  60. {
  61. Type: dapr.StateOperationTypeDelete,
  62. Item: &dapr.SetStateItem{
  63. Key: orchestratorName,
  64. },
  65. },
  66. })
  67. }
  68. func getSagaIndexState(stateStoreName string, sagaName string) ([]string, error) {
  69. stateItem, err := dapr_client.GetInstance().GetState(stateStoreName, sagaName, nil)
  70. if err != nil {
  71. return nil, err
  72. }
  73. if stateItem.Value == nil {
  74. return make([]string, 0), nil
  75. }
  76. orchestratorNames := make([]string, 0)
  77. err = json.Unmarshal(stateItem.Value, &map[string]interface{}{sagaIndexes: orchestratorNames})
  78. if err != nil {
  79. return nil, err
  80. }
  81. return orchestratorNames, nil
  82. }
  83. func saveOrchestratorState(stateStoreName string, sagaName string, orchestratorName string,
  84. rollbackContextData string, rollbackStartIndex int) error {
  85. orchestratorStateJsonData, err := json.Marshal(&OrchestratorState{
  86. Name: orchestratorName,
  87. RollbackContextData: rollbackContextData,
  88. RollbackStartIndex: rollbackStartIndex,
  89. })
  90. if err != nil {
  91. return err
  92. }
  93. orchestratorNames, err := getSagaIndexState(stateStoreName, sagaName)
  94. if err != nil {
  95. return err
  96. }
  97. find := false
  98. for _, savedOrchestratorName := range orchestratorNames {
  99. if savedOrchestratorName == orchestratorName {
  100. find = true
  101. }
  102. }
  103. if find {
  104. err := dapr_client.GetInstance().SaveState(stateStoreName, orchestratorName, orchestratorStateJsonData, nil)
  105. if err != nil {
  106. return err
  107. }
  108. return nil
  109. }
  110. orchestratorNames = append(orchestratorNames, orchestratorName)
  111. sagaIndexStateJsonData, err := json.Marshal(&map[string]interface{}{sagaIndexes: orchestratorNames})
  112. if err != nil {
  113. return err
  114. }
  115. return dapr_client.GetInstance().ExecuteStateTransaction(stateStoreName, nil,
  116. []*dapr.StateOperation{
  117. {
  118. Type: dapr.StateOperationTypeUpsert,
  119. Item: &dapr.SetStateItem{
  120. Key: sagaName,
  121. Value: sagaIndexStateJsonData,
  122. },
  123. },
  124. {
  125. Type: dapr.StateOperationTypeUpsert,
  126. Item: &dapr.SetStateItem{
  127. Key: orchestratorName,
  128. Value: orchestratorStateJsonData,
  129. },
  130. },
  131. })
  132. }
  133. func getSagaOrchestratorStates(stateStoreName string, sagaName string) ([]OrchestratorState, error) {
  134. stateItem, err := dapr_client.GetInstance().GetState(stateStoreName, sagaName, nil)
  135. if err != nil {
  136. return nil, err
  137. }
  138. if stateItem.Value == nil {
  139. return make([]OrchestratorState, 0), nil
  140. }
  141. indexes := make(map[string][]string)
  142. err = json.Unmarshal(stateItem.Value, &indexes)
  143. if err != nil {
  144. return nil, err
  145. }
  146. orchestratorStates := make([]OrchestratorState, 0)
  147. for _, orchestratorName := range indexes[sagaIndexes] {
  148. stateItem, err := dapr_client.GetInstance().GetState(stateStoreName, orchestratorName, nil)
  149. if err != nil {
  150. return nil, err
  151. }
  152. orchestratorState := new(OrchestratorState)
  153. err = json.Unmarshal(stateItem.Value, orchestratorState)
  154. if err != nil {
  155. return nil, err
  156. }
  157. orchestratorStates = append(orchestratorStates, *orchestratorState)
  158. }
  159. return orchestratorStates, nil
  160. }