Browse Source

修改逻辑

yjp 1 year ago
parent
commit
6f375d62c0
3 changed files with 11 additions and 93 deletions
  1. 0 36
      addtional_column.go
  2. 9 0
      client/table_row.go
  3. 2 57
      dpsv1/transaction.go

+ 0 - 36
addtional_column.go

@@ -1,36 +0,0 @@
-package dps
-
-import "time"
-
-const (
-	createUserIDColumnName     = "create_user_id"
-	lastUpdateUserIDColumnName = "last_update_user_id"
-	lastEventTimeColumnName    = "last_event_time"
-)
-
-func GetDataCreateUserID(dataMap map[string]any) string {
-	value, ok := dataMap[createUserIDColumnName].(string)
-	if !ok {
-		return ""
-	}
-
-	return value
-}
-
-func GetDataLastUpdateUserID(dataMap map[string]any) string {
-	value, ok := dataMap[lastUpdateUserIDColumnName].(string)
-	if !ok {
-		return ""
-	}
-
-	return value
-}
-
-func GetLastEventTime(dataMap map[string]any) *time.Time {
-	value, ok := dataMap[lastEventTimeColumnName].(time.Time)
-	if !ok {
-		return nil
-	}
-
-	return &value
-}

+ 9 - 0
client/table_row.go

@@ -92,6 +92,15 @@ func (tableRow *TableRow) AddColumnValueFloat64(columnName string, value float64
 	return tableRow
 }
 
+func (tableRow *TableRow) SnapshotColumnValue(prefix string, version string, columnName string) any {
+	value, ok := tableRow.row[prefix+"_snaps_"+version+"."+columnName]
+	if !ok {
+		return nil
+	}
+
+	return value
+}
+
 func (tableRow *TableRow) ColumnValueTime(columnName string) time.Time {
 	value, ok := tableRow.row[columnName].(time.Time)
 	if !ok {

+ 2 - 57
dpsv1/transaction.go

@@ -70,17 +70,6 @@ func (tx *Transaction) InsertBatchTx(req *client.InsertBatchRequest) (string, er
 		return "", nil
 	}
 
-	var err error
-
-	defer func() {
-		if err != nil {
-			innerErr := tx.stream.CloseSend()
-			if innerErr != nil {
-				panic(innerErr)
-			}
-		}
-	}()
-
 	tableRowItems := make([]*request.InsertTableRowItem, 0)
 
 	for _, reqTableItem := range req.Items {
@@ -106,7 +95,7 @@ func (tx *Transaction) InsertBatchTx(req *client.InsertBatchRequest) (string, er
 		})
 	}
 
-	err = tx.stream.Send(&request.TransactionOperation{
+	err := tx.stream.Send(&request.TransactionOperation{
 		Request: &request.TransactionOperation_InsertBatchTxRequest{
 			InsertBatchTxRequest: &request.InsertBatchTxRequest{
 				Items:  tableRowItems,
@@ -133,18 +122,7 @@ func (tx *Transaction) DeleteTx(req *client.DeleteRequest) (string, error) {
 		return "", nil
 	}
 
-	var err error
-
-	defer func() {
-		if err != nil {
-			innerErr := tx.stream.CloseSend()
-			if innerErr != nil {
-				panic(innerErr)
-			}
-		}
-	}()
-
-	err = tx.stream.Send(&request.TransactionOperation{
+	err := tx.stream.Send(&request.TransactionOperation{
 		Request: &request.TransactionOperation_DeleteTxRequest{
 			DeleteTxRequest: &request.DeleteTxRequest{
 				TablePrefixWithSchema: req.TablePrefixWithSchema,
@@ -178,17 +156,6 @@ func (tx *Transaction) DeleteWhereTx(req *client.DeleteWhereRequest) (string, er
 		return "", nil
 	}
 
-	var err error
-
-	defer func() {
-		if err != nil {
-			innerErr := tx.stream.CloseSend()
-			if innerErr != nil {
-				panic(innerErr)
-			}
-		}
-	}()
-
 	whereJsonBytes, err := req.Where.ToJson()
 	if err != nil {
 		return "", err
@@ -228,17 +195,6 @@ func (tx *Transaction) UpdateTx(req *client.UpdateRequest) (string, error) {
 		return "", nil
 	}
 
-	var err error
-
-	defer func() {
-		if err != nil {
-			innerErr := tx.stream.CloseSend()
-			if innerErr != nil {
-				panic(innerErr)
-			}
-		}
-	}()
-
 	reqNewTableRow, err := req.NewTableRow.ToDPSTableRow()
 	if err != nil {
 		return "", err
@@ -283,17 +239,6 @@ func (tx *Transaction) UpdateWhereTx(req *client.UpdateWhereRequest) (string, er
 		return "", nil
 	}
 
-	var err error
-
-	defer func() {
-		if err != nil {
-			innerErr := tx.stream.CloseSend()
-			if innerErr != nil {
-				panic(innerErr)
-			}
-		}
-	}()
-
 	whereJsonBytes, err := req.Where.ToJson()
 	if err != nil {
 		return "", err