state_store.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. package saga
  2. import (
  3. "encoding/json"
  4. "git.sxidc.com/service-supports/dapr_api/state"
  5. "git.sxidc.com/service-supports/dapr_api/utils"
  6. )
  7. const (
  8. sagaIndexes = "indexes"
  9. )
  10. type OrchestratorState struct {
  11. Name string
  12. RollbackContextData string
  13. RollbackStartIndex int
  14. }
  15. func deleteOrchestratorState(stateAPI *state.API, stateStoreName string, sagaName string, orchestratorName string) error {
  16. orchestratorNames, err := getSagaIndexState(stateAPI, stateStoreName, sagaName)
  17. if err != nil {
  18. return err
  19. }
  20. for i, savedOrchestratorName := range orchestratorNames {
  21. if savedOrchestratorName == orchestratorName {
  22. if i == len(orchestratorNames)-1 {
  23. orchestratorNames = orchestratorNames[:i]
  24. } else {
  25. orchestratorNames = append(orchestratorNames[0:i], orchestratorNames[i+1:]...)
  26. }
  27. }
  28. }
  29. if len(orchestratorNames) == 0 {
  30. return stateAPI.Transaction(stateStoreName, state.TransactionRequest{
  31. Operations: []state.TransactionOperation{
  32. {
  33. Operation: state.TransactionDelete,
  34. Request: state.TransactionOperationRequest{
  35. Key: sagaName,
  36. },
  37. },
  38. {
  39. Operation: state.TransactionDelete,
  40. Request: state.TransactionOperationRequest{
  41. Key: orchestratorName,
  42. },
  43. },
  44. },
  45. })
  46. }
  47. orchestratorNames = append(orchestratorNames, orchestratorName)
  48. sagaIndexStateJsonData, err := json.Marshal(&map[string]interface{}{sagaIndexes: orchestratorNames})
  49. if err != nil {
  50. return err
  51. }
  52. return stateAPI.Transaction(stateStoreName, state.TransactionRequest{
  53. Operations: []state.TransactionOperation{
  54. {
  55. Operation: state.TransactionUpsert,
  56. Request: state.TransactionOperationRequest{
  57. Key: sagaName,
  58. Value: string(sagaIndexStateJsonData),
  59. },
  60. },
  61. {
  62. Operation: state.TransactionDelete,
  63. Request: state.TransactionOperationRequest{
  64. Key: orchestratorName,
  65. },
  66. },
  67. },
  68. })
  69. }
  70. func getSagaIndexState(stateAPI *state.API, stateStoreName string, sagaName string) ([]string, error) {
  71. data, _, err := stateAPI.GetState(stateStoreName, sagaName, nil)
  72. if err != nil {
  73. return nil, err
  74. }
  75. if utils.HasBlank(data) {
  76. return make([]string, 0), nil
  77. }
  78. orchestratorNames := make([]string, 0)
  79. err = json.Unmarshal([]byte(data), &map[string]interface{}{sagaIndexes: orchestratorNames})
  80. if err != nil {
  81. return nil, err
  82. }
  83. return orchestratorNames, nil
  84. }
  85. func saveOrchestratorState(stateAPI *state.API, stateStoreName string, sagaName string, orchestratorName string,
  86. rollbackContextData string, rollbackStartIndex int) error {
  87. orchestratorStateJsonData, err := json.Marshal(&OrchestratorState{
  88. Name: orchestratorName,
  89. RollbackContextData: rollbackContextData,
  90. RollbackStartIndex: rollbackStartIndex,
  91. })
  92. if err != nil {
  93. return err
  94. }
  95. orchestratorNames, err := getSagaIndexState(stateAPI, stateStoreName, sagaName)
  96. if err != nil {
  97. return err
  98. }
  99. find := false
  100. for _, savedOrchestratorName := range orchestratorNames {
  101. if savedOrchestratorName == orchestratorName {
  102. find = true
  103. }
  104. }
  105. if find {
  106. err := stateAPI.SaveState(stateStoreName, []state.SaveStateRequest{
  107. {
  108. Key: orchestratorName,
  109. Value: string(orchestratorStateJsonData),
  110. },
  111. })
  112. if err != nil {
  113. return err
  114. }
  115. return nil
  116. }
  117. orchestratorNames = append(orchestratorNames, orchestratorName)
  118. sagaIndexStateJsonData, err := json.Marshal(&map[string]interface{}{sagaIndexes: orchestratorNames})
  119. if err != nil {
  120. return err
  121. }
  122. return stateAPI.Transaction(stateStoreName, state.TransactionRequest{
  123. Operations: []state.TransactionOperation{
  124. {
  125. Operation: state.TransactionUpsert,
  126. Request: state.TransactionOperationRequest{
  127. Key: sagaName,
  128. Value: string(sagaIndexStateJsonData),
  129. },
  130. },
  131. {
  132. Operation: state.TransactionUpsert,
  133. Request: state.TransactionOperationRequest{
  134. Key: orchestratorName,
  135. Value: string(orchestratorStateJsonData),
  136. },
  137. },
  138. },
  139. })
  140. }
  141. func getSagaOrchestratorStates(stateAPI *state.API, stateStoreName string, sagaName string) ([]OrchestratorState, error) {
  142. data, _, err := stateAPI.GetState(stateStoreName, sagaName, nil)
  143. if err != nil {
  144. return nil, err
  145. }
  146. if utils.HasBlank(data) {
  147. return make([]OrchestratorState, 0), nil
  148. }
  149. indexes := make(map[string][]string)
  150. err = json.Unmarshal([]byte(data), &indexes)
  151. if err != nil {
  152. return nil, err
  153. }
  154. orchestratorStates := make([]OrchestratorState, 0)
  155. for _, orchestratorName := range indexes[sagaIndexes] {
  156. data, _, err := stateAPI.GetState(stateStoreName, orchestratorName, nil)
  157. if err != nil {
  158. return nil, err
  159. }
  160. orchestratorState := new(OrchestratorState)
  161. err = json.Unmarshal([]byte(data), orchestratorState)
  162. if err != nil {
  163. return nil, err
  164. }
  165. orchestratorStates = append(orchestratorStates, *orchestratorState)
  166. }
  167. return orchestratorStates, nil
  168. }