// Package dpsv1 wraps the v1 CommandService and QueryService gRPC clients
// behind a single Client type.
package dpsv1

import (
	"context"
	"encoding/json"
	"errors"
	"reflect"
	"time"

	"git.sxidc.com/service-supports/dps-sdk/pb/v1"
	"git.sxidc.com/service-supports/dps-sdk/pb/v1/request"
	"git.sxidc.com/service-supports/dps-sdk/pb/v1/response"
	"git.sxidc.com/service-supports/dps-sdk/ports"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// Client bundles one gRPC connection with the command and query service
// stubs created on top of it.
//
// Every RPC method on Client is issued with context.Background(), so calls
// carry no deadline and cannot be cancelled by the caller.
type Client struct {
	conn                 *grpc.ClientConn
	commandServiceClient v1.CommandServiceClient
	queryServiceClient   v1.QueryServiceClient
}

// NewClient dials address using insecure (plaintext) transport credentials
// and returns a Client whose command and query stubs share that connection.
// The dial error, if any, is returned unchanged.
func NewClient(address string) (*Client, error) {
	conn, err := grpc.DialContext(context.Background(), address,
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return nil, err
	}

	return &Client{
		conn:                 conn,
		commandServiceClient: v1.NewCommandServiceClient(conn),
		queryServiceClient:   v1.NewQueryServiceClient(conn),
	}, nil
}

// DestroyClient closes the client's connection and clears its fields.
//
// It is a no-op for a nil client and for a client whose connection has
// already been released, so calling it twice is safe. If closing the
// connection fails, the error is returned and the client is left untouched.
// The caller's own pointer variable cannot be reset from here; callers that
// want that must assign nil themselves.
func DestroyClient(client *Client) error {
	if client == nil || client.conn == nil {
		return nil
	}

	if err := client.conn.Close(); err != nil {
		return err
	}

	client.conn = nil
	client.commandServiceClient = nil
	client.queryServiceClient = nil

	return nil
}

// AutoMigrate JSON-encodes req.TableModelDescribe and sends it, together with
// the database ID, table prefix and version, to the command service's
// AutoMigrate RPC. It returns the marshalling or RPC error, if any.
func (c *Client) AutoMigrate(req *ports.AutoMigrateRequest) error {
	tableModelDescribeJSON, err := json.Marshal(req.TableModelDescribe)
	if err != nil {
		return err
	}

	_, err = c.commandServiceClient.AutoMigrate(context.Background(), &request.AutoMigrateRequest{
		DatabaseID:            req.DatabaseID,
		TablePrefixWithSchema: req.TablePrefixWithSchema,
		Version:               req.Version,
		TableModelDescribe:    tableModelDescribeJSON,
	})

	return err
}

// Insert converts req.TableRow into typed columns and calls the command
// service's Insert RPC. On success it returns the Statement field of the
// reply; on failure it returns an empty string and the error.
func (c *Client) Insert(req *ports.InsertRequest) (string, error) {
	tableRow, err := c.formTableRow(req.TableRow)
	if err != nil {
		return "", err
	}

	reply, err := c.commandServiceClient.Insert(context.Background(), &request.InsertRequest{
		DatabaseID:            req.DatabaseID,
		TablePrefixWithSchema: req.TablePrefixWithSchema,
		Version:               req.Version,
		KeyColumns:            req.KeyColumns,
		TableRow:              tableRow,
		UserID:                req.UserID,
	})
	if err != nil {
		return "", err
	}

	return reply.Statement, nil
}

// TODO InsertBatch

// Delete calls the command service's Delete RPC for the row identified by
// req.KeyValues and returns the Statement field of the reply.
func (c *Client) Delete(req *ports.DeleteRequest) (string, error) {
	reply, err := c.commandServiceClient.Delete(context.Background(), &request.DeleteRequest{
		DatabaseID:            req.DatabaseID,
		TablePrefixWithSchema: req.TablePrefixWithSchema,
		Version:               req.Version,
		KeyValues:             req.KeyValues,
		UserID:                req.UserID,
	})
	if err != nil {
		return "", err
	}

	return reply.Statement, nil
}

// TODO DeleteBatch

// Update converts req.NewTableRow into typed columns and calls the command
// service's Update RPC for the row identified by req.KeyValues. It returns
// the Statement field of the reply.
func (c *Client) Update(req *ports.UpdateRequest) (string, error) {
	newTableRow, err := c.formTableRow(req.NewTableRow)
	if err != nil {
		return "", err
	}

	reply, err := c.commandServiceClient.Update(context.Background(), &request.UpdateRequest{
		DatabaseID:            req.DatabaseID,
		TablePrefixWithSchema: req.TablePrefixWithSchema,
		Version:               req.Version,
		KeyValues:             req.KeyValues,
		NewTableRow:           newTableRow,
		UserID:                req.UserID,
	})
	if err != nil {
		return "", err
	}

	return reply.Statement, nil
}

// Replay calls the command service's Replay RPC with the request's key
// values and returns the Statement field of the reply.
func (c *Client) Replay(req *ports.ReplayRequest) (string, error) {
	reply, err := c.commandServiceClient.Replay(context.Background(), &request.ReplayRequest{
		DatabaseID:            req.DatabaseID,
		TablePrefixWithSchema: req.TablePrefixWithSchema,
		Version:               req.Version,
		KeyValues:             req.KeyValues,
		UserID:                req.UserID,
	})
	if err != nil {
		return "", err
	}

	return reply.Statement, nil
}

// QueryByWhereAndOrderBy JSON-encodes req.Where and calls the query service's
// QueryByWhereAndOrderBy RPC. It returns the reply's Statement, the reply's
// Infos converted to one map per row, and the reply's TotalCount.
func (c *Client) QueryByWhereAndOrderBy(req *ports.QueryByWhereAndOrderByRequest) (string, []map[string]any, int64, error) {
	whereJSON, err := json.Marshal(req.Where)
	if err != nil {
		return "", nil, 0, err
	}

	reply, err := c.queryServiceClient.QueryByWhereAndOrderBy(context.Background(), &request.QueryByWhereAndOrderByRequest{
		DatabaseID:            req.DatabaseID,
		TablePrefixWithSchema: req.TablePrefixWithSchema,
		Version:               req.Version,
		Select:                req.Select,
		Where:                 whereJSON,
		OrderBy:               req.OrderBy,
		PageNo:                req.PageNo,
		PageSize:              req.PageSize,
	})
	if err != nil {
		return "", nil, 0, err
	}

	infosMap, err := c.infoDataToInfoMapBatch(reply.Infos)
	if err != nil {
		return "", nil, 0, err
	}

	return reply.Statement, infosMap, reply.TotalCount, nil
}

// CommonQuery JSON-encodes req.Where, req.Or and req.Having and calls the
// query service's CommonQuery RPC. It returns the reply's Statement, the
// reply's Infos converted to one map per row, and the reply's TotalCount.
func (c *Client) CommonQuery(req *ports.CommonQueryRequest) (string, []map[string]any, int64, error) {
	whereJSON, err := json.Marshal(req.Where)
	if err != nil {
		return "", nil, 0, err
	}

	orJSON, err := json.Marshal(req.Or)
	if err != nil {
		return "", nil, 0, err
	}

	havingJSON, err := json.Marshal(req.Having)
	if err != nil {
		return "", nil, 0, err
	}

	reply, err := c.queryServiceClient.CommonQuery(context.Background(), &request.CommonQueryRequest{
		DatabaseID:            req.DatabaseID,
		TablePrefixWithSchema: req.TablePrefixWithSchema,
		Version:               req.Version,
		Select:                req.Select,
		Where:                 whereJSON,
		OrderBy:               req.OrderBy,
		Or:                    orJSON,
		GroupBy:               req.GroupBy,
		Join:                  req.Join,
		Having:                havingJSON,
		PageNo:                req.PageNo,
		PageSize:              req.PageSize,
	})
	if err != nil {
		return "", nil, 0, err
	}

	infosMap, err := c.infoDataToInfoMapBatch(reply.Infos)
	if err != nil {
		return "", nil, 0, err
	}

	return reply.Statement, infosMap, reply.TotalCount, nil
}

// QueryByKeys calls the query service's QueryByKeys RPC for the row
// identified by req.KeyValues. It returns the reply's Statement and the
// reply's Info converted to a column-name-to-value map.
func (c *Client) QueryByKeys(req *ports.QueryByKeysRequest) (string, map[string]any, error) {
	reply, err := c.queryServiceClient.QueryByKeys(context.Background(), &request.QueryByKeysRequest{
		DatabaseID:            req.DatabaseID,
		TablePrefixWithSchema: req.TablePrefixWithSchema,
		Version:               req.Version,
		Select:                req.Select,
		KeyValues:             req.KeyValues,
	})
	if err != nil {
		return "", nil, err
	}

	infoMap, err := c.infoDataToInfoMap(reply.Info)
	if err != nil {
		return "", nil, err
	}

	return reply.Statement, infoMap, nil
}

// CountWhere JSON-encodes req.Where and calls the query service's CountWhere
// RPC. It returns the reply's Statement and Count.
func (c *Client) CountWhere(req *ports.CountWhereRequest) (string, int64, error) {
	whereJSON, err := json.Marshal(req.Where)
	if err != nil {
		return "", 0, err
	}

	reply, err := c.queryServiceClient.CountWhere(context.Background(), &request.CountWhereRequest{
		DatabaseID:            req.DatabaseID,
		TablePrefixWithSchema: req.TablePrefixWithSchema,
		Version:               req.Version,
		Where:                 whereJSON,
		PageNo:                req.PageNo,
		PageSize:              req.PageSize,
	})
	if err != nil {
		return "", 0, err
	}

	return reply.Statement, reply.Count, nil
}

// CommonCount JSON-encodes req.Where, req.Or and req.Having and calls the
// query service's CommonCount RPC. It returns the reply's Statement and
// Count.
func (c *Client) CommonCount(req *ports.CommonCountRequest) (string, int64, error) {
	whereJSON, err := json.Marshal(req.Where)
	if err != nil {
		return "", 0, err
	}

	orJSON, err := json.Marshal(req.Or)
	if err != nil {
		return "", 0, err
	}

	havingJSON, err := json.Marshal(req.Having)
	if err != nil {
		return "", 0, err
	}

	reply, err := c.queryServiceClient.CommonCount(context.Background(), &request.CommonCountRequest{
		DatabaseID:            req.DatabaseID,
		TablePrefixWithSchema: req.TablePrefixWithSchema,
		Version:               req.Version,
		Where:                 whereJSON,
		Or:                    orJSON,
		GroupBy:               req.GroupBy,
		Join:                  req.Join,
		Having:                havingJSON,
		PageNo:                req.PageNo,
		PageSize:              req.PageSize,
	})
	if err != nil {
		return "", 0, err
	}

	return reply.Statement, reply.Count, nil
}

// formTableRow converts a column-name-to-value map into a request.TableRow.
//
// Each value may be one of time.Time, string, bool, int, int32, int64,
// uint32, uint64, float32 or float64, or a non-nil pointer to one of those
// (pointers are dereferenced once). Every column records the reflect.Kind
// and the type name of the (dereferenced) value next to the encoded value.
// time.Time is sent as its UnixNano timestamp, so its location is not
// transmitted.
//
// A nil or empty map yields a row with an empty, non-nil column list. A nil
// value, a nil pointer, or a value of any other type yields an error instead
// of a panic. Columns are appended in map iteration order, which Go does not
// define.
func (c *Client) formTableRow(tableRow map[string]any) (*request.TableRow, error) {
	columns := make([]*request.Column, 0, len(tableRow))

	for columnName, value := range tableRow {
		// reflect.TypeOf(nil) returns a nil Type; calling Kind on it would panic.
		if value == nil {
			return nil, errors.New("不支持的数据类型nil")
		}

		valueType := reflect.TypeOf(value)
		if valueType.Kind() == reflect.Ptr {
			reflectValue := reflect.ValueOf(value)
			// Elem on a nil pointer gives the zero Value, whose Type panics.
			if reflectValue.IsNil() {
				return nil, errors.New("不支持的数据类型nil")
			}

			reflectValue = reflectValue.Elem()
			valueType = reflectValue.Type()
			value = reflectValue.Interface()
		}

		typedValue := &request.ColumnValue{
			Kind: int32(valueType.Kind()),
			Type: valueType.Name(),
		}

		// Switching on the concrete type rather than on the type's name means
		// a foreign type that happens to be called "Time" is rejected with an
		// error instead of failing an unchecked type assertion.
		switch v := value.(type) {
		case time.Time:
			typedValue.TypedValue = &request.ColumnValue_Uint64Value{Uint64Value: uint64(v.UnixNano())}
		case string:
			typedValue.TypedValue = &request.ColumnValue_StringValue{StringValue: v}
		case bool:
			typedValue.TypedValue = &request.ColumnValue_BoolValue{BoolValue: v}
		case int:
			// NOTE(review): int is narrowed to 32 bits here; values outside
			// the uint32 range are truncated. Confirm this is intended.
			typedValue.TypedValue = &request.ColumnValue_Uint32Value{Uint32Value: uint32(v)}
		case int32:
			typedValue.TypedValue = &request.ColumnValue_Uint32Value{Uint32Value: uint32(v)}
		case int64:
			typedValue.TypedValue = &request.ColumnValue_Uint64Value{Uint64Value: uint64(v)}
		case uint32:
			typedValue.TypedValue = &request.ColumnValue_Uint32Value{Uint32Value: v}
		case uint64:
			typedValue.TypedValue = &request.ColumnValue_Uint64Value{Uint64Value: v}
		case float32:
			typedValue.TypedValue = &request.ColumnValue_Float64Value{Float64Value: float64(v)}
		case float64:
			typedValue.TypedValue = &request.ColumnValue_Float64Value{Float64Value: v}
		default:
			return nil, errors.New("不支持的数据类型" + valueType.Name())
		}

		columns = append(columns, &request.Column{
			Name:  columnName,
			Value: typedValue,
		})
	}

	return &request.TableRow{Columns: columns}, nil
}

// infoDataToInfoMap converts one response.InfoData into a map keyed by column
// name, decoding each value according to the type name carried in
// column.Value.Type.
//
// "Time" values are rebuilt with time.Unix from a nanosecond timestamp, which
// yields a time in the local location. A nil infoData produces an empty map.
// A nil column, a column without a value, or an unknown type name produces an
// error.
func (c *Client) infoDataToInfoMap(infoData *response.InfoData) (map[string]any, error) {
	if infoData == nil {
		return make(map[string]any), nil
	}

	retInfoMap := make(map[string]any, len(infoData.Columns))

	for _, column := range infoData.Columns {
		if column == nil {
			return nil, errors.New("dps: nil column in query result")
		}

		// NOTE(review): a missing value is reported as an error rather than
		// mapped to nil; confirm how the service represents NULL columns.
		if column.Value == nil {
			return nil, errors.New("dps: column " + column.Name + " has no value")
		}

		switch column.Value.Type {
		case "Time":
			retInfoMap[column.Name] = time.Unix(0, int64(column.Value.GetUint64Value()))
		case "string":
			retInfoMap[column.Name] = column.Value.GetStringValue()
		case "bool":
			retInfoMap[column.Name] = column.Value.GetBoolValue()
		case "int":
			// NOTE(review): widening uint32 to int does not sign-extend, so a
			// negative int written by formTableRow does not come back
			// negative on 64-bit platforms. Left as-is; confirm with the
			// service's encoding.
			retInfoMap[column.Name] = int(column.Value.GetUint32Value())
		case "int32":
			// NOTE(review): decoded as int, not int32; kept for compatibility
			// with existing callers.
			retInfoMap[column.Name] = int(column.Value.GetUint32Value())
		case "int64":
			retInfoMap[column.Name] = int64(column.Value.GetUint64Value())
		case "uint32":
			retInfoMap[column.Name] = column.Value.GetUint32Value()
		case "uint64":
			retInfoMap[column.Name] = column.Value.GetUint64Value()
		case "float32":
			retInfoMap[column.Name] = float32(column.Value.GetFloat64Value())
		case "float64":
			retInfoMap[column.Name] = column.Value.GetFloat64Value()
		default:
			return nil, errors.New("不支持的数据类型" + column.Value.Type)
		}
	}

	return retInfoMap, nil
}

// infoDataToInfoMapBatch converts every element of infosData with
// infoDataToInfoMap, preserving order. It stops at the first conversion error
// and returns it. The returned slice is non-nil even when infosData is empty.
func (c *Client) infoDataToInfoMapBatch(infosData []*response.InfoData) ([]map[string]any, error) {
	retInfosDataMap := make([]map[string]any, 0, len(infosData))

	for _, infoData := range infosData {
		retInfoMap, err := c.infoDataToInfoMap(infoData)
		if err != nil {
			return nil, err
		}

		retInfosDataMap = append(retInfosDataMap, retInfoMap)
	}

	return retInfosDataMap, nil
}