package saga

import (
	"encoding/json"

	"git.sxidc.com/service-supports/dapr_api/state"
	"git.sxidc.com/service-supports/dapr_api/utils"
)

const (
	// sagaIndexes is the JSON key under which a saga's list of orchestrator
	// names is stored in the saga index document.
	sagaIndexes = "indexes"
)

// OrchestratorState is the per-orchestrator record persisted in the state
// store. It is stored as JSON under the orchestrator's name.
type OrchestratorState struct {
	Name                string
	RollbackContextData string
	RollbackStartIndex  int
}

// deleteOrchestratorState removes orchestratorName from the index stored under
// sagaName and deletes the orchestrator's own state entry, both in a single
// state-store transaction.
//
// When no orchestrator remains in the index, the index entry itself is deleted
// as well; otherwise the shrunken index is written back.
func deleteOrchestratorState(stateAPI *state.API, stateStoreName string, sagaName string, orchestratorName string) error {
	orchestratorNames, err := getSagaIndexState(stateAPI, stateStoreName, sagaName)
	if err != nil {
		return err
	}

	// Filter into a fresh slice rather than deleting in place: mutating the
	// slice being ranged over shifts elements under the iterator.
	remainingNames := make([]string, 0, len(orchestratorNames))
	for _, savedOrchestratorName := range orchestratorNames {
		if savedOrchestratorName != orchestratorName {
			remainingNames = append(remainingNames, savedOrchestratorName)
		}
	}

	if len(remainingNames) == 0 {
		return stateAPI.Transaction(stateStoreName, state.TransactionRequest{
			Operations: []state.TransactionOperation{
				{
					Operation: state.TransactionDelete,
					Request: state.TransactionOperationRequest{
						Key: sagaName,
					},
				},
				{
					Operation: state.TransactionDelete,
					Request: state.TransactionOperationRequest{
						Key: orchestratorName,
					},
				},
			},
		})
	}

	// The deleted name must NOT be appended back here; the index written
	// below contains only the orchestrators that still exist.
	sagaIndexStateJsonData, err := json.Marshal(map[string][]string{sagaIndexes: remainingNames})
	if err != nil {
		return err
	}

	return stateAPI.Transaction(stateStoreName, state.TransactionRequest{
		Operations: []state.TransactionOperation{
			{
				Operation: state.TransactionUpsert,
				Request: state.TransactionOperationRequest{
					Key:   sagaName,
					Value: string(sagaIndexStateJsonData),
				},
			},
			{
				Operation: state.TransactionDelete,
				Request: state.TransactionOperationRequest{
					Key: orchestratorName,
				},
			},
		},
	})
}

// getSagaIndexState loads the list of orchestrator names recorded in the index
// stored under sagaName.
//
// It returns a non-nil empty slice when utils.HasBlank reports the stored data
// as blank (presumably: nothing stored yet — confirm against utils) or when the
// index document has no "indexes" entry.
func getSagaIndexState(stateAPI *state.API, stateStoreName string, sagaName string) ([]string, error) {
	data, _, err := stateAPI.GetState(stateStoreName, sagaName, nil)
	if err != nil {
		return nil, err
	}

	if utils.HasBlank(data) {
		return make([]string, 0), nil
	}

	// Decode into a concretely typed map. Unmarshalling into a
	// map[string]interface{} seeded with a slice does not fill that slice:
	// the decoder stores a brand-new []interface{} in the map instead.
	indexes := make(map[string][]string)
	if err := json.Unmarshal([]byte(data), &indexes); err != nil {
		return nil, err
	}

	orchestratorNames := indexes[sagaIndexes]
	if orchestratorNames == nil {
		return make([]string, 0), nil
	}

	return orchestratorNames, nil
}

// saveOrchestratorState persists the rollback state of orchestratorName.
//
// If the orchestrator is already listed in the index stored under sagaName,
// only its own state entry is overwritten. Otherwise the name is appended to
// the index and both the index and the state entry are upserted in a single
// state-store transaction.
func saveOrchestratorState(stateAPI *state.API, stateStoreName string, sagaName string, orchestratorName string,
	rollbackContextData string, rollbackStartIndex int) error {
	orchestratorStateJsonData, err := json.Marshal(&OrchestratorState{
		Name:                orchestratorName,
		RollbackContextData: rollbackContextData,
		RollbackStartIndex:  rollbackStartIndex,
	})
	if err != nil {
		return err
	}

	orchestratorNames, err := getSagaIndexState(stateAPI, stateStoreName, sagaName)
	if err != nil {
		return err
	}

	find := false
	for _, savedOrchestratorName := range orchestratorNames {
		if savedOrchestratorName == orchestratorName {
			find = true
			break
		}
	}

	if find {
		// Already indexed: the index is unchanged, so a plain save suffices.
		return stateAPI.SaveState(stateStoreName, []state.SaveStateRequest{
			{
				Key:   orchestratorName,
				Value: string(orchestratorStateJsonData),
			},
		})
	}

	orchestratorNames = append(orchestratorNames, orchestratorName)

	sagaIndexStateJsonData, err := json.Marshal(map[string][]string{sagaIndexes: orchestratorNames})
	if err != nil {
		return err
	}

	return stateAPI.Transaction(stateStoreName, state.TransactionRequest{
		Operations: []state.TransactionOperation{
			{
				Operation: state.TransactionUpsert,
				Request: state.TransactionOperationRequest{
					Key:   sagaName,
					Value: string(sagaIndexStateJsonData),
				},
			},
			{
				Operation: state.TransactionUpsert,
				Request: state.TransactionOperationRequest{
					Key:   orchestratorName,
					Value: string(orchestratorStateJsonData),
				},
			},
		},
	})
}

// getSagaOrchestratorStates loads the state of every orchestrator listed in
// the index stored under sagaName, in index order.
//
// It returns a non-nil empty slice when the saga has no index. An error is
// returned if any state-store read fails or any stored document is not valid
// JSON for OrchestratorState.
func getSagaOrchestratorStates(stateAPI *state.API, stateStoreName string, sagaName string) ([]OrchestratorState, error) {
	orchestratorNames, err := getSagaIndexState(stateAPI, stateStoreName, sagaName)
	if err != nil {
		return nil, err
	}

	orchestratorStates := make([]OrchestratorState, 0, len(orchestratorNames))
	for _, orchestratorName := range orchestratorNames {
		data, _, err := stateAPI.GetState(stateStoreName, orchestratorName, nil)
		if err != nil {
			return nil, err
		}

		var orchestratorState OrchestratorState
		if err := json.Unmarshal([]byte(data), &orchestratorState); err != nil {
			return nil, err
		}

		orchestratorStates = append(orchestratorStates, orchestratorState)
	}

	return orchestratorStates, nil
}