package dpsv1

import (
	"errors"

	"git.sxidc.com/service-supports/dps-sdk/client"
	"git.sxidc.com/service-supports/dps-sdk/pb/v1"
	"git.sxidc.com/service-supports/dps-sdk/pb/v1/request"
)

// Transaction issues data-modifying operations over a single transaction
// stream. Every operation takes the owning client's transactionMutex for its
// whole duration, so operations sharing that client run one at a time.
type Transaction struct {
	stream v1.CommandService_TransactionClient
	client *Client
}

// exchangeStatement sends op on the transaction stream, waits for the next
// response and returns its Statement field. It does no locking; callers must
// already hold the client's transactionMutex.
func (tx *Transaction) exchangeStatement(op *request.TransactionOperation) (string, error) {
	if err := tx.stream.Send(op); err != nil {
		return "", err
	}

	resp, err := tx.stream.Recv()
	if err != nil {
		return "", err
	}

	return resp.Statement, nil
}

// InsertTx sends an insert operation for req.TableRow and returns the
// Statement reported by the server.
//
// It returns ("", nil) without contacting the server when req.TableRow is nil
// or the client has no connection. If converting the row, sending, or
// receiving fails, the send direction of the stream is closed and that error
// is returned.
//
// NOTE(review): InsertTx is the only operation in this file that closes the
// send side on failure; confirm whether the other operations should as well.
func (tx *Transaction) InsertTx(req *client.InsertRequest) (statement string, err error) {
	if req.TableRow == nil {
		return "", nil
	}

	tx.client.transactionMutex.Lock()
	defer tx.client.transactionMutex.Unlock()

	if tx.client.conn == nil {
		return "", nil
	}

	defer func() {
		if err != nil {
			// Best-effort cleanup: the error already being returned is the
			// actionable one, so a secondary CloseSend failure is dropped
			// rather than escalated to a panic inside library code.
			_ = tx.stream.CloseSend()
		}
	}()

	row, err := req.TableRow.ToDPSTableRow()
	if err != nil {
		return "", err
	}

	return tx.exchangeStatement(&request.TransactionOperation{
		Request: &request.TransactionOperation_InsertTxRequest{
			InsertTxRequest: &request.InsertTxRequest{
				TablePrefixWithSchema: req.TablePrefixWithSchema,
				Version:               req.Version,
				KeyColumns:            req.KeyColumns,
				TableRow:              row,
				UserID:                req.UserID,
			},
		},
	})
}

// InsertBatchTx sends one batch-insert operation covering every item in
// req.Items and returns the Statement reported by the server.
//
// Nil rows inside an item are skipped. It returns ("", nil) without
// contacting the server when the client has no connection. A row conversion
// failure aborts the call before anything is sent.
func (tx *Transaction) InsertBatchTx(req *client.InsertBatchRequest) (string, error) {
	tx.client.transactionMutex.Lock()
	defer tx.client.transactionMutex.Unlock()

	if tx.client.conn == nil {
		return "", nil
	}

	items := make([]*request.InsertTableRowItem, 0, len(req.Items))
	for _, item := range req.Items {
		rows := make([]*request.TableRow, 0, len(item.TableRows))
		for _, srcRow := range item.TableRows {
			if srcRow == nil {
				continue
			}

			row, err := srcRow.ToDPSTableRow()
			if err != nil {
				return "", err
			}

			rows = append(rows, row)
		}

		items = append(items, &request.InsertTableRowItem{
			TablePrefixWithSchema: item.TablePrefixWithSchema,
			Version:               item.Version,
			KeyColumns:            item.KeyColumns,
			TableRows:             rows,
		})
	}

	return tx.exchangeStatement(&request.TransactionOperation{
		Request: &request.TransactionOperation_InsertBatchTxRequest{
			InsertBatchTxRequest: &request.InsertBatchTxRequest{
				Items:  items,
				UserID: req.UserID,
			},
		},
	})
}

