Selaa lähdekoodia

完成事务接口开发

yjp 1 vuosi sitten
vanhempi
commit
75e6735388
3 muutettua tiedostoa jossa 294 lisäystä ja 0 poistoa
  1. 27 0
      dpsv1/client.go
  2. 257 0
      dpsv1/transaction.go
  3. 10 0
      ports/transaction.go

+ 27 - 0
dpsv1/client.go

@@ -71,6 +71,33 @@ func (c *Client) AutoMigrate(req *ports.AutoMigrateRequest) error {
 	return nil
 }
 
+type TransactionFunc func(tx ports.Transaction) error
+
+func (c *Client) Transaction(databaseID string, txFunc TransactionFunc) error {
+	stream, err := c.commandServiceClient.Transaction(context.Background())
+	if err != nil {
+		return err
+	}
+
+	err = stream.Send(&request.TransactionOperation{
+		Request: &request.TransactionOperation_TransactionBeginRequest{
+			TransactionBeginRequest: &request.TransactionBeginRequest{DatabaseID: databaseID},
+		}})
+	if err != nil {
+		return err
+	}
+
+	err = txFunc(&Transaction{
+		stream: stream,
+		client: c,
+	})
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
 func (c *Client) Insert(req *ports.InsertRequest) (string, error) {
 	tableRowJsonBytes, err := c.formTableRow(req.TableRow)
 	if err != nil {

+ 257 - 0
dpsv1/transaction.go

@@ -0,0 +1,257 @@
+package dpsv1
+
+import (
+	"git.sxidc.com/service-supports/dps-sdk/pb/v1"
+	"git.sxidc.com/service-supports/dps-sdk/pb/v1/request"
+	"git.sxidc.com/service-supports/dps-sdk/ports"
+	"io"
+)
+
+type Transaction struct {
+	stream v1.CommandService_TransactionClient
+	client *Client
+}
+
+func (tx *Transaction) InsertTx(req *ports.InsertRequest) (string, error) {
+	var err error
+
+	defer func() {
+		if err != nil {
+			innerErr := tx.stream.CloseSend()
+			if innerErr != nil {
+				panic(innerErr)
+			}
+		}
+	}()
+
+	reqTableRowMap, err := tx.client.formTableRow(req.TableRow)
+	if err != nil {
+		return "", err
+	}
+
+	err = tx.stream.Send(&request.TransactionOperation{
+		Request: &request.TransactionOperation_InsertRequest{
+			InsertRequest: &request.InsertRequest{
+				DatabaseID:            req.DatabaseID,
+				TablePrefixWithSchema: req.TablePrefixWithSchema,
+				Version:               req.Version,
+				KeyColumns:            req.KeyColumns,
+				TableRow:              reqTableRowMap,
+				UserID:                req.UserID,
+			},
+		}})
+	if err != nil {
+		return "", err
+	}
+
+	txResponse, err := tx.stream.Recv()
+	if err != nil {
+		return "", err
+	}
+
+	return txResponse.Statement, nil
+}
+
+func (tx *Transaction) InsertBatchTx(req *ports.InsertBatchRequest) (string, error) {
+	var err error
+
+	defer func() {
+		if err != nil {
+			innerErr := tx.stream.CloseSend()
+			if innerErr != nil {
+				panic(innerErr)
+			}
+		}
+	}()
+
+	tableItems := make([]*request.InsertTableItem, 0)
+
+	for _, reqTableItem := range req.Items {
+		items := make([]*request.InsertItem, 0)
+		for _, reqItem := range reqTableItem.Items {
+			tableRow, err := tx.client.formTableRow(reqItem.TableRow)
+			if err != nil {
+				return "", err
+			}
+
+			items = append(items, &request.InsertItem{
+				KeyColumns: reqItem.KeyColumns,
+				TableRow:   tableRow,
+			})
+		}
+
+		tableItems = append(tableItems, &request.InsertTableItem{
+			TablePrefixWithSchema: reqTableItem.TablePrefixWithSchema,
+			Version:               reqTableItem.Version,
+			Items:                 items,
+		})
+	}
+
+	err = tx.stream.Send(&request.TransactionOperation{
+		Request: &request.TransactionOperation_InsertBatchRequest{
+			InsertBatchRequest: &request.InsertBatchRequest{
+				DatabaseID: req.DatabaseID,
+				Items:      tableItems,
+				UserID:     req.UserID,
+			},
+		}})
+	if err != nil {
+		return "", err
+	}
+
+	txResponse, err := tx.stream.Recv()
+	if err != nil {
+		return "", err
+	}
+
+	return txResponse.Statement, nil
+}
+
+func (tx *Transaction) DeleteTx(req *ports.DeleteRequest) (string, error) {
+	var err error
+
+	defer func() {
+		if err != nil {
+			innerErr := tx.stream.CloseSend()
+			if innerErr != nil {
+				panic(innerErr)
+			}
+		}
+	}()
+
+	err = tx.stream.Send(&request.TransactionOperation{
+		Request: &request.TransactionOperation_DeleteRequest{
+			DeleteRequest: &request.DeleteRequest{
+				DatabaseID:            req.DatabaseID,
+				TablePrefixWithSchema: req.TablePrefixWithSchema,
+				Version:               req.Version,
+				KeyValues:             req.KeyValues,
+				UserID:                req.UserID,
+			},
+		}})
+	if err != nil {
+		return "", err
+	}
+
+	txResponse, err := tx.stream.Recv()
+	if err != nil {
+		return "", err
+	}
+
+	return txResponse.Statement, nil
+}
+
+func (tx *Transaction) DeleteBatchTx(req *ports.DeleteBatchRequest) (string, error) {
+	var err error
+
+	defer func() {
+		if err != nil {
+			innerErr := tx.stream.CloseSend()
+			if innerErr != nil {
+				panic(innerErr)
+			}
+		}
+	}()
+
+	tableItems := make([]*request.DeleteTableItem, 0)
+
+	for _, reqTableItem := range req.Items {
+		items := make([]*request.DeleteItem, 0)
+		for _, reqItem := range reqTableItem.Items {
+			items = append(items, &request.DeleteItem{
+				KeyValues: reqItem.KeyValues,
+			})
+		}
+
+		tableItems = append(tableItems, &request.DeleteTableItem{
+			TablePrefixWithSchema: reqTableItem.TablePrefixWithSchema,
+			Version:               reqTableItem.Version,
+			Items:                 items,
+		})
+	}
+
+	err = tx.stream.Send(&request.TransactionOperation{
+		Request: &request.TransactionOperation_DeleteBatchRequest{
+			DeleteBatchRequest: &request.DeleteBatchRequest{
+				DatabaseID: req.DatabaseID,
+				Items:      tableItems,
+				UserID:     req.UserID,
+			},
+		}})
+	if err != nil {
+		return "", err
+	}
+
+	txResponse, err := tx.stream.Recv()
+	if err != nil {
+		return "", err
+	}
+
+	return txResponse.Statement, nil
+}
+
+func (tx *Transaction) UpdateTx(req *ports.UpdateRequest) (string, error) {
+	var err error
+
+	defer func() {
+		if err != nil {
+			innerErr := tx.stream.CloseSend()
+			if innerErr != nil {
+				panic(innerErr)
+			}
+		}
+	}()
+
+	reqNewTableRowMap, err := tx.client.formTableRow(req.NewTableRow)
+	if err != nil {
+		return "", err
+	}
+
+	err = tx.stream.Send(&request.TransactionOperation{
+		Request: &request.TransactionOperation_UpdateRequest{
+			UpdateRequest: &request.UpdateRequest{
+				DatabaseID:            req.DatabaseID,
+				TablePrefixWithSchema: req.TablePrefixWithSchema,
+				Version:               req.Version,
+				KeyValues:             req.KeyValues,
+				NewTableRow:           reqNewTableRowMap,
+				UserID:                req.UserID,
+			},
+		}})
+	if err != nil {
+		return "", err
+	}
+
+	txResponse, err := tx.stream.Recv()
+	if err != nil {
+		return "", err
+	}
+
+	return txResponse.Statement, nil
+}
+
+func (tx *Transaction) End() error {
+	var err error
+
+	defer func() {
+		innerErr := tx.stream.CloseSend()
+		if innerErr != nil {
+			panic(innerErr)
+		}
+	}()
+
+	err = tx.stream.Send(&request.TransactionOperation{
+		Request: &request.TransactionOperation_TransactionEndRequest{
+			TransactionEndRequest: &request.TransactionEndRequest{},
+		}})
+	if err != nil {
+		return err
+	}
+
+	_, err = tx.stream.Recv()
+	if err != nil && err != io.EOF {
+		return err
+	}
+
+	return nil
+}

+ 10 - 0
ports/transaction.go

@@ -0,0 +1,10 @@
+package ports
+
+type Transaction interface {
+	InsertTx(request *InsertRequest) (string, error)
+	InsertBatchTx(request *InsertBatchRequest) (string, error)
+	DeleteTx(request *DeleteRequest) (string, error)
+	DeleteBatchTx(request *DeleteBatchRequest) (string, error)
+	UpdateTx(request *UpdateRequest) (string, error)
+	End() error
+}