123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
- package saga
- import (
- "encoding/json"
- "git.sxidc.com/service-supports/dapr_api/state"
- "git.sxidc.com/service-supports/dapr_api/utils"
- )
- const (
- sagaIndexes = "indexes"
- )
- type OrchestratorState struct {
- Name string
- RollbackContextData string
- RollbackStartIndex int
- }
- func deleteOrchestratorState(stateAPI *state.API, stateStoreName string, sagaName string, orchestratorName string) error {
- orchestratorNames, err := getSagaIndexState(stateAPI, stateStoreName, sagaName)
- if err != nil {
- return err
- }
- for i, savedOrchestratorName := range orchestratorNames {
- if savedOrchestratorName == orchestratorName {
- if i == len(orchestratorNames)-1 {
- orchestratorNames = orchestratorNames[:i]
- } else {
- orchestratorNames = append(orchestratorNames[0:i], orchestratorNames[i+1:]...)
- }
- }
- }
- if len(orchestratorNames) == 0 {
- return stateAPI.Transaction(stateStoreName, state.TransactionRequest{
- Operations: []state.TransactionOperation{
- {
- Operation: state.TransactionDelete,
- Request: state.TransactionOperationRequest{
- Key: sagaName,
- },
- },
- {
- Operation: state.TransactionDelete,
- Request: state.TransactionOperationRequest{
- Key: orchestratorName,
- },
- },
- },
- })
- }
- orchestratorNames = append(orchestratorNames, orchestratorName)
- sagaIndexStateJsonData, err := json.Marshal(&map[string]interface{}{sagaIndexes: orchestratorNames})
- if err != nil {
- return err
- }
- return stateAPI.Transaction(stateStoreName, state.TransactionRequest{
- Operations: []state.TransactionOperation{
- {
- Operation: state.TransactionUpsert,
- Request: state.TransactionOperationRequest{
- Key: sagaName,
- Value: string(sagaIndexStateJsonData),
- },
- },
- {
- Operation: state.TransactionDelete,
- Request: state.TransactionOperationRequest{
- Key: orchestratorName,
- },
- },
- },
- })
- }
- func getSagaIndexState(stateAPI *state.API, stateStoreName string, sagaName string) ([]string, error) {
- data, _, err := stateAPI.GetState(stateStoreName, sagaName, nil)
- if err != nil {
- return nil, err
- }
- if utils.HasBlank(data) {
- return make([]string, 0), nil
- }
- orchestratorNames := make([]string, 0)
- err = json.Unmarshal([]byte(data), &map[string]interface{}{sagaIndexes: orchestratorNames})
- if err != nil {
- return nil, err
- }
- return orchestratorNames, nil
- }
- func saveOrchestratorState(stateAPI *state.API, stateStoreName string, sagaName string, orchestratorName string,
- rollbackContextData string, rollbackStartIndex int) error {
- orchestratorStateJsonData, err := json.Marshal(&OrchestratorState{
- Name: orchestratorName,
- RollbackContextData: rollbackContextData,
- RollbackStartIndex: rollbackStartIndex,
- })
- if err != nil {
- return err
- }
- orchestratorNames, err := getSagaIndexState(stateAPI, stateStoreName, sagaName)
- if err != nil {
- return err
- }
- find := false
- for _, savedOrchestratorName := range orchestratorNames {
- if savedOrchestratorName == orchestratorName {
- find = true
- }
- }
- if find {
- err := stateAPI.SaveState(stateStoreName, []state.SaveStateRequest{
- {
- Key: orchestratorName,
- Value: string(orchestratorStateJsonData),
- },
- })
- if err != nil {
- return err
- }
- return nil
- }
- orchestratorNames = append(orchestratorNames, orchestratorName)
- sagaIndexStateJsonData, err := json.Marshal(&map[string]interface{}{sagaIndexes: orchestratorNames})
- if err != nil {
- return err
- }
- return stateAPI.Transaction(stateStoreName, state.TransactionRequest{
- Operations: []state.TransactionOperation{
- {
- Operation: state.TransactionUpsert,
- Request: state.TransactionOperationRequest{
- Key: sagaName,
- Value: string(sagaIndexStateJsonData),
- },
- },
- {
- Operation: state.TransactionUpsert,
- Request: state.TransactionOperationRequest{
- Key: orchestratorName,
- Value: string(orchestratorStateJsonData),
- },
- },
- },
- })
- }
- func getSagaOrchestratorStates(stateAPI *state.API, stateStoreName string, sagaName string) ([]OrchestratorState, error) {
- data, _, err := stateAPI.GetState(stateStoreName, sagaName, nil)
- if err != nil {
- return nil, err
- }
- if utils.HasBlank(data) {
- return make([]OrchestratorState, 0), nil
- }
- indexes := make(map[string][]string)
- err = json.Unmarshal([]byte(data), &indexes)
- if err != nil {
- return nil, err
- }
- orchestratorStates := make([]OrchestratorState, 0)
- for _, orchestratorName := range indexes[sagaIndexes] {
- data, _, err := stateAPI.GetState(stateStoreName, orchestratorName, nil)
- if err != nil {
- return nil, err
- }
- orchestratorState := new(OrchestratorState)
- err = json.Unmarshal([]byte(data), orchestratorState)
- if err != nil {
- return nil, err
- }
- orchestratorStates = append(orchestratorStates, *orchestratorState)
- }
- return orchestratorStates, nil
- }
|