package dpsv1 import ( "context" "encoding/json" "errors" "git.sxidc.com/service-supports/dps-sdk/client" "git.sxidc.com/service-supports/dps-sdk/pb/v1" "git.sxidc.com/service-supports/dps-sdk/pb/v1/request" "git.sxidc.com/service-supports/dps-sdk/pb/v1/response" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "reflect" "time" ) type Client struct { conn *grpc.ClientConn commandServiceClient v1.CommandServiceClient queryServiceClient v1.QueryServiceClient eventQueryServiceClient v1.EventQueryServiceClient } func NewClient(address string) (*Client, error) { conn, err := grpc.DialContext(context.Background(), address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, err } return &Client{ conn: conn, commandServiceClient: v1.NewCommandServiceClient(conn), queryServiceClient: v1.NewQueryServiceClient(conn), eventQueryServiceClient: v1.NewEventQueryServiceClient(conn), }, nil } func DestroyClient(client *Client) error { if client == nil { return nil } err := client.conn.Close() if err != nil { return err } client.conn = nil client.commandServiceClient = nil client.queryServiceClient = nil client = nil return nil } func (c *Client) AutoMigrate(databaseID string, req *client.AutoMigrateRequest) error { tableModelDescribeJsonBytes, err := json.Marshal(req.TableModelDescribe) if err != nil { return err } _, err = c.commandServiceClient.AutoMigrate(context.Background(), &request.AutoMigrateRequest{ DatabaseID: databaseID, TablePrefixWithSchema: req.TablePrefixWithSchema, Version: req.Version, TableModelDescribe: tableModelDescribeJsonBytes, }) if err != nil { return err } return nil } func (c *Client) AutoMigrateBatch(databaseID string, req *client.AutoMigrateBatchRequest) error { items := make([]*request.AutoMigrateItem, 0) for _, reqItem := range req.Items { tableModelDescribeJsonBytes, err := json.Marshal(reqItem.TableModelDescribe) if err != nil { return err } items = append(items, &request.AutoMigrateItem{ TablePrefixWithSchema: reqItem.TablePrefixWithSchema, Version: reqItem.Version, TableModelDescribe: tableModelDescribeJsonBytes, }) } _, err := c.commandServiceClient.AutoMigrateBatch(context.Background(), &request.AutoMigrateBatchRequest{ DatabaseID: databaseID, Items: items, }) if err != nil { return err } return nil } func (c *Client) Transaction(databaseID string, txFunc client.TransactionFunc) error { stream, err := c.commandServiceClient.Transaction(context.Background()) if err != nil { return err } err = stream.Send(&request.TransactionOperation{ Request: &request.TransactionOperation_TransactionBeginRequest{ TransactionBeginRequest: &request.TransactionBeginRequest{DatabaseID: databaseID}, }}) if err != nil { return err } err = txFunc(&Transaction{ stream: stream, client: c, }) if err != nil { return err } return nil } func (c *Client) Insert(databaseID string, req *client.InsertRequest) (string, error) { tableRowJsonBytes, err := c.formTableRow(req.TableRow) if err != nil { return "", err } reply, err := c.commandServiceClient.Insert(context.Background(), &request.InsertRequest{ DatabaseID: databaseID, TablePrefixWithSchema: req.TablePrefixWithSchema, Version: req.Version, KeyColumns: req.KeyColumns, TableRow: tableRowJsonBytes, UserID: req.UserID, }) if err != nil { return "", err } return reply.Statement, nil } func (c *Client) InsertBatch(databaseID string, req *client.InsertBatchRequest) (string, error) { tableItems := make([]*request.InsertTableItem, 0) for _, reqTableItem := range req.Items { items := make([]*request.InsertItem, 0) for _, reqItem := range reqTableItem.Items { tableRow, err := c.formTableRow(reqItem.TableRow) if err != nil { return "", err } items = append(items, &request.InsertItem{ KeyColumns: reqItem.KeyColumns, TableRow: tableRow, }) } tableItems = append(tableItems, &request.InsertTableItem{ TablePrefixWithSchema: reqTableItem.TablePrefixWithSchema, Version: reqTableItem.Version, Items: items, }) } reply, err := c.commandServiceClient.InsertBatch(context.Background(), &request.InsertBatchRequest{ DatabaseID: databaseID, Items: tableItems, UserID: req.UserID, }) if err != nil { return "", err } return reply.Statement, nil } func (c *Client) Delete(databaseID string, req *client.DeleteRequest) (string, error) { reply, err := c.commandServiceClient.Delete(context.Background(), &request.DeleteRequest{ DatabaseID: databaseID, TablePrefixWithSchema: req.TablePrefixWithSchema, Version: req.Version, KeyValues: req.KeyValues, UserID: req.UserID, }) if err != nil { return "", err } return reply.Statement, nil } func (c *Client) DeleteBatch(databaseID string, req *client.DeleteBatchRequest) (string, error) { tableItems := make([]*request.DeleteTableItem, 0) for _, reqTableItem := range req.Items { items := make([]*request.DeleteItem, 0) for _, reqItem := range reqTableItem.Items { items = append(items, &request.DeleteItem{ KeyValues: reqItem.KeyValues, }) } tableItems = append(tableItems, &request.DeleteTableItem{ TablePrefixWithSchema: reqTableItem.TablePrefixWithSchema, Version: reqTableItem.Version, Items: items, }) } reply, err := c.commandServiceClient.DeleteBatch(context.Background(), &request.DeleteBatchRequest{ DatabaseID: databaseID, Items: tableItems, UserID: req.UserID, }) if err != nil { return "", err } return reply.Statement, nil } func (c *Client) Update(databaseID string, req *client.UpdateRequest) (string, error) { newTableRowJsonByte, err := c.formTableRow(req.NewTableRow) if err != nil { return "", err } reply, err := c.commandServiceClient.Update(context.Background(), &request.UpdateRequest{ DatabaseID: databaseID, TablePrefixWithSchema: req.TablePrefixWithSchema, Version: req.Version, KeyValues: req.KeyValues, NewTableRow: newTableRowJsonByte, UserID: req.UserID, }) if err != nil { return "", err } return reply.Statement, nil } func (c *Client) Replay(databaseID string, req *client.ReplayRequest) (string, error) { reply, err := c.commandServiceClient.Replay(context.Background(), &request.ReplayRequest{ DatabaseID: databaseID, TablePrefixWithSchema: req.TablePrefixWithSchema, Version: req.Version, KeyValues: req.KeyValues, UserID: req.UserID, }) if err != nil { return "", err } return reply.Statement, nil } func (c *Client) QueryByWhereAndOrderBy(databaseID string, req *client.QueryByWhereAndOrderByRequest) (string, []map[string]any, int64, error) { whereJsonBytes, err := json.Marshal(req.Where) if err != nil { return "", nil, 0, err } reply, err := c.queryServiceClient.QueryByWhereAndOrderBy(context.Background(), &request.QueryByWhereAndOrderByRequest{ DatabaseID: databaseID, TablePrefixWithSchema: req.TablePrefixWithSchema, Version: req.Version, Select: req.Select, Where: whereJsonBytes, OrderBy: req.OrderBy, PageNo: int32(req.PageNo), PageSize: int32(req.PageSize), }) if err != nil { return "", nil, 0, err } infosMap, err := c.infoDataToInfoMapBatch(reply.Infos) if err != nil { return "", nil, 0, err } return reply.Statement, infosMap, reply.TotalCount, nil } func (c *Client) CommonQuery(databaseID string, req *client.CommonQueryRequest) (string, []map[string]any, int64, error) { whereJsonBytes, err := json.Marshal(req.Where) if err != nil { return "", nil, 0, err } orJsonBytes, err := json.Marshal(req.Or) if err != nil { return "", nil, 0, err } havingJsonBytes, err := json.Marshal(req.Having) if err != nil { return "", nil, 0, err } reply, err := c.queryServiceClient.CommonQuery(context.Background(), &request.CommonQueryRequest{ DatabaseID: databaseID, TablePrefixWithSchema: req.TablePrefixWithSchema, Version: req.Version, Select: req.Select, Where: whereJsonBytes, OrderBy: req.OrderBy, Or: orJsonBytes, GroupBy: req.GroupBy, Join: req.Join, Having: havingJsonBytes, PageNo: int32(req.PageNo), PageSize: int32(req.PageSize), }) if err != nil { return "", nil, 0, err } infosMap, err := c.infoDataToInfoMapBatch(reply.Infos) if err != nil { return "", nil, 0, err } return reply.Statement, infosMap, reply.TotalCount, nil } func (c *Client) QueryByKeys(databaseID string, req *client.QueryByKeysRequest) (string, map[string]any, error) { reply, err := c.queryServiceClient.QueryByKeys(context.Background(), &request.QueryByKeysRequest{ DatabaseID: databaseID, TablePrefixWithSchema: req.TablePrefixWithSchema, Version: req.Version, Select: req.Select, KeyValues: req.KeyValues, }) if err != nil { return "", nil, err } infoMap, err := c.infoDataToInfoMap(reply.Info) if err != nil { return "", nil, err } return reply.Statement, infoMap, nil } func (c *Client) CountWhere(databaseID string, req *client.CountWhereRequest) (string, int64, error) { whereJsonBytes, err := json.Marshal(req.Where) if err != nil { return "", 0, err } reply, err := c.queryServiceClient.CountWhere(context.Background(), &request.CountWhereRequest{ DatabaseID: databaseID, TablePrefixWithSchema: req.TablePrefixWithSchema, Version: req.Version, Where: whereJsonBytes, }) if err != nil { return "", 0, err } return reply.Statement, reply.Count, nil } func (c *Client) CommonCount(databaseID string, req *client.CommonCountRequest) (string, int64, error) { whereJsonBytes, err := json.Marshal(req.Where) if err != nil { return "", 0, err } orJsonBytes, err := json.Marshal(req.Or) if err != nil { return "", 0, err } havingJsonBytes, err := json.Marshal(req.Having) if err != nil { return "", 0, err } reply, err := c.queryServiceClient.CommonCount(context.Background(), &request.CommonCountRequest{ DatabaseID: databaseID, TablePrefixWithSchema: req.TablePrefixWithSchema, Version: req.Version, Where: whereJsonBytes, Or: orJsonBytes, GroupBy: req.GroupBy, Join: req.Join, Having: havingJsonBytes, }) if err != nil { return "", 0, err } return reply.Statement, reply.Count, nil } func (c *Client) EventQueryByKeys(databaseID string, req *client.EventQueryByKeysRequest) (string, []client.EventInfo, int64, error) { reply, err := c.eventQueryServiceClient.EventQueryByKeys(context.Background(), &request.EventQueryByKeysRequest{ DatabaseID: databaseID, TablePrefixWithSchema: req.TablePrefixWithSchema, Select: req.Select, KeyValues: req.KeyValues, PageNo: int32(req.PageNo), PageSize: int32(req.PageSize), }) if err != nil { return "", nil, 0, err } return reply.Statement, client.FormEventInfoBatch(reply.Infos), reply.TotalCount, nil } func (c *Client) CommonEventQuery(databaseID string, req *client.CommonEventQueryRequest) (string, []client.EventInfo, int64, error) { reply, err := c.eventQueryServiceClient.CommonEventQuery(context.Background(), &request.CommonEventQueryRequest{ DatabaseID: databaseID, TablePrefixWithSchema: req.TablePrefixWithSchema, Select: req.Select, KeyValues: req.KeyValues, Version: req.Version, Operation: req.Operation, CreatorID: req.CreatorID, StartCreatedTime: req.StartCreatedTime, EndCreatedTime: req.EndCreatedTime, PageNo: int32(req.PageNo), PageSize: int32(req.PageSize), }) if err != nil { return "", nil, 0, err } return reply.Statement, client.FormEventInfoBatch(reply.Infos), reply.TotalCount, nil } func (c *Client) CountEventByKeys(databaseID string, req *client.CountEventByKeysRequest) (string, int64, error) { reply, err := c.eventQueryServiceClient.CountEventByKeys(context.Background(), &request.CountEventByKeysRequest{ DatabaseID: databaseID, TablePrefixWithSchema: req.TablePrefixWithSchema, KeyValues: req.KeyValues, }) if err != nil { return "", 0, err } return reply.Statement, reply.Count, nil } func (c *Client) CommonCountEvent(databaseID string, req *client.CommonCountEventRequest) (string, int64, error) { reply, err := c.eventQueryServiceClient.CommonCountEvent(context.Background(), &request.CommonCountEventRequest{ DatabaseID: databaseID, TablePrefixWithSchema: req.TablePrefixWithSchema, KeyValues: req.KeyValues, Version: req.Version, Operation: req.Operation, CreatorID: req.CreatorID, StartCreatedTime: req.StartCreatedTime, EndCreatedTime: req.EndCreatedTime, }) if err != nil { return "", 0, err } return reply.Statement, reply.Count, nil } func (c *Client) EventHistoryQueryByKeys(databaseID string, req *client.EventQueryByKeysRequest) (string, []client.EventInfo, int64, error) { reply, err := c.eventQueryServiceClient.EventHistoryQueryByKeys(context.Background(), &request.EventQueryByKeysRequest{ DatabaseID: databaseID, TablePrefixWithSchema: req.TablePrefixWithSchema, Select: req.Select, KeyValues: req.KeyValues, PageNo: int32(req.PageNo), PageSize: int32(req.PageSize), }) if err != nil { return "", nil, 0, err } return reply.Statement, client.FormEventInfoBatch(reply.Infos), reply.TotalCount, nil } func (c *Client) CommonEventHistoryQuery(databaseID string, req *client.CommonEventQueryRequest) (string, []client.EventInfo, int64, error) { reply, err := c.eventQueryServiceClient.CommonEventHistoryQuery(context.Background(), &request.CommonEventQueryRequest{ DatabaseID: databaseID, TablePrefixWithSchema: req.TablePrefixWithSchema, Select: req.Select, KeyValues: req.KeyValues, Version: req.Version, Operation: req.Operation, CreatorID: req.CreatorID, StartCreatedTime: req.StartCreatedTime, EndCreatedTime: req.EndCreatedTime, PageNo: int32(req.PageNo), PageSize: int32(req.PageSize), }) if err != nil { return "", nil, 0, err } return reply.Statement, client.FormEventInfoBatch(reply.Infos), reply.TotalCount, nil } func (c *Client) CountEventHistoryByKeys(databaseID string, req *client.CountEventByKeysRequest) (string, int64, error) { reply, err := c.eventQueryServiceClient.CountEventHistoryByKeys(context.Background(), &request.CountEventByKeysRequest{ DatabaseID: databaseID, TablePrefixWithSchema: req.TablePrefixWithSchema, KeyValues: req.KeyValues, }) if err != nil { return "", 0, err } return reply.Statement, reply.Count, nil } func (c *Client) CommonCountEventHistory(databaseID string, req *client.CommonCountEventRequest) (string, int64, error) { reply, err := c.eventQueryServiceClient.CommonCountEventHistory(context.Background(), &request.CommonCountEventRequest{ DatabaseID: databaseID, TablePrefixWithSchema: req.TablePrefixWithSchema, KeyValues: req.KeyValues, Version: req.Version, Operation: req.Operation, CreatorID: req.CreatorID, StartCreatedTime: req.StartCreatedTime, EndCreatedTime: req.EndCreatedTime, }) if err != nil { return "", 0, err } return reply.Statement, reply.Count, nil } func (c *Client) formTableRow(tableRow map[string]any) (*request.TableRow, error) { if tableRow == nil || len(tableRow) == 0 { return &request.TableRow{Columns: make([]*request.Column, 0)}, nil } columns := make([]*request.Column, 0) for columnName, value := range tableRow { valueType := reflect.TypeOf(value) if valueType.Kind() == reflect.Ptr { reflectValue := reflect.ValueOf(value).Elem() valueType = reflectValue.Type() value = reflectValue.Interface() } typedValue := new(request.ColumnValue) typedValue.Kind = int32(valueType.Kind()) typedValue.Type = valueType.Name() switch valueType.Name() { case "Time": timeObj := value.(time.Time) typedValue.TypedValue = &request.ColumnValue_Uint64Value{Uint64Value: uint64(timeObj.UnixNano())} case "string": typedValue.TypedValue = &request.ColumnValue_StringValue{StringValue: value.(string)} case "bool": typedValue.TypedValue = &request.ColumnValue_BoolValue{BoolValue: value.(bool)} case "int": typedValue.TypedValue = &request.ColumnValue_Uint32Value{Uint32Value: uint32(value.(int))} case "int32": typedValue.TypedValue = &request.ColumnValue_Uint32Value{Uint32Value: uint32(value.(int32))} case "int64": typedValue.TypedValue = &request.ColumnValue_Uint64Value{Uint64Value: uint64(value.(int64))} case "uint32": typedValue.TypedValue = &request.ColumnValue_Uint32Value{Uint32Value: value.(uint32)} case "uint64": typedValue.TypedValue = &request.ColumnValue_Uint64Value{Uint64Value: value.(uint64)} case "float32": typedValue.TypedValue = &request.ColumnValue_Float64Value{Float64Value: float64(value.(float32))} case "float64": typedValue.TypedValue = &request.ColumnValue_Float64Value{Float64Value: value.(float64)} default: return nil, errors.New("不支持的数据类型" + valueType.Name()) } columns = append(columns, &request.Column{ Name: columnName, Value: typedValue, }) } return &request.TableRow{Columns: columns}, nil } func (c *Client) infoDataToInfoMap(infoData *response.InfoData) (map[string]any, error) { retInfoMap := make(map[string]any) for _, column := range infoData.Columns { switch column.Value.Type { case "Time": timeObj := time.Unix(0, int64(column.Value.GetUint64Value())) retInfoMap[column.Name] = timeObj case "string": retInfoMap[column.Name] = column.Value.GetStringValue() case "bool": retInfoMap[column.Name] = column.Value.GetBoolValue() case "int": retInfoMap[column.Name] = int(column.Value.GetUint32Value()) case "int32": retInfoMap[column.Name] = int(column.Value.GetUint32Value()) case "int64": retInfoMap[column.Name] = int64(column.Value.GetUint64Value()) case "uint32": retInfoMap[column.Name] = column.Value.GetUint32Value() case "uint64": retInfoMap[column.Name] = column.Value.GetUint64Value() case "float32": retInfoMap[column.Name] = float32(column.Value.GetFloat64Value()) case "float64": retInfoMap[column.Name] = column.Value.GetFloat64Value() default: return nil, errors.New("不支持的数据类型" + column.Value.Type) } } return retInfoMap, nil } func (c *Client) infoDataToInfoMapBatch(infosData []*response.InfoData) ([]map[string]any, error) { retInfosDataMap := make([]map[string]any, 0) for _, infoData := range infosData { retInfoMap, err := c.infoDataToInfoMap(infoData) if err != nil { return nil, err } retInfosDataMap = append(retInfosDataMap, retInfoMap) } return retInfosDataMap, nil }