// DeleteTx sends a delete operation addressed by the columns and values of
// req.KeyValues and returns the Statement reported by the server.
//
// It returns ("", nil) without contacting the server when the client has no
// connection.
func (tx *Transaction) DeleteTx(req *client.DeleteRequest) (string, error) {
	tx.client.transactionMutex.Lock()
	defer tx.client.transactionMutex.Unlock()

	if tx.client.conn == nil {
		return "", nil
	}

	return tx.exchangeStatement(&request.TransactionOperation{
		Request: &request.TransactionOperation_DeleteTxRequest{
			DeleteTxRequest: &request.DeleteTxRequest{
				TablePrefixWithSchema: req.TablePrefixWithSchema,
				Version:               req.Version,
				KeyColumns:            req.KeyValues.Columns(),
				KeyValues:             req.KeyValues.Values(),
				UserID:                req.UserID,
			},
		},
	})
}

// DeleteWhereTx sends a delete operation whose target rows are described by
// req.Where (serialized with ToJson) and returns the Statement reported by
// the server.
//
// A nil req.Where is rejected with an error ("no Where condition was
// supplied"). It returns ("", nil) without contacting the server when the
// client has no connection.
func (tx *Transaction) DeleteWhereTx(req *client.DeleteWhereRequest) (string, error) {
	if req.Where == nil {
		return "", errors.New("没有传递Where条件")
	}

	tx.client.transactionMutex.Lock()
	defer tx.client.transactionMutex.Unlock()

	if tx.client.conn == nil {
		return "", nil
	}

	whereJSON, err := req.Where.ToJson()
	if err != nil {
		return "", err
	}

	return tx.exchangeStatement(&request.TransactionOperation{
		Request: &request.TransactionOperation_DeleteWhereTxRequest{
			DeleteWhereTxRequest: &request.DeleteWhereTxRequest{
				TablePrefixWithSchema: req.TablePrefixWithSchema,
				Version:               req.Version,
				KeyColumns:            req.KeyColumns,
				Where:                 whereJSON,
				UserID:                req.UserID,
			},
		},
	})
}

// UpdateTx sends an update operation that replaces the row addressed by
// req.KeyValues with req.NewTableRow and returns the Statement reported by
// the server.
//
// It returns ("", nil) without contacting the server when req.NewTableRow is
// nil or the client has no connection.
func (tx *Transaction) UpdateTx(req *client.UpdateRequest) (string, error) {
	if req.NewTableRow == nil {
		return "", nil
	}

	tx.client.transactionMutex.Lock()
	defer tx.client.transactionMutex.Unlock()

	if tx.client.conn == nil {
		return "", nil
	}

	newRow, err := req.NewTableRow.ToDPSTableRow()
	if err != nil {
		return "", err
	}

	return tx.exchangeStatement(&request.TransactionOperation{
		Request: &request.TransactionOperation_UpdateTxRequest{
			UpdateTxRequest: &request.UpdateTxRequest{
				TablePrefixWithSchema: req.TablePrefixWithSchema,
				Version:               req.Version,
				KeyColumns:            req.KeyValues.Columns(),
				KeyValues:             req.KeyValues.Values(),
				NewTableRow:           newRow,
				UserID:                req.UserID,
			},
		},
	})
}

// UpdateWhereTx sends an update operation that applies req.NewTableRow to the
// rows described by req.Where (serialized with ToJson) and returns the
// Statement reported by the server.
//
// A nil req.Where is rejected with an error ("no Where condition was
// supplied"); that check runs before the nil-row check. It returns ("", nil)
// without contacting the server when req.NewTableRow is nil or the client has
// no connection.
func (tx *Transaction) UpdateWhereTx(req *client.UpdateWhereRequest) (string, error) {
	if req.Where == nil {
		return "", errors.New("没有传递Where条件")
	}

	if req.NewTableRow == nil {
		return "", nil
	}

	tx.client.transactionMutex.Lock()
	defer tx.client.transactionMutex.Unlock()

	if tx.client.conn == nil {
		return "", nil
	}

	whereJSON, err := req.Where.ToJson()
	if err != nil {
		return "", err
	}

	newRow, err := req.NewTableRow.ToDPSTableRow()
	if err != nil {
		return "", err
	}

	return tx.exchangeStatement(&request.TransactionOperation{
		Request: &request.TransactionOperation_UpdateWhereTxRequest{
			UpdateWhereTxRequest: &request.UpdateWhereTxRequest{
				TablePrefixWithSchema: req.TablePrefixWithSchema,
				Version:               req.Version,
				KeyColumns:            req.KeyColumns,
				Where:                 whereJSON,
				NewTableRow:           newRow,
				UserID:                req.UserID,
			},
		},
	})
}