client.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479
  1. package dpsv1
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "git.sxidc.com/service-supports/dps-sdk/pb/v1"
  7. "git.sxidc.com/service-supports/dps-sdk/pb/v1/request"
  8. "git.sxidc.com/service-supports/dps-sdk/pb/v1/response"
  9. "git.sxidc.com/service-supports/dps-sdk/ports"
  10. "google.golang.org/grpc"
  11. "google.golang.org/grpc/credentials/insecure"
  12. "reflect"
  13. "time"
  14. )
  15. type Client struct {
  16. conn *grpc.ClientConn
  17. commandServiceClient v1.CommandServiceClient
  18. queryServiceClient v1.QueryServiceClient
  19. }
  20. func NewClient(address string) (*Client, error) {
  21. conn, err := grpc.DialContext(context.Background(), address,
  22. grpc.WithTransportCredentials(insecure.NewCredentials()))
  23. if err != nil {
  24. return nil, err
  25. }
  26. return &Client{
  27. conn: conn,
  28. commandServiceClient: v1.NewCommandServiceClient(conn),
  29. queryServiceClient: v1.NewQueryServiceClient(conn),
  30. }, nil
  31. }
  32. func DestroyClient(client *Client) error {
  33. if client == nil {
  34. return nil
  35. }
  36. err := client.conn.Close()
  37. if err != nil {
  38. return err
  39. }
  40. client.conn = nil
  41. client.commandServiceClient = nil
  42. client.queryServiceClient = nil
  43. client = nil
  44. return nil
  45. }
  46. func (c *Client) AutoMigrate(req *ports.AutoMigrateRequest) error {
  47. tableModelDescribeJsonBytes, err := json.Marshal(req.TableModelDescribe)
  48. if err != nil {
  49. return err
  50. }
  51. _, err = c.commandServiceClient.AutoMigrate(context.Background(), &request.AutoMigrateRequest{
  52. DatabaseID: req.DatabaseID,
  53. TablePrefixWithSchema: req.TablePrefixWithSchema,
  54. Version: req.Version,
  55. TableModelDescribe: tableModelDescribeJsonBytes,
  56. })
  57. if err != nil {
  58. return err
  59. }
  60. return nil
  61. }
  62. func (c *Client) Transaction(databaseID string, txFunc ports.TransactionFunc) error {
  63. stream, err := c.commandServiceClient.Transaction(context.Background())
  64. if err != nil {
  65. return err
  66. }
  67. err = stream.Send(&request.TransactionOperation{
  68. Request: &request.TransactionOperation_TransactionBeginRequest{
  69. TransactionBeginRequest: &request.TransactionBeginRequest{DatabaseID: databaseID},
  70. }})
  71. if err != nil {
  72. return err
  73. }
  74. err = txFunc(&Transaction{
  75. stream: stream,
  76. client: c,
  77. })
  78. if err != nil {
  79. return err
  80. }
  81. return nil
  82. }
  83. func (c *Client) Insert(req *ports.InsertRequest) (string, error) {
  84. tableRowJsonBytes, err := c.formTableRow(req.TableRow)
  85. if err != nil {
  86. return "", err
  87. }
  88. reply, err := c.commandServiceClient.Insert(context.Background(), &request.InsertRequest{
  89. DatabaseID: req.DatabaseID,
  90. TablePrefixWithSchema: req.TablePrefixWithSchema,
  91. Version: req.Version,
  92. KeyColumns: req.KeyColumns,
  93. TableRow: tableRowJsonBytes,
  94. UserID: req.UserID,
  95. })
  96. if err != nil {
  97. return "", err
  98. }
  99. return reply.Statement, nil
  100. }
  101. func (c *Client) InsertBatch(req *ports.InsertBatchRequest) (string, error) {
  102. tableItems := make([]*request.InsertTableItem, 0)
  103. for _, reqTableItem := range req.Items {
  104. items := make([]*request.InsertItem, 0)
  105. for _, reqItem := range reqTableItem.Items {
  106. tableRow, err := c.formTableRow(reqItem.TableRow)
  107. if err != nil {
  108. return "", err
  109. }
  110. items = append(items, &request.InsertItem{
  111. KeyColumns: reqItem.KeyColumns,
  112. TableRow: tableRow,
  113. })
  114. }
  115. tableItems = append(tableItems, &request.InsertTableItem{
  116. TablePrefixWithSchema: reqTableItem.TablePrefixWithSchema,
  117. Version: reqTableItem.Version,
  118. Items: items,
  119. })
  120. }
  121. reply, err := c.commandServiceClient.InsertBatch(context.Background(), &request.InsertBatchRequest{
  122. DatabaseID: req.DatabaseID,
  123. Items: tableItems,
  124. UserID: req.UserID,
  125. })
  126. if err != nil {
  127. return "", err
  128. }
  129. return reply.Statement, nil
  130. }
  131. func (c *Client) Delete(req *ports.DeleteRequest) (string, error) {
  132. reply, err := c.commandServiceClient.Delete(context.Background(), &request.DeleteRequest{
  133. DatabaseID: req.DatabaseID,
  134. TablePrefixWithSchema: req.TablePrefixWithSchema,
  135. Version: req.Version,
  136. KeyValues: req.KeyValues,
  137. UserID: req.UserID,
  138. })
  139. if err != nil {
  140. return "", err
  141. }
  142. return reply.Statement, nil
  143. }
  144. func (c *Client) DeleteBatch(req *ports.DeleteBatchRequest) (string, error) {
  145. tableItems := make([]*request.DeleteTableItem, 0)
  146. for _, reqTableItem := range req.Items {
  147. items := make([]*request.DeleteItem, 0)
  148. for _, reqItem := range reqTableItem.Items {
  149. items = append(items, &request.DeleteItem{
  150. KeyValues: reqItem.KeyValues,
  151. })
  152. }
  153. tableItems = append(tableItems, &request.DeleteTableItem{
  154. TablePrefixWithSchema: reqTableItem.TablePrefixWithSchema,
  155. Version: reqTableItem.Version,
  156. Items: items,
  157. })
  158. }
  159. reply, err := c.commandServiceClient.DeleteBatch(context.Background(), &request.DeleteBatchRequest{
  160. DatabaseID: req.DatabaseID,
  161. Items: tableItems,
  162. UserID: req.UserID,
  163. })
  164. if err != nil {
  165. return "", err
  166. }
  167. return reply.Statement, nil
  168. }
  169. func (c *Client) Update(req *ports.UpdateRequest) (string, error) {
  170. newTableRowJsonByte, err := c.formTableRow(req.NewTableRow)
  171. if err != nil {
  172. return "", err
  173. }
  174. reply, err := c.commandServiceClient.Update(context.Background(), &request.UpdateRequest{
  175. DatabaseID: req.DatabaseID,
  176. TablePrefixWithSchema: req.TablePrefixWithSchema,
  177. Version: req.Version,
  178. KeyValues: req.KeyValues,
  179. NewTableRow: newTableRowJsonByte,
  180. UserID: req.UserID,
  181. })
  182. if err != nil {
  183. return "", err
  184. }
  185. return reply.Statement, nil
  186. }
  187. func (c *Client) Replay(req *ports.ReplayRequest) (string, error) {
  188. reply, err := c.commandServiceClient.Replay(context.Background(), &request.ReplayRequest{
  189. DatabaseID: req.DatabaseID,
  190. TablePrefixWithSchema: req.TablePrefixWithSchema,
  191. Version: req.Version,
  192. KeyValues: req.KeyValues,
  193. UserID: req.UserID,
  194. })
  195. if err != nil {
  196. return "", err
  197. }
  198. return reply.Statement, nil
  199. }
  200. func (c *Client) QueryByWhereAndOrderBy(req *ports.QueryByWhereAndOrderByRequest) (string, []map[string]any, int64, error) {
  201. whereJsonBytes, err := json.Marshal(req.Where)
  202. if err != nil {
  203. return "", nil, 0, err
  204. }
  205. reply, err := c.queryServiceClient.QueryByWhereAndOrderBy(context.Background(), &request.QueryByWhereAndOrderByRequest{
  206. DatabaseID: req.DatabaseID,
  207. TablePrefixWithSchema: req.TablePrefixWithSchema,
  208. Version: req.Version,
  209. Select: req.Select,
  210. Where: whereJsonBytes,
  211. OrderBy: req.OrderBy,
  212. PageNo: req.PageNo,
  213. PageSize: req.PageSize,
  214. })
  215. if err != nil {
  216. return "", nil, 0, err
  217. }
  218. infosMap, err := c.infoDataToInfoMapBatch(reply.Infos)
  219. if err != nil {
  220. return "", nil, 0, err
  221. }
  222. return reply.Statement, infosMap, reply.TotalCount, nil
  223. }
  224. func (c *Client) CommonQuery(req *ports.CommonQueryRequest) (string, []map[string]any, int64, error) {
  225. whereJsonBytes, err := json.Marshal(req.Where)
  226. if err != nil {
  227. return "", nil, 0, err
  228. }
  229. orJsonBytes, err := json.Marshal(req.Or)
  230. if err != nil {
  231. return "", nil, 0, err
  232. }
  233. havingJsonBytes, err := json.Marshal(req.Having)
  234. if err != nil {
  235. return "", nil, 0, err
  236. }
  237. reply, err := c.queryServiceClient.CommonQuery(context.Background(), &request.CommonQueryRequest{
  238. DatabaseID: req.DatabaseID,
  239. TablePrefixWithSchema: req.TablePrefixWithSchema,
  240. Version: req.Version,
  241. Select: req.Select,
  242. Where: whereJsonBytes,
  243. OrderBy: req.OrderBy,
  244. Or: orJsonBytes,
  245. GroupBy: req.GroupBy,
  246. Join: req.Join,
  247. Having: havingJsonBytes,
  248. PageNo: req.PageNo,
  249. PageSize: req.PageSize,
  250. })
  251. if err != nil {
  252. return "", nil, 0, err
  253. }
  254. infosMap, err := c.infoDataToInfoMapBatch(reply.Infos)
  255. if err != nil {
  256. return "", nil, 0, err
  257. }
  258. return reply.Statement, infosMap, reply.TotalCount, nil
  259. }
  260. func (c *Client) QueryByKeys(req *ports.QueryByKeysRequest) (string, map[string]any, error) {
  261. reply, err := c.queryServiceClient.QueryByKeys(context.Background(), &request.QueryByKeysRequest{
  262. DatabaseID: req.DatabaseID,
  263. TablePrefixWithSchema: req.TablePrefixWithSchema,
  264. Version: req.Version,
  265. Select: req.Select,
  266. KeyValues: req.KeyValues,
  267. })
  268. if err != nil {
  269. return "", nil, err
  270. }
  271. infoMap, err := c.infoDataToInfoMap(reply.Info)
  272. if err != nil {
  273. return "", nil, err
  274. }
  275. return reply.Statement, infoMap, nil
  276. }
  277. func (c *Client) CountWhere(req *ports.CountWhereRequest) (string, int64, error) {
  278. whereJsonBytes, err := json.Marshal(req.Where)
  279. if err != nil {
  280. return "", 0, err
  281. }
  282. reply, err := c.queryServiceClient.CountWhere(context.Background(), &request.CountWhereRequest{
  283. DatabaseID: req.DatabaseID,
  284. TablePrefixWithSchema: req.TablePrefixWithSchema,
  285. Version: req.Version,
  286. Where: whereJsonBytes,
  287. })
  288. if err != nil {
  289. return "", 0, err
  290. }
  291. return reply.Statement, reply.Count, nil
  292. }
  293. func (c *Client) CommonCount(req *ports.CommonCountRequest) (string, int64, error) {
  294. whereJsonBytes, err := json.Marshal(req.Where)
  295. if err != nil {
  296. return "", 0, err
  297. }
  298. orJsonBytes, err := json.Marshal(req.Or)
  299. if err != nil {
  300. return "", 0, err
  301. }
  302. havingJsonBytes, err := json.Marshal(req.Having)
  303. if err != nil {
  304. return "", 0, err
  305. }
  306. reply, err := c.queryServiceClient.CommonCount(context.Background(), &request.CommonCountRequest{
  307. DatabaseID: req.DatabaseID,
  308. TablePrefixWithSchema: req.TablePrefixWithSchema,
  309. Version: req.Version,
  310. Where: whereJsonBytes,
  311. Or: orJsonBytes,
  312. GroupBy: req.GroupBy,
  313. Join: req.Join,
  314. Having: havingJsonBytes,
  315. })
  316. if err != nil {
  317. return "", 0, err
  318. }
  319. return reply.Statement, reply.Count, nil
  320. }
  321. func (c *Client) formTableRow(tableRow map[string]any) (*request.TableRow, error) {
  322. if tableRow == nil || len(tableRow) == 0 {
  323. return &request.TableRow{Columns: make([]*request.Column, 0)}, nil
  324. }
  325. columns := make([]*request.Column, 0)
  326. for columnName, value := range tableRow {
  327. valueType := reflect.TypeOf(value)
  328. if valueType.Kind() == reflect.Ptr {
  329. reflectValue := reflect.ValueOf(value).Elem()
  330. valueType = reflectValue.Type()
  331. value = reflectValue.Interface()
  332. }
  333. typedValue := new(request.ColumnValue)
  334. typedValue.Kind = int32(valueType.Kind())
  335. typedValue.Type = valueType.Name()
  336. switch valueType.Name() {
  337. case "Time":
  338. timeObj := value.(time.Time)
  339. typedValue.TypedValue = &request.ColumnValue_Uint64Value{Uint64Value: uint64(timeObj.UnixNano())}
  340. case "string":
  341. typedValue.TypedValue = &request.ColumnValue_StringValue{StringValue: value.(string)}
  342. case "bool":
  343. typedValue.TypedValue = &request.ColumnValue_BoolValue{BoolValue: value.(bool)}
  344. case "int":
  345. typedValue.TypedValue = &request.ColumnValue_Uint32Value{Uint32Value: uint32(value.(int))}
  346. case "int32":
  347. typedValue.TypedValue = &request.ColumnValue_Uint32Value{Uint32Value: uint32(value.(int32))}
  348. case "int64":
  349. typedValue.TypedValue = &request.ColumnValue_Uint64Value{Uint64Value: uint64(value.(int64))}
  350. case "uint32":
  351. typedValue.TypedValue = &request.ColumnValue_Uint32Value{Uint32Value: value.(uint32)}
  352. case "uint64":
  353. typedValue.TypedValue = &request.ColumnValue_Uint64Value{Uint64Value: value.(uint64)}
  354. case "float32":
  355. typedValue.TypedValue = &request.ColumnValue_Float64Value{Float64Value: float64(value.(float32))}
  356. case "float64":
  357. typedValue.TypedValue = &request.ColumnValue_Float64Value{Float64Value: value.(float64)}
  358. default:
  359. return nil, errors.New("不支持的数据类型" + valueType.Name())
  360. }
  361. columns = append(columns, &request.Column{
  362. Name: columnName,
  363. Value: typedValue,
  364. })
  365. }
  366. return &request.TableRow{Columns: columns}, nil
  367. }
  368. func (c *Client) infoDataToInfoMap(infoData *response.InfoData) (map[string]any, error) {
  369. retInfoMap := make(map[string]any)
  370. for _, column := range infoData.Columns {
  371. switch column.Value.Type {
  372. case "Time":
  373. timeObj := time.Unix(0, int64(column.Value.GetUint64Value()))
  374. retInfoMap[column.Name] = timeObj
  375. case "string":
  376. retInfoMap[column.Name] = column.Value.GetStringValue()
  377. case "bool":
  378. retInfoMap[column.Name] = column.Value.GetBoolValue()
  379. case "int":
  380. retInfoMap[column.Name] = int(column.Value.GetUint32Value())
  381. case "int32":
  382. retInfoMap[column.Name] = int(column.Value.GetUint32Value())
  383. case "int64":
  384. retInfoMap[column.Name] = int64(column.Value.GetUint64Value())
  385. case "uint32":
  386. retInfoMap[column.Name] = column.Value.GetUint32Value()
  387. case "uint64":
  388. retInfoMap[column.Name] = column.Value.GetUint64Value()
  389. case "float32":
  390. retInfoMap[column.Name] = float32(column.Value.GetFloat64Value())
  391. case "float64":
  392. retInfoMap[column.Name] = column.Value.GetFloat64Value()
  393. default:
  394. return nil, errors.New("不支持的数据类型" + column.Value.Type)
  395. }
  396. }
  397. return retInfoMap, nil
  398. }
  399. func (c *Client) infoDataToInfoMapBatch(infosData []*response.InfoData) ([]map[string]any, error) {
  400. retInfosDataMap := make([]map[string]any, 0)
  401. for _, infoData := range infosData {
  402. retInfoMap, err := c.infoDataToInfoMap(infoData)
  403. if err != nil {
  404. return nil, err
  405. }
  406. retInfosDataMap = append(retInfosDataMap, retInfoMap)
  407. }
  408. return retInfosDataMap, nil
  409. }