// Package dpsv1 wraps the v1 command and query gRPC service clients of the
// dps-sdk behind a single Client type.
package dpsv1

import (
	"context"
	"encoding/json"
	"errors"
	"time"

	"git.sxidc.com/service-supports/dps-sdk/pb/v1"
	"git.sxidc.com/service-supports/dps-sdk/pb/v1/request"
	"git.sxidc.com/service-supports/dps-sdk/pb/v1/response"
	"git.sxidc.com/service-supports/dps-sdk/ports"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// Client bundles one gRPC connection with the command and query service
// clients that were created on top of it.
type Client struct {
	conn                 *grpc.ClientConn
	commandServiceClient v1.CommandServiceClient
	queryServiceClient   v1.QueryServiceClient
}

// NewClient dials address using insecure transport credentials and returns a
// Client whose command and query service clients share that connection.
// It returns the dial error unchanged when the connection cannot be created.
func NewClient(address string) (*Client, error) {
	conn, err := grpc.DialContext(context.Background(), address,
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return nil, err
	}

	return &Client{
		conn:                 conn,
		commandServiceClient: v1.NewCommandServiceClient(conn),
		queryServiceClient:   v1.NewQueryServiceClient(conn),
	}, nil
}

// DestroyClient closes the client's connection and clears its fields.
// A nil client, or a client whose connection has already been cleared by a
// previous call, is a no-op and returns nil. If closing the connection fails
// the error is returned and the client's fields are left untouched.
func DestroyClient(client *Client) error {
	// Guard against a repeated destroy: conn is set to nil below, and calling
	// Close on a nil connection would panic.
	if client == nil || client.conn == nil {
		return nil
	}

	if err := client.conn.Close(); err != nil {
		return err
	}

	client.conn = nil
	client.commandServiceClient = nil
	client.queryServiceClient = nil

	return nil
}

// AutoMigrate JSON-encodes req.TableModelDescribe and sends it, together with
// the database ID, table prefix and version, to the command service's
// AutoMigrate RPC. It returns the marshalling or RPC error, if any.
func (c *Client) AutoMigrate(req *ports.AutoMigrateRequest) error {
	tableModelDescribeJsonBytes, err := json.Marshal(req.TableModelDescribe)
	if err != nil {
		return err
	}

	_, err = c.commandServiceClient.AutoMigrate(context.Background(), &request.AutoMigrateRequest{
		DatabaseID:            req.DatabaseID,
		TablePrefixWithSchema: req.TablePrefixWithSchema,
		Version:               req.Version,
		TableModelDescribe:    tableModelDescribeJsonBytes,
	})

	return err
}

// Insert JSON-encodes req.Keys and req.TableRow and calls the command
// service's Insert RPC. On success it returns the Statement field of the
// reply; on failure it returns an empty string and the error.
func (c *Client) Insert(req *ports.InsertRequest) (string, error) {
	keysJsonBytes, err := json.Marshal(req.Keys)
	if err != nil {
		return "", err
	}

	tableRowJsonBytes, err := json.Marshal(req.TableRow)
	if err != nil {
		return "", err
	}

	reply, err := c.commandServiceClient.Insert(context.Background(), &request.InsertRequest{
		DatabaseID:            req.DatabaseID,
		TablePrefixWithSchema: req.TablePrefixWithSchema,
		Version:               req.Version,
		Keys:                  keysJsonBytes,
		TableRow:              tableRowJsonBytes,
		UserID:                req.UserID,
	})
	if err != nil {
		return "", err
	}

	return reply.Statement, nil
}

// TODO InsertBatch

// Delete JSON-encodes req.Keys and calls the command service's Delete RPC.
// On success it returns the Statement field of the reply; on failure it
// returns an empty string and the error.
func (c *Client) Delete(req *ports.DeleteRequest) (string, error) {
	keysJsonBytes, err := json.Marshal(req.Keys)
	if err != nil {
		return "", err
	}

	reply, err := c.commandServiceClient.Delete(context.Background(), &request.DeleteRequest{
		DatabaseID:            req.DatabaseID,
		TablePrefixWithSchema: req.TablePrefixWithSchema,
		Version:               req.Version,
		Keys:                  keysJsonBytes,
		UserID:                req.UserID,
	})
	if err != nil {
		return "", err
	}

	return reply.Statement, nil
}

// TODO DeleteBatch

// Update JSON-encodes req.Keys and req.NewTableRow and calls the command
// service's Update RPC. On success it returns the Statement field of the
// reply; on failure it returns an empty string and the error.
func (c *Client) Update(req *ports.UpdateRequest) (string, error) {
	keysJsonBytes, err := json.Marshal(req.Keys)
	if err != nil {
		return "", err
	}

	newTableRowJsonByte, err := json.Marshal(req.NewTableRow)
	if err != nil {
		return "", err
	}

	reply, err := c.commandServiceClient.Update(context.Background(), &request.UpdateRequest{
		DatabaseID:            req.DatabaseID,
		TablePrefixWithSchema: req.TablePrefixWithSchema,
		Version:               req.Version,
		Keys:                  keysJsonBytes,
		NewTableRow:           newTableRowJsonByte,
		UserID:                req.UserID,
	})
	if err != nil {
		return "", err
	}

	return reply.Statement, nil
}

// Replay JSON-encodes req.Keys and calls the command service's Replay RPC.
// On success it returns the Statement field of the reply; on failure it
// returns an empty string and the error.
func (c *Client) Replay(req *ports.ReplayRequest) (string, error) {
	keysJsonBytes, err := json.Marshal(req.Keys)
	if err != nil {
		return "", err
	}

	reply, err := c.commandServiceClient.Replay(context.Background(), &request.ReplayRequest{
		DatabaseID:            req.DatabaseID,
		TablePrefixWithSchema: req.TablePrefixWithSchema,
		Version:               req.Version,
		Keys:                  keysJsonBytes,
		UserID:                req.UserID,
	})
	if err != nil {
		return "", err
	}

	return reply.Statement, nil
}

// QueryByWhereAndOrderBy JSON-encodes req.Where and calls the query service's
// QueryByWhereAndOrderBy RPC. It returns the reply's Statement, the reply's
// Infos converted to a slice of column-name-to-value maps, and the reply's
// TotalCount. On any error it returns ("", nil, 0, err).
func (c *Client) QueryByWhereAndOrderBy(req *ports.QueryByWhereAndOrderByRequest) (string, []map[string]any, int64, error) {
	whereJsonBytes, err := json.Marshal(req.Where)
	if err != nil {
		return "", nil, 0, err
	}

	reply, err := c.queryServiceClient.QueryByWhereAndOrderBy(context.Background(), &request.QueryByWhereAndOrderByRequest{
		DatabaseID:            req.DatabaseID,
		TablePrefixWithSchema: req.TablePrefixWithSchema,
		Version:               req.Version,
		Select:                req.Select,
		Where:                 whereJsonBytes,
		OrderBy:               req.OrderBy,
		PageNo:                req.PageNo,
		PageSize:              req.PageSize,
	})
	if err != nil {
		return "", nil, 0, err
	}

	infosMap, err := c.infoDataToInfoMapBatch(reply.Infos)
	if err != nil {
		return "", nil, 0, err
	}

	return reply.Statement, infosMap, reply.TotalCount, nil
}

// CommonQuery JSON-encodes req.Where, req.Or and req.Having and calls the
// query service's CommonQuery RPC. It returns the reply's Statement, the
// reply's Infos converted to a slice of column-name-to-value maps, and the
// reply's TotalCount. On any error it returns ("", nil, 0, err).
func (c *Client) CommonQuery(req *ports.CommonQueryRequest) (string, []map[string]any, int64, error) {
	whereJsonBytes, err := json.Marshal(req.Where)
	if err != nil {
		return "", nil, 0, err
	}

	orJsonBytes, err := json.Marshal(req.Or)
	if err != nil {
		return "", nil, 0, err
	}

	havingJsonBytes, err := json.Marshal(req.Having)
	if err != nil {
		return "", nil, 0, err
	}

	reply, err := c.queryServiceClient.CommonQuery(context.Background(), &request.CommonQueryRequest{
		DatabaseID:            req.DatabaseID,
		TablePrefixWithSchema: req.TablePrefixWithSchema,
		Version:               req.Version,
		Select:                req.Select,
		Where:                 whereJsonBytes,
		OrderBy:               req.OrderBy,
		Or:                    orJsonBytes,
		GroupBy:               req.GroupBy,
		Join:                  req.Join,
		Having:                havingJsonBytes,
		PageNo:                req.PageNo,
		PageSize:              req.PageSize,
	})
	if err != nil {
		return "", nil, 0, err
	}

	infosMap, err := c.infoDataToInfoMapBatch(reply.Infos)
	if err != nil {
		return "", nil, 0, err
	}

	return reply.Statement, infosMap, reply.TotalCount, nil
}

// QueryByKeys JSON-encodes req.Keys and req.Where and calls the query
// service's QueryByKeys RPC. It returns the reply's Statement and the reply's
// Info converted to a column-name-to-value map. On any error it returns
// ("", nil, err).
func (c *Client) QueryByKeys(req *ports.QueryByKeysRequest) (string, map[string]any, error) {
	keysJsonBytes, err := json.Marshal(req.Keys)
	if err != nil {
		return "", nil, err
	}

	whereJsonBytes, err := json.Marshal(req.Where)
	if err != nil {
		return "", nil, err
	}

	reply, err := c.queryServiceClient.QueryByKeys(context.Background(), &request.QueryByKeysRequest{
		DatabaseID:            req.DatabaseID,
		TablePrefixWithSchema: req.TablePrefixWithSchema,
		Version:               req.Version,
		Select:                req.Select,
		Keys:                  keysJsonBytes,
		Where:                 whereJsonBytes,
	})
	if err != nil {
		return "", nil, err
	}

	infoMap, err := c.infoDataToInfoMap(reply.Info)
	if err != nil {
		return "", nil, err
	}

	return reply.Statement, infoMap, nil
}

// CommonQueryByKeys JSON-encodes req.Keys, req.Where and req.Or and calls the
// query service's CommonQueryByKeys RPC. It returns the reply's Statement and
// the reply's Info converted to a column-name-to-value map. On any error it
// returns ("", nil, err).
func (c *Client) CommonQueryByKeys(req *ports.CommonQueryByKeysRequest) (string, map[string]any, error) {
	keysJsonBytes, err := json.Marshal(req.Keys)
	if err != nil {
		return "", nil, err
	}

	whereJsonBytes, err := json.Marshal(req.Where)
	if err != nil {
		return "", nil, err
	}

	orJsonBytes, err := json.Marshal(req.Or)
	if err != nil {
		return "", nil, err
	}

	reply, err := c.queryServiceClient.CommonQueryByKeys(context.Background(), &request.CommonQueryByKeysRequest{
		DatabaseID:            req.DatabaseID,
		TablePrefixWithSchema: req.TablePrefixWithSchema,
		Version:               req.Version,
		Select:                req.Select,
		Keys:                  keysJsonBytes,
		Where:                 whereJsonBytes,
		Or:                    orJsonBytes,
	})
	if err != nil {
		return "", nil, err
	}

	infoMap, err := c.infoDataToInfoMap(reply.Info)
	if err != nil {
		return "", nil, err
	}

	return reply.Statement, infoMap, nil
}

// CountWhere JSON-encodes req.Where and calls the query service's CountWhere
// RPC. It returns the reply's Statement and Count. On any error it returns
// ("", 0, err).
func (c *Client) CountWhere(req *ports.CountWhereRequest) (string, int64, error) {
	whereJsonBytes, err := json.Marshal(req.Where)
	if err != nil {
		return "", 0, err
	}

	reply, err := c.queryServiceClient.CountWhere(context.Background(), &request.CountWhereRequest{
		DatabaseID:            req.DatabaseID,
		TablePrefixWithSchema: req.TablePrefixWithSchema,
		Version:               req.Version,
		Where:                 whereJsonBytes,
		PageNo:                req.PageNo,
		PageSize:              req.PageSize,
	})
	if err != nil {
		return "", 0, err
	}

	return reply.Statement, reply.Count, nil
}

// CommonCount JSON-encodes req.Where, req.Or and req.Having and calls the
// query service's CommonCount RPC. It returns the reply's Statement and
// Count. On any error it returns ("", 0, err).
func (c *Client) CommonCount(req *ports.CommonCountRequest) (string, int64, error) {
	whereJsonBytes, err := json.Marshal(req.Where)
	if err != nil {
		return "", 0, err
	}

	orJsonBytes, err := json.Marshal(req.Or)
	if err != nil {
		return "", 0, err
	}

	havingJsonBytes, err := json.Marshal(req.Having)
	if err != nil {
		return "", 0, err
	}

	reply, err := c.queryServiceClient.CommonCount(context.Background(), &request.CommonCountRequest{
		DatabaseID:            req.DatabaseID,
		TablePrefixWithSchema: req.TablePrefixWithSchema,
		Version:               req.Version,
		Where:                 whereJsonBytes,
		Or:                    orJsonBytes,
		GroupBy:               req.GroupBy,
		Join:                  req.Join,
		Having:                havingJsonBytes,
		PageNo:                req.PageNo,
		PageSize:              req.PageSize,
	})
	if err != nil {
		return "", 0, err
	}

	return reply.Statement, reply.Count, nil
}

// infoDataToInfoMap converts the columns of infoData into a map keyed by
// column name. The Go type of each map value is selected by the column
// value's Type string:
//
//	"Time"    -> time.Time (uint64 value read as nanoseconds since the Unix epoch)
//	"string"  -> string
//	"bool"    -> bool
//	"int32"   -> int
//	"int64"   -> int64
//	"uint32"  -> uint32
//	"uint64"  -> uint64
//	"float32" -> float32
//	"float64" -> float64
//
// Any other Type yields an error. A nil infoData yields an empty map.
func (c *Client) infoDataToInfoMap(infoData *response.InfoData) (map[string]any, error) {
	// A reply without info data must not crash the caller.
	if infoData == nil {
		return make(map[string]any), nil
	}

	retInfoMap := make(map[string]any, len(infoData.Columns))

	for _, column := range infoData.Columns {
		switch column.Value.Type {
		case "Time":
			retInfoMap[column.Name] = time.Unix(0, int64(column.Value.GetUint64Value()))
		case "string":
			retInfoMap[column.Name] = column.Value.GetStringValue()
		case "bool":
			retInfoMap[column.Name] = column.Value.GetBoolValue()
		case "int32":
			// Go through int32 first so a negative value carried in the
			// uint32 field keeps its sign; converting uint32 straight to int
			// would produce a large positive number on 64-bit platforms.
			// NOTE(review): assumes the server stores the int32 bit pattern in
			// the uint32 field, mirroring the int64 case below — confirm.
			retInfoMap[column.Name] = int(int32(column.Value.GetUint32Value()))
		case "int64":
			retInfoMap[column.Name] = int64(column.Value.GetUint64Value())
		case "uint32":
			retInfoMap[column.Name] = column.Value.GetUint32Value()
		case "uint64":
			retInfoMap[column.Name] = column.Value.GetUint64Value()
		case "float32":
			retInfoMap[column.Name] = float32(column.Value.GetFloat64Value())
		case "float64":
			retInfoMap[column.Name] = column.Value.GetFloat64Value()
		default:
			return nil, errors.New("不支持的数据类型" + column.Value.Type)
		}
	}

	return retInfoMap, nil
}

// infoDataToInfoMapBatch converts every element of infosData with
// infoDataToInfoMap, preserving order. It stops at the first conversion error
// and returns (nil, err). The returned slice is non-nil even when infosData
// is empty.
func (c *Client) infoDataToInfoMapBatch(infosData []*response.InfoData) ([]map[string]any, error) {
	retInfosDataMap := make([]map[string]any, 0, len(infosData))

	for _, infoData := range infosData {
		retInfoMap, err := c.infoDataToInfoMap(infoData)
		if err != nil {
			return nil, err
		}

		retInfosDataMap = append(retInfosDataMap, retInfoMap)
	}

	return retInfosDataMap, nil
}