package dpsv1 import ( "git.sxidc.com/service-supports/dps-sdk/client" "git.sxidc.com/service-supports/dps-sdk/pb/v1" "git.sxidc.com/service-supports/dps-sdk/pb/v1/request" "io" ) type Transaction struct { stream v1.CommandService_TransactionClient client *Client } func (tx *Transaction) InsertTx(databaseID string, req *client.InsertRequest) (string, error) { var err error defer func() { if err != nil { innerErr := tx.stream.CloseSend() if innerErr != nil { panic(innerErr) } } }() reqTableRow, err := tx.client.formTableRow(req.TableRow) if err != nil { return "", err } err = tx.stream.Send(&request.TransactionOperation{ Request: &request.TransactionOperation_InsertRequest{ InsertRequest: &request.InsertRequest{ DatabaseID: databaseID, TablePrefixWithSchema: req.TablePrefixWithSchema, Version: req.Version, KeyColumns: req.KeyColumns, TableRow: reqTableRow, UserID: req.UserID, }, }}) if err != nil { return "", err } txResponse, err := tx.stream.Recv() if err != nil { return "", err } return txResponse.Statement, nil } func (tx *Transaction) InsertBatchTx(databaseID string, req *client.InsertBatchRequest) (string, error) { var err error defer func() { if err != nil { innerErr := tx.stream.CloseSend() if innerErr != nil { panic(innerErr) } } }() tableItems := make([]*request.InsertTableItem, 0) for _, reqTableItem := range req.Items { items := make([]*request.InsertItem, 0) for _, reqItem := range reqTableItem.Items { tableRow, err := tx.client.formTableRow(reqItem.TableRow) if err != nil { return "", err } items = append(items, &request.InsertItem{ KeyColumns: reqItem.KeyColumns, TableRow: tableRow, }) } tableItems = append(tableItems, &request.InsertTableItem{ TablePrefixWithSchema: reqTableItem.TablePrefixWithSchema, Version: reqTableItem.Version, Items: items, }) } err = tx.stream.Send(&request.TransactionOperation{ Request: &request.TransactionOperation_InsertBatchRequest{ InsertBatchRequest: &request.InsertBatchRequest{ DatabaseID: databaseID, Items: tableItems, UserID: req.UserID, }, }}) if err != nil { return "", err } txResponse, err := tx.stream.Recv() if err != nil { return "", err } return txResponse.Statement, nil } func (tx *Transaction) DeleteTx(databaseID string, req *client.DeleteRequest) (string, error) { var err error defer func() { if err != nil { innerErr := tx.stream.CloseSend() if innerErr != nil { panic(innerErr) } } }() err = tx.stream.Send(&request.TransactionOperation{ Request: &request.TransactionOperation_DeleteRequest{ DeleteRequest: &request.DeleteRequest{ DatabaseID: databaseID, TablePrefixWithSchema: req.TablePrefixWithSchema, Version: req.Version, KeyValues: req.KeyValues, UserID: req.UserID, }, }}) if err != nil { return "", err } txResponse, err := tx.stream.Recv() if err != nil { return "", err } return txResponse.Statement, nil } func (tx *Transaction) DeleteBatchTx(databaseID string, req *client.DeleteBatchRequest) (string, error) { var err error defer func() { if err != nil { innerErr := tx.stream.CloseSend() if innerErr != nil { panic(innerErr) } } }() tableItems := make([]*request.DeleteTableItem, 0) for _, reqTableItem := range req.Items { items := make([]*request.DeleteItem, 0) for _, reqItem := range reqTableItem.Items { items = append(items, &request.DeleteItem{ KeyValues: reqItem.KeyValues, }) } tableItems = append(tableItems, &request.DeleteTableItem{ TablePrefixWithSchema: reqTableItem.TablePrefixWithSchema, Version: reqTableItem.Version, Items: items, }) } err = tx.stream.Send(&request.TransactionOperation{ Request: &request.TransactionOperation_DeleteBatchRequest{ DeleteBatchRequest: &request.DeleteBatchRequest{ DatabaseID: databaseID, Items: tableItems, UserID: req.UserID, }, }}) if err != nil { return "", err } txResponse, err := tx.stream.Recv() if err != nil { return "", err } return txResponse.Statement, nil } func (tx *Transaction) UpdateTx(databaseID string, req *client.UpdateRequest) (string, error) { var err error defer func() { if err != nil { innerErr := tx.stream.CloseSend() if innerErr != nil { panic(innerErr) } } }() reqNewTableRowMap, err := tx.client.formTableRow(req.NewTableRow) if err != nil { return "", err } err = tx.stream.Send(&request.TransactionOperation{ Request: &request.TransactionOperation_UpdateRequest{ UpdateRequest: &request.UpdateRequest{ DatabaseID: databaseID, TablePrefixWithSchema: req.TablePrefixWithSchema, Version: req.Version, KeyValues: req.KeyValues, NewTableRow: reqNewTableRowMap, UserID: req.UserID, }, }}) if err != nil { return "", err } txResponse, err := tx.stream.Recv() if err != nil { return "", err } return txResponse.Statement, nil } func (tx *Transaction) End() error { var err error defer func() { innerErr := tx.stream.CloseSend() if innerErr != nil { panic(innerErr) } }() err = tx.stream.Send(&request.TransactionOperation{ Request: &request.TransactionOperation_TransactionEndRequest{ TransactionEndRequest: &request.TransactionEndRequest{}, }}) if err != nil { return err } _, err = tx.stream.Recv() if err != nil && err != io.EOF { return err } return nil }