123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186 |
- package saga
- import (
- "encoding/json"
- "git.sxidc.com/yjp/dapr_client"
- dapr "github.com/dapr/go-sdk/client"
- )
- const (
- sagaIndexes = "indexes"
- )
- type OrchestratorState struct {
- Name string
- RollbackContextData string
- RollbackStartIndex int
- }
- func deleteOrchestratorState(stateStoreName string, sagaName string, orchestratorName string) error {
- orchestratorNames, err := getSagaIndexState(stateStoreName, sagaName)
- if err != nil {
- return err
- }
- for i, savedOrchestratorName := range orchestratorNames {
- if savedOrchestratorName == orchestratorName {
- if i == len(orchestratorNames)-1 {
- orchestratorNames = orchestratorNames[:i]
- } else {
- orchestratorNames = append(orchestratorNames[0:i], orchestratorNames[i+1:]...)
- }
- }
- }
- if len(orchestratorNames) == 0 {
- return dapr_client.GetInstance().ExecuteStateTransaction(stateStoreName, nil,
- []*dapr.StateOperation{
- {
- Type: dapr.StateOperationTypeDelete,
- Item: &dapr.SetStateItem{
- Key: sagaName,
- },
- },
- {
- Type: dapr.StateOperationTypeDelete,
- Item: &dapr.SetStateItem{
- Key: orchestratorName,
- },
- },
- })
- }
- orchestratorNames = append(orchestratorNames, orchestratorName)
- sagaIndexStateJsonData, err := json.Marshal(&map[string]interface{}{sagaIndexes: orchestratorNames})
- if err != nil {
- return err
- }
- return dapr_client.GetInstance().ExecuteStateTransaction(stateStoreName, nil,
- []*dapr.StateOperation{
- {
- Type: dapr.StateOperationTypeUpsert,
- Item: &dapr.SetStateItem{
- Key: sagaName,
- Value: sagaIndexStateJsonData,
- },
- },
- {
- Type: dapr.StateOperationTypeDelete,
- Item: &dapr.SetStateItem{
- Key: orchestratorName,
- },
- },
- })
- }
- func getSagaIndexState(stateStoreName string, sagaName string) ([]string, error) {
- stateItem, err := dapr_client.GetInstance().GetState(stateStoreName, sagaName, nil)
- if err != nil {
- return nil, err
- }
- if stateItem.Value == nil {
- return make([]string, 0), nil
- }
- orchestratorNames := make([]string, 0)
- err = json.Unmarshal(stateItem.Value, &map[string]interface{}{sagaIndexes: orchestratorNames})
- if err != nil {
- return nil, err
- }
- return orchestratorNames, nil
- }
- func saveOrchestratorState(stateStoreName string, sagaName string, orchestratorName string,
- rollbackContextData string, rollbackStartIndex int) error {
- orchestratorStateJsonData, err := json.Marshal(&OrchestratorState{
- Name: orchestratorName,
- RollbackContextData: rollbackContextData,
- RollbackStartIndex: rollbackStartIndex,
- })
- if err != nil {
- return err
- }
- orchestratorNames, err := getSagaIndexState(stateStoreName, sagaName)
- if err != nil {
- return err
- }
- find := false
- for _, savedOrchestratorName := range orchestratorNames {
- if savedOrchestratorName == orchestratorName {
- find = true
- }
- }
- if find {
- err := dapr_client.GetInstance().SaveState(stateStoreName, orchestratorName, orchestratorStateJsonData, nil)
- if err != nil {
- return err
- }
- return nil
- }
- orchestratorNames = append(orchestratorNames, orchestratorName)
- sagaIndexStateJsonData, err := json.Marshal(&map[string]interface{}{sagaIndexes: orchestratorNames})
- if err != nil {
- return err
- }
- return dapr_client.GetInstance().ExecuteStateTransaction(stateStoreName, nil,
- []*dapr.StateOperation{
- {
- Type: dapr.StateOperationTypeUpsert,
- Item: &dapr.SetStateItem{
- Key: sagaName,
- Value: sagaIndexStateJsonData,
- },
- },
- {
- Type: dapr.StateOperationTypeUpsert,
- Item: &dapr.SetStateItem{
- Key: orchestratorName,
- Value: orchestratorStateJsonData,
- },
- },
- })
- }
- func getSagaOrchestratorStates(stateStoreName string, sagaName string) ([]OrchestratorState, error) {
- stateItem, err := dapr_client.GetInstance().GetState(stateStoreName, sagaName, nil)
- if err != nil {
- return nil, err
- }
- if stateItem.Value == nil {
- return make([]OrchestratorState, 0), nil
- }
- indexes := make(map[string][]string)
- err = json.Unmarshal(stateItem.Value, &indexes)
- if err != nil {
- return nil, err
- }
- orchestratorStates := make([]OrchestratorState, 0)
- for _, orchestratorName := range indexes[sagaIndexes] {
- stateItem, err := dapr_client.GetInstance().GetState(stateStoreName, orchestratorName, nil)
- if err != nil {
- return nil, err
- }
- orchestratorState := new(OrchestratorState)
- err = json.Unmarshal(stateItem.Value, orchestratorState)
- if err != nil {
- return nil, err
- }
- orchestratorStates = append(orchestratorStates, *orchestratorState)
- }
- return orchestratorStates, nil
- }
|