// Package saga persists saga orchestrator rollback state in a state store
// accessed through dapr_client.
package saga

import (
	"encoding/json"

	"git.sxidc.com/yjp/dapr_client"
	dapr "github.com/dapr/go-sdk/client"
)

const (
	// sagaIndexes is the JSON key under which a saga's index document lists
	// the names of its orchestrators.
	sagaIndexes = "indexes"
)

// OrchestratorState is the per-orchestrator record stored as JSON under the
// orchestrator's name.
type OrchestratorState struct {
	// Name is the orchestrator name; it is also used as the state store key.
	Name string
	// RollbackContextData is the caller-supplied rollback context, stored verbatim.
	RollbackContextData string
	// RollbackStartIndex is the caller-supplied index at which rollback starts.
	RollbackStartIndex int
}

// deleteOrchestratorState removes the state stored under orchestratorName and
// drops orchestratorName from the index document stored under sagaName.
//
// Both changes are submitted in a single state transaction. When no
// orchestrator remains in the index, the index document itself is deleted
// instead of being rewritten.
func deleteOrchestratorState(stateStoreName string, sagaName string, orchestratorName string) error {
	orchestratorNames, err := getSagaIndexState(stateStoreName, sagaName)
	if err != nil {
		return err
	}

	// Build a filtered copy rather than deleting in place while ranging; this
	// also removes every occurrence if the name was somehow indexed twice.
	remaining := make([]string, 0, len(orchestratorNames))
	for _, savedOrchestratorName := range orchestratorNames {
		if savedOrchestratorName != orchestratorName {
			remaining = append(remaining, savedOrchestratorName)
		}
	}

	deleteOrchestrator := &dapr.StateOperation{
		Type: dapr.StateOperationTypeDelete,
		Item: &dapr.SetStateItem{
			Key: orchestratorName,
		},
	}

	if len(remaining) == 0 {
		return dapr_client.GetInstance().ExecuteStateTransaction(stateStoreName, nil, []*dapr.StateOperation{
			{
				Type: dapr.StateOperationTypeDelete,
				Item: &dapr.SetStateItem{
					Key: sagaName,
				},
			},
			deleteOrchestrator,
		})
	}

	// The deleted orchestrator must NOT be written back into the index:
	// otherwise the index would reference a key removed by this transaction.
	sagaIndexStateJsonData, err := json.Marshal(map[string][]string{sagaIndexes: remaining})
	if err != nil {
		return err
	}

	return dapr_client.GetInstance().ExecuteStateTransaction(stateStoreName, nil, []*dapr.StateOperation{
		{
			Type: dapr.StateOperationTypeUpsert,
			Item: &dapr.SetStateItem{
				Key:   sagaName,
				Value: sagaIndexStateJsonData,
			},
		},
		deleteOrchestrator,
	})
}

// getSagaIndexState loads the index document stored under sagaName and returns
// the orchestrator names it lists.
//
// It returns an empty, non-nil slice when the saga has no stored index or the
// index lists nothing, and an error when the read or the JSON decoding fails.
func getSagaIndexState(stateStoreName string, sagaName string) ([]string, error) {
	stateItem, err := dapr_client.GetInstance().GetState(stateStoreName, sagaName, nil)
	if err != nil {
		return nil, err
	}

	// An empty value cannot be valid JSON, so treat it like a missing index.
	if len(stateItem.Value) == 0 {
		return make([]string, 0), nil
	}

	// Decode into a typed map and read the entry back out; decoding into a
	// throwaway map literal would leave the result unreachable.
	indexes := make(map[string][]string)
	if err := json.Unmarshal(stateItem.Value, &indexes); err != nil {
		return nil, err
	}

	orchestratorNames := indexes[sagaIndexes]
	if orchestratorNames == nil {
		orchestratorNames = make([]string, 0)
	}

	return orchestratorNames, nil
}

// saveOrchestratorState stores the rollback state of orchestratorName and makes
// sure orchestratorName is listed in the index document stored under sagaName.
//
// If the orchestrator is already indexed only its own state is saved;
// otherwise the updated index and the state are upserted in a single state
// transaction.
func saveOrchestratorState(stateStoreName string, sagaName string, orchestratorName string,
	rollbackContextData string, rollbackStartIndex int) error {
	orchestratorStateJsonData, err := json.Marshal(&OrchestratorState{
		Name:                orchestratorName,
		RollbackContextData: rollbackContextData,
		RollbackStartIndex:  rollbackStartIndex,
	})
	if err != nil {
		return err
	}

	orchestratorNames, err := getSagaIndexState(stateStoreName, sagaName)
	if err != nil {
		return err
	}

	find := false
	for _, savedOrchestratorName := range orchestratorNames {
		if savedOrchestratorName == orchestratorName {
			find = true
			break
		}
	}

	if find {
		// Already indexed: the index document does not need to change.
		err := dapr_client.GetInstance().SaveState(stateStoreName, orchestratorName, orchestratorStateJsonData, nil)
		if err != nil {
			return err
		}

		return nil
	}

	orchestratorNames = append(orchestratorNames, orchestratorName)

	sagaIndexStateJsonData, err := json.Marshal(map[string][]string{sagaIndexes: orchestratorNames})
	if err != nil {
		return err
	}

	return dapr_client.GetInstance().ExecuteStateTransaction(stateStoreName, nil, []*dapr.StateOperation{
		{
			Type: dapr.StateOperationTypeUpsert,
			Item: &dapr.SetStateItem{
				Key:   sagaName,
				Value: sagaIndexStateJsonData,
			},
		},
		{
			Type: dapr.StateOperationTypeUpsert,
			Item: &dapr.SetStateItem{
				Key:   orchestratorName,
				Value: orchestratorStateJsonData,
			},
		},
	})
}

// getSagaOrchestratorStates returns the stored state of every orchestrator
// listed in the index document stored under sagaName, in index order.
//
// It returns an empty, non-nil slice when the saga has no index. Any read or
// decode failure (including an indexed orchestrator whose state cannot be
// decoded) aborts the call and is returned as the error.
func getSagaOrchestratorStates(stateStoreName string, sagaName string) ([]OrchestratorState, error) {
	orchestratorNames, err := getSagaIndexState(stateStoreName, sagaName)
	if err != nil {
		return nil, err
	}

	orchestratorStates := make([]OrchestratorState, 0, len(orchestratorNames))
	for _, orchestratorName := range orchestratorNames {
		stateItem, err := dapr_client.GetInstance().GetState(stateStoreName, orchestratorName, nil)
		if err != nil {
			return nil, err
		}

		var orchestratorState OrchestratorState
		if err := json.Unmarshal(stateItem.Value, &orchestratorState); err != nil {
			return nil, err
		}

		orchestratorStates = append(orchestratorStates, orchestratorState)
	}

	return orchestratorStates, nil
}