package dpsv1 import ( "git.sxidc.com/service-supports/dps-sdk/client" "git.sxidc.com/service-supports/dps-sdk/pb/v1" "git.sxidc.com/service-supports/dps-sdk/pb/v1/request" ) type Transaction struct { stream v1.CommandService_TransactionClient client *Client } func (tx *Transaction) InsertTx(req *client.InsertRequest) (string, error) { tx.client.transactionMutex.Lock() defer tx.client.transactionMutex.Unlock() if tx.client.conn == nil { return "", nil } var err error defer func() { if err != nil { innerErr := tx.stream.CloseSend() if innerErr != nil { panic(innerErr) } } }() reqTableRow, err := tx.client.formTableRow(req.TableRow) if err != nil { return "", err } err = tx.stream.Send(&request.TransactionOperation{ Request: &request.TransactionOperation_InsertTxRequest{ InsertTxRequest: &request.InsertTxRequest{ TablePrefixWithSchema: req.TablePrefixWithSchema, Version: req.Version, KeyColumns: req.KeyColumns, TableRow: reqTableRow, UserID: req.UserID, }, }}) if err != nil { return "", err } txResponse, err := tx.stream.Recv() if err != nil { return "", err } return txResponse.Statement, nil } func (tx *Transaction) InsertBatchTx(req *client.InsertBatchRequest) (string, error) { tx.client.transactionMutex.Lock() defer tx.client.transactionMutex.Unlock() if tx.client.conn == nil { return "", nil } var err error defer func() { if err != nil { innerErr := tx.stream.CloseSend() if innerErr != nil { panic(innerErr) } } }() tableRowItems := make([]*request.InsertTableRowItem, 0) for _, reqTableItem := range req.Items { tableRows := make([]*request.TableRow, 0) for _, reqTableRow := range reqTableItem.TableRows { tableRow, err := tx.client.formTableRow(reqTableRow) if err != nil { return "", err } tableRows = append(tableRows, tableRow) } tableRowItems = append(tableRowItems, &request.InsertTableRowItem{ TablePrefixWithSchema: reqTableItem.TablePrefixWithSchema, Version: reqTableItem.Version, KeyColumns: reqTableItem.KeyColumns, TableRows: tableRows, }) } err = tx.stream.Send(&request.TransactionOperation{ Request: &request.TransactionOperation_InsertBatchTxRequest{ InsertBatchTxRequest: &request.InsertBatchTxRequest{ Items: tableRowItems, UserID: req.UserID, }, }}) if err != nil { return "", err } txResponse, err := tx.stream.Recv() if err != nil { return "", err } return txResponse.Statement, nil } func (tx *Transaction) DeleteTx(req *client.DeleteRequest) (string, error) { tx.client.transactionMutex.Lock() defer tx.client.transactionMutex.Unlock() if tx.client.conn == nil { return "", nil } var err error defer func() { if err != nil { innerErr := tx.stream.CloseSend() if innerErr != nil { panic(innerErr) } } }() err = tx.stream.Send(&request.TransactionOperation{ Request: &request.TransactionOperation_DeleteTxRequest{ DeleteTxRequest: &request.DeleteTxRequest{ TablePrefixWithSchema: req.TablePrefixWithSchema, Version: req.Version, KeyValues: req.KeyValues, UserID: req.UserID, }, }}) if err != nil { return "", err } txResponse, err := tx.stream.Recv() if err != nil { return "", err } return txResponse.Statement, nil } func (tx *Transaction) DeleteBatchTx(req *client.DeleteBatchRequest) (string, error) { tx.client.transactionMutex.Lock() defer tx.client.transactionMutex.Unlock() if tx.client.conn == nil { return "", nil } var err error defer func() { if err != nil { innerErr := tx.stream.CloseSend() if innerErr != nil { panic(innerErr) } } }() tableRowItems := make([]*request.DeleteTableRowItem, 0) for _, reqTableItem := range req.Items { items := make([]*request.DeleteItem, 0) for _, reqKeyValues := range reqTableItem.KeyValues { items = append(items, &request.DeleteItem{ KeyValues: reqKeyValues, }) } tableRowItems = append(tableRowItems, &request.DeleteTableRowItem{ TablePrefixWithSchema: reqTableItem.TablePrefixWithSchema, Version: reqTableItem.Version, Items: items, }) } err = tx.stream.Send(&request.TransactionOperation{ Request: &request.TransactionOperation_DeleteBatchTxRequest{ DeleteBatchTxRequest: &request.DeleteBatchTxRequest{ Items: tableRowItems, UserID: req.UserID, }, }}) if err != nil { return "", err } txResponse, err := tx.stream.Recv() if err != nil { return "", err } return txResponse.Statement, nil } func (tx *Transaction) UpdateTx(req *client.UpdateRequest) (string, error) { tx.client.transactionMutex.Lock() defer tx.client.transactionMutex.Unlock() if tx.client.conn == nil { return "", nil } var err error defer func() { if err != nil { innerErr := tx.stream.CloseSend() if innerErr != nil { panic(innerErr) } } }() reqNewTableRowMap, err := tx.client.formTableRow(req.NewTableRow) if err != nil { return "", err } err = tx.stream.Send(&request.TransactionOperation{ Request: &request.TransactionOperation_UpdateTxRequest{ UpdateTxRequest: &request.UpdateTxRequest{ TablePrefixWithSchema: req.TablePrefixWithSchema, Version: req.Version, KeyValues: req.KeyValues, NewTableRow: reqNewTableRowMap, UserID: req.UserID, }, }}) if err != nil { return "", err } txResponse, err := tx.stream.Recv() if err != nil { return "", err } return txResponse.Statement, nil }