client.go 26 KB


  1. package dpsv1
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "git.sxidc.com/service-supports/dps-sdk/client"
  7. "git.sxidc.com/service-supports/dps-sdk/pb/v1"
  8. "git.sxidc.com/service-supports/dps-sdk/pb/v1/request"
  9. "git.sxidc.com/service-supports/dps-sdk/pb/v1/response"
  10. "google.golang.org/grpc"
  11. "google.golang.org/grpc/credentials/insecure"
  12. "io"
  13. "reflect"
  14. "strings"
  15. "sync"
  16. "time"
  17. )
  18. type Client struct {
  19. databaseID string
  20. conn *grpc.ClientConn
  21. commandServiceClient v1.CommandServiceClient
  22. queryServiceClient v1.QueryServiceClient
  23. eventQueryServiceClient v1.EventQueryServiceClient
  24. transactionMutex sync.Mutex
  25. }
  26. func NewClient(address string, databaseID string) (*Client, error) {
  27. conn, err := grpc.DialContext(context.Background(), address,
  28. grpc.WithTransportCredentials(insecure.NewCredentials()))
  29. if err != nil {
  30. return nil, err
  31. }
  32. return &Client{
  33. databaseID: databaseID,
  34. conn: conn,
  35. commandServiceClient: v1.NewCommandServiceClient(conn),
  36. queryServiceClient: v1.NewQueryServiceClient(conn),
  37. eventQueryServiceClient: v1.NewEventQueryServiceClient(conn),
  38. }, nil
  39. }
  40. func DestroyClient(client *Client) error {
  41. if client == nil {
  42. return nil
  43. }
  44. client.transactionMutex.Lock()
  45. defer client.transactionMutex.Unlock()
  46. err := client.conn.Close()
  47. if err != nil {
  48. return err
  49. }
  50. client.databaseID = ""
  51. client.conn = nil
  52. client.commandServiceClient = nil
  53. client.queryServiceClient = nil
  54. return nil
  55. }
  56. func (c *Client) AutoMigrate(req *client.AutoMigrateRequest) error {
  57. items := make([]*request.AutoMigrateItem, 0)
  58. for _, reqItem := range req.Items {
  59. tableModelDescribeJsonBytes, err := json.Marshal(reqItem.TableModelDescribe)
  60. if err != nil {
  61. return err
  62. }
  63. items = append(items, &request.AutoMigrateItem{
  64. TablePrefixWithSchema: reqItem.TablePrefixWithSchema,
  65. Version: reqItem.Version,
  66. TableModelDescribe: tableModelDescribeJsonBytes,
  67. })
  68. }
  69. _, err := c.commandServiceClient.AutoMigrate(context.Background(), &request.AutoMigrateRequest{
  70. DatabaseID: c.databaseID,
  71. Items: items,
  72. })
  73. if err != nil {
  74. return err
  75. }
  76. return nil
  77. }
  78. func (c *Client) Transaction(txFunc client.TransactionFunc) error {
  79. stream, err := c.commandServiceClient.Transaction(context.Background())
  80. if err != nil {
  81. return err
  82. }
  83. defer func() {
  84. innerErr := stream.CloseSend()
  85. if innerErr != nil {
  86. panic(innerErr)
  87. }
  88. }()
  89. err = stream.Send(&request.TransactionOperation{
  90. Request: &request.TransactionOperation_TransactionBeginRequest{
  91. TransactionBeginRequest: &request.TransactionBeginRequest{DatabaseID: c.databaseID},
  92. }})
  93. if err != nil {
  94. return err
  95. }
  96. err = txFunc(&Transaction{
  97. stream: stream,
  98. client: c,
  99. })
  100. if err != nil {
  101. return err
  102. }
  103. err = stream.Send(&request.TransactionOperation{
  104. Request: &request.TransactionOperation_TransactionEndRequest{
  105. TransactionEndRequest: &request.TransactionEndRequest{},
  106. }})
  107. if err != nil {
  108. return err
  109. }
  110. _, err = stream.Recv()
  111. if err != nil && err != io.EOF {
  112. return err
  113. }
  114. return nil
  115. }
  116. func (c *Client) Insert(req *client.InsertRequest) (string, error) {
  117. tableRowJsonBytes, err := c.formTableRow(req.TableRow)
  118. if err != nil {
  119. return "", err
  120. }
  121. reply, err := c.commandServiceClient.Insert(context.Background(), &request.InsertRequest{
  122. DatabaseID: c.databaseID,
  123. TablePrefixWithSchema: req.TablePrefixWithSchema,
  124. Version: req.Version,
  125. KeyColumns: req.KeyColumns,
  126. TableRow: tableRowJsonBytes,
  127. UserID: req.UserID,
  128. })
  129. if err != nil {
  130. return "", err
  131. }
  132. return reply.Statement, nil
  133. }
  134. func (c *Client) InsertBatch(req *client.InsertBatchRequest) (string, error) {
  135. tableRowItems := make([]*request.InsertTableRowItem, 0)
  136. for _, reqTableItem := range req.Items {
  137. tableRows := make([]*request.TableRow, 0)
  138. for _, reqTableRow := range reqTableItem.TableRows {
  139. tableRow, err := c.formTableRow(reqTableRow)
  140. if err != nil {
  141. return "", err
  142. }
  143. tableRows = append(tableRows, tableRow)
  144. }
  145. tableRowItems = append(tableRowItems, &request.InsertTableRowItem{
  146. TablePrefixWithSchema: reqTableItem.TablePrefixWithSchema,
  147. Version: reqTableItem.Version,
  148. KeyColumns: reqTableItem.KeyColumns,
  149. TableRows: tableRows,
  150. })
  151. }
  152. reply, err := c.commandServiceClient.InsertBatch(context.Background(), &request.InsertBatchRequest{
  153. DatabaseID: c.databaseID,
  154. Items: tableRowItems,
  155. UserID: req.UserID,
  156. })
  157. if err != nil {
  158. return "", err
  159. }
  160. return reply.Statement, nil
  161. }
  162. func (c *Client) Delete(req *client.DeleteRequest) (string, error) {
  163. reply, err := c.commandServiceClient.Delete(context.Background(), &request.DeleteRequest{
  164. DatabaseID: c.databaseID,
  165. TablePrefixWithSchema: req.TablePrefixWithSchema,
  166. Version: req.Version,
  167. KeyValues: req.KeyValues,
  168. UserID: req.UserID,
  169. })
  170. if err != nil {
  171. return "", err
  172. }
  173. return reply.Statement, nil
  174. }
  175. func (c *Client) DeleteBatch(req *client.DeleteBatchRequest) (string, error) {
  176. tableRowItems := make([]*request.DeleteTableRowItem, 0)
  177. for _, reqTableItem := range req.Items {
  178. items := make([]*request.DeleteItem, 0)
  179. for _, reqKeyValues := range reqTableItem.KeyValues {
  180. items = append(items, &request.DeleteItem{
  181. KeyValues: reqKeyValues,
  182. })
  183. }
  184. tableRowItems = append(tableRowItems, &request.DeleteTableRowItem{
  185. TablePrefixWithSchema: reqTableItem.TablePrefixWithSchema,
  186. Version: reqTableItem.Version,
  187. Items: items,
  188. })
  189. }
  190. reply, err := c.commandServiceClient.DeleteBatch(context.Background(), &request.DeleteBatchRequest{
  191. DatabaseID: c.databaseID,
  192. Items: tableRowItems,
  193. UserID: req.UserID,
  194. })
  195. if err != nil {
  196. return "", err
  197. }
  198. return reply.Statement, nil
  199. }
  200. func (c *Client) Update(req *client.UpdateRequest) (string, error) {
  201. newTableRowJsonByte, err := c.formTableRow(req.NewTableRow)
  202. if err != nil {
  203. return "", err
  204. }
  205. reply, err := c.commandServiceClient.Update(context.Background(), &request.UpdateRequest{
  206. DatabaseID: c.databaseID,
  207. TablePrefixWithSchema: req.TablePrefixWithSchema,
  208. Version: req.Version,
  209. KeyValues: req.KeyValues,
  210. NewTableRow: newTableRowJsonByte,
  211. UserID: req.UserID,
  212. })
  213. if err != nil {
  214. return "", err
  215. }
  216. return reply.Statement, nil
  217. }
  218. func (c *Client) Replay(req *client.ReplayRequest) (string, error) {
  219. reply, err := c.commandServiceClient.Replay(context.Background(), &request.ReplayRequest{
  220. DatabaseID: c.databaseID,
  221. TablePrefixWithSchema: req.TablePrefixWithSchema,
  222. Version: req.Version,
  223. KeyValues: req.KeyValues,
  224. UserID: req.UserID,
  225. })
  226. if err != nil {
  227. return "", err
  228. }
  229. return reply.Statement, nil
  230. }
  231. func (c *Client) QueryByWhereAndOrderBy(req *client.QueryByWhereAndOrderByRequest) (string, []map[string]any, int64, error) {
  232. selectJsonBytes, err := json.Marshal(req.Select)
  233. if err != nil {
  234. return "", nil, 0, err
  235. }
  236. whereJsonBytes, err := json.Marshal(req.Where)
  237. if err != nil {
  238. return "", nil, 0, err
  239. }
  240. reply, err := c.queryServiceClient.QueryByWhereAndOrderBy(context.Background(), &request.QueryByWhereAndOrderByRequest{
  241. DatabaseID: c.databaseID,
  242. TablePrefixWithSchema: req.TablePrefixWithSchema,
  243. Version: req.Version,
  244. Select: selectJsonBytes,
  245. Where: whereJsonBytes,
  246. OrderBy: req.OrderBy,
  247. PageNo: int32(req.PageNo),
  248. PageSize: int32(req.PageSize),
  249. })
  250. if err != nil {
  251. return "", nil, 0, err
  252. }
  253. infosMap, err := c.infoDataToInfoMapBatch(reply.Infos)
  254. if err != nil {
  255. return "", nil, 0, err
  256. }
  257. return reply.Statement, infosMap, reply.TotalCount, nil
  258. }
  259. func (c *Client) CommonQuery(req *client.CommonQueryRequest) (string, []map[string]any, int64, error) {
  260. selectJsonBytes, err := json.Marshal(req.Select)
  261. if err != nil {
  262. return "", nil, 0, err
  263. }
  264. whereJsonBytes, err := json.Marshal(req.Where)
  265. if err != nil {
  266. return "", nil, 0, err
  267. }
  268. orJsonBytes, err := json.Marshal(req.Or)
  269. if err != nil {
  270. return "", nil, 0, err
  271. }
  272. joinsJsonBytes, err := json.Marshal(req.Joins)
  273. if err != nil {
  274. return "", nil, 0, err
  275. }
  276. havingJsonBytes, err := json.Marshal(req.Having)
  277. if err != nil {
  278. return "", nil, 0, err
  279. }
  280. reply, err := c.queryServiceClient.CommonQuery(context.Background(), &request.CommonQueryRequest{
  281. DatabaseID: c.databaseID,
  282. TablePrefixWithSchema: req.TablePrefixWithSchema,
  283. Version: req.Version,
  284. Select: selectJsonBytes,
  285. Where: whereJsonBytes,
  286. OrderBy: req.OrderBy,
  287. Or: orJsonBytes,
  288. GroupBy: req.GroupBy,
  289. Joins: joinsJsonBytes,
  290. Having: havingJsonBytes,
  291. PageNo: int32(req.PageNo),
  292. PageSize: int32(req.PageSize),
  293. })
  294. if err != nil {
  295. return "", nil, 0, err
  296. }
  297. infosMap, err := c.infoDataToInfoMapBatch(reply.Infos)
  298. if err != nil {
  299. return "", nil, 0, err
  300. }
  301. return reply.Statement, infosMap, reply.TotalCount, nil
  302. }
  303. func (c *Client) QueryOnlyByWhereAndOrderBy(req *client.QueryByWhereAndOrderByRequest) (string, []map[string]any, error) {
  304. selectJsonBytes, err := json.Marshal(req.Select)
  305. if err != nil {
  306. return "", nil, err
  307. }
  308. whereJsonBytes, err := json.Marshal(req.Where)
  309. if err != nil {
  310. return "", nil, err
  311. }
  312. reply, err := c.queryServiceClient.QueryOnlyByWhereAndOrderBy(context.Background(), &request.QueryByWhereAndOrderByRequest{
  313. DatabaseID: c.databaseID,
  314. TablePrefixWithSchema: req.TablePrefixWithSchema,
  315. Version: req.Version,
  316. Select: selectJsonBytes,
  317. Where: whereJsonBytes,
  318. OrderBy: req.OrderBy,
  319. PageNo: int32(req.PageNo),
  320. PageSize: int32(req.PageSize),
  321. })
  322. if err != nil {
  323. return "", nil, err
  324. }
  325. infosMap, err := c.infoDataToInfoMapBatch(reply.Infos)
  326. if err != nil {
  327. return "", nil, err
  328. }
  329. return reply.Statement, infosMap, nil
  330. }
  331. func (c *Client) CommonQueryOnly(req *client.CommonQueryRequest) (string, []map[string]any, error) {
  332. selectJsonBytes, err := json.Marshal(req.Select)
  333. if err != nil {
  334. return "", nil, err
  335. }
  336. whereJsonBytes, err := json.Marshal(req.Where)
  337. if err != nil {
  338. return "", nil, err
  339. }
  340. orJsonBytes, err := json.Marshal(req.Or)
  341. if err != nil {
  342. return "", nil, err
  343. }
  344. joinsJsonBytes, err := json.Marshal(req.Joins)
  345. if err != nil {
  346. return "", nil, err
  347. }
  348. havingJsonBytes, err := json.Marshal(req.Having)
  349. if err != nil {
  350. return "", nil, err
  351. }
  352. reply, err := c.queryServiceClient.CommonQueryOnly(context.Background(), &request.CommonQueryRequest{
  353. DatabaseID: c.databaseID,
  354. TablePrefixWithSchema: req.TablePrefixWithSchema,
  355. Version: req.Version,
  356. Select: selectJsonBytes,
  357. Where: whereJsonBytes,
  358. OrderBy: req.OrderBy,
  359. Or: orJsonBytes,
  360. GroupBy: req.GroupBy,
  361. Joins: joinsJsonBytes,
  362. Having: havingJsonBytes,
  363. PageNo: int32(req.PageNo),
  364. PageSize: int32(req.PageSize),
  365. })
  366. if err != nil {
  367. return "", nil, err
  368. }
  369. infosMap, err := c.infoDataToInfoMapBatch(reply.Infos)
  370. if err != nil {
  371. return "", nil, err
  372. }
  373. return reply.Statement, infosMap, nil
  374. }
  375. func (c *Client) QueryByKeys(req *client.QueryByKeysRequest) (string, map[string]any, error) {
  376. selectJsonBytes, err := json.Marshal(req.Select)
  377. if err != nil {
  378. return "", nil, err
  379. }
  380. reply, err := c.queryServiceClient.QueryByKeys(context.Background(), &request.QueryByKeysRequest{
  381. DatabaseID: c.databaseID,
  382. TablePrefixWithSchema: req.TablePrefixWithSchema,
  383. Version: req.Version,
  384. Select: selectJsonBytes,
  385. KeyValues: req.KeyValues,
  386. })
  387. if err != nil {
  388. return "", nil, err
  389. }
  390. infoMap, err := c.infoDataToInfoMap(reply.Info)
  391. if err != nil {
  392. return "", nil, err
  393. }
  394. return reply.Statement, infoMap, nil
  395. }
  396. func (c *Client) CountWhere(req *client.CountWhereRequest) (string, int64, error) {
  397. whereJsonBytes, err := json.Marshal(req.Where)
  398. if err != nil {
  399. return "", 0, err
  400. }
  401. reply, err := c.queryServiceClient.CountWhere(context.Background(), &request.CountWhereRequest{
  402. DatabaseID: c.databaseID,
  403. TablePrefixWithSchema: req.TablePrefixWithSchema,
  404. Version: req.Version,
  405. Where: whereJsonBytes,
  406. })
  407. if err != nil {
  408. return "", 0, err
  409. }
  410. return reply.Statement, reply.Count, nil
  411. }
  412. func (c *Client) CommonCount(req *client.CommonCountRequest) (string, int64, error) {
  413. whereJsonBytes, err := json.Marshal(req.Where)
  414. if err != nil {
  415. return "", 0, err
  416. }
  417. orJsonBytes, err := json.Marshal(req.Or)
  418. if err != nil {
  419. return "", 0, err
  420. }
  421. joinsJsonBytes, err := json.Marshal(req.Joins)
  422. if err != nil {
  423. return "", 0, err
  424. }
  425. havingJsonBytes, err := json.Marshal(req.Having)
  426. if err != nil {
  427. return "", 0, err
  428. }
  429. reply, err := c.queryServiceClient.CommonCount(context.Background(), &request.CommonCountRequest{
  430. DatabaseID: c.databaseID,
  431. TablePrefixWithSchema: req.TablePrefixWithSchema,
  432. Version: req.Version,
  433. Where: whereJsonBytes,
  434. Or: orJsonBytes,
  435. GroupBy: req.GroupBy,
  436. Joins: joinsJsonBytes,
  437. Having: havingJsonBytes,
  438. })
  439. if err != nil {
  440. return "", 0, err
  441. }
  442. return reply.Statement, reply.Count, nil
  443. }
  444. func (c *Client) EventQueryByKeys(req *client.EventQueryByKeysRequest) (string, []client.EventInfo, int64, error) {
  445. selectJsonBytes, err := json.Marshal(req.Select)
  446. if err != nil {
  447. return "", nil, 0, err
  448. }
  449. reply, err := c.eventQueryServiceClient.EventQueryByKeys(context.Background(), &request.EventQueryByKeysRequest{
  450. DatabaseID: c.databaseID,
  451. TablePrefixWithSchema: req.TablePrefixWithSchema,
  452. Select: selectJsonBytes,
  453. KeyValues: req.KeyValues,
  454. PageNo: int32(req.PageNo),
  455. PageSize: int32(req.PageSize),
  456. })
  457. if err != nil {
  458. return "", nil, 0, err
  459. }
  460. return reply.Statement, client.FormEventInfoBatch(reply.Infos), reply.TotalCount, nil
  461. }
  462. func (c *Client) CommonEventQuery(req *client.CommonEventQueryRequest) (string, []client.EventInfo, int64, error) {
  463. selectJsonBytes, err := json.Marshal(req.Select)
  464. if err != nil {
  465. return "", nil, 0, err
  466. }
  467. reply, err := c.eventQueryServiceClient.CommonEventQuery(context.Background(), &request.CommonEventQueryRequest{
  468. DatabaseID: c.databaseID,
  469. TablePrefixWithSchema: req.TablePrefixWithSchema,
  470. Select: selectJsonBytes,
  471. KeyValues: req.KeyValues,
  472. Version: req.Version,
  473. Operation: req.Operation,
  474. CreatorID: req.CreatorID,
  475. StartCreatedTime: req.StartCreatedTime,
  476. EndCreatedTime: req.EndCreatedTime,
  477. PageNo: int32(req.PageNo),
  478. PageSize: int32(req.PageSize),
  479. })
  480. if err != nil {
  481. return "", nil, 0, err
  482. }
  483. return reply.Statement, client.FormEventInfoBatch(reply.Infos), reply.TotalCount, nil
  484. }
  485. func (c *Client) EventQueryOnlyByKeys(req *client.EventQueryByKeysRequest) (string, []client.EventInfo, error) {
  486. selectJsonBytes, err := json.Marshal(req.Select)
  487. if err != nil {
  488. return "", nil, err
  489. }
  490. reply, err := c.eventQueryServiceClient.EventQueryOnlyByKeys(context.Background(), &request.EventQueryByKeysRequest{
  491. DatabaseID: c.databaseID,
  492. TablePrefixWithSchema: req.TablePrefixWithSchema,
  493. Select: selectJsonBytes,
  494. KeyValues: req.KeyValues,
  495. PageNo: int32(req.PageNo),
  496. PageSize: int32(req.PageSize),
  497. })
  498. if err != nil {
  499. return "", nil, err
  500. }
  501. return reply.Statement, client.FormEventInfoBatch(reply.Infos), nil
  502. }
  503. func (c *Client) CommonEventQueryOnly(req *client.CommonEventQueryRequest) (string, []client.EventInfo, error) {
  504. selectJsonBytes, err := json.Marshal(req.Select)
  505. if err != nil {
  506. return "", nil, err
  507. }
  508. reply, err := c.eventQueryServiceClient.CommonEventQueryOnly(context.Background(), &request.CommonEventQueryRequest{
  509. DatabaseID: c.databaseID,
  510. TablePrefixWithSchema: req.TablePrefixWithSchema,
  511. Select: selectJsonBytes,
  512. KeyValues: req.KeyValues,
  513. Version: req.Version,
  514. Operation: req.Operation,
  515. CreatorID: req.CreatorID,
  516. StartCreatedTime: req.StartCreatedTime,
  517. EndCreatedTime: req.EndCreatedTime,
  518. PageNo: int32(req.PageNo),
  519. PageSize: int32(req.PageSize),
  520. })
  521. if err != nil {
  522. return "", nil, err
  523. }
  524. return reply.Statement, client.FormEventInfoBatch(reply.Infos), nil
  525. }
  526. func (c *Client) CountEventByKeys(req *client.CountEventByKeysRequest) (string, int64, error) {
  527. reply, err := c.eventQueryServiceClient.CountEventByKeys(context.Background(), &request.CountEventByKeysRequest{
  528. DatabaseID: c.databaseID,
  529. TablePrefixWithSchema: req.TablePrefixWithSchema,
  530. KeyValues: req.KeyValues,
  531. })
  532. if err != nil {
  533. return "", 0, err
  534. }
  535. return reply.Statement, reply.Count, nil
  536. }
  537. func (c *Client) CommonCountEvent(req *client.CommonCountEventRequest) (string, int64, error) {
  538. reply, err := c.eventQueryServiceClient.CommonCountEvent(context.Background(), &request.CommonCountEventRequest{
  539. DatabaseID: c.databaseID,
  540. TablePrefixWithSchema: req.TablePrefixWithSchema,
  541. KeyValues: req.KeyValues,
  542. Version: req.Version,
  543. Operation: req.Operation,
  544. CreatorID: req.CreatorID,
  545. StartCreatedTime: req.StartCreatedTime,
  546. EndCreatedTime: req.EndCreatedTime,
  547. })
  548. if err != nil {
  549. return "", 0, err
  550. }
  551. return reply.Statement, reply.Count, nil
  552. }
  553. func (c *Client) EventHistoryQueryByKeys(req *client.EventQueryByKeysRequest) (string, []client.EventInfo, int64, error) {
  554. selectJsonBytes, err := json.Marshal(req.Select)
  555. if err != nil {
  556. return "", nil, 0, err
  557. }
  558. reply, err := c.eventQueryServiceClient.EventHistoryQueryByKeys(context.Background(), &request.EventQueryByKeysRequest{
  559. DatabaseID: c.databaseID,
  560. TablePrefixWithSchema: req.TablePrefixWithSchema,
  561. Select: selectJsonBytes,
  562. KeyValues: req.KeyValues,
  563. PageNo: int32(req.PageNo),
  564. PageSize: int32(req.PageSize),
  565. })
  566. if err != nil {
  567. return "", nil, 0, err
  568. }
  569. return reply.Statement, client.FormEventInfoBatch(reply.Infos), reply.TotalCount, nil
  570. }
  571. func (c *Client) CommonEventHistoryQuery(req *client.CommonEventQueryRequest) (string, []client.EventInfo, int64, error) {
  572. selectJsonBytes, err := json.Marshal(req.Select)
  573. if err != nil {
  574. return "", nil, 0, err
  575. }
  576. reply, err := c.eventQueryServiceClient.CommonEventHistoryQuery(context.Background(), &request.CommonEventQueryRequest{
  577. DatabaseID: c.databaseID,
  578. TablePrefixWithSchema: req.TablePrefixWithSchema,
  579. Select: selectJsonBytes,
  580. KeyValues: req.KeyValues,
  581. Version: req.Version,
  582. Operation: req.Operation,
  583. CreatorID: req.CreatorID,
  584. StartCreatedTime: req.StartCreatedTime,
  585. EndCreatedTime: req.EndCreatedTime,
  586. PageNo: int32(req.PageNo),
  587. PageSize: int32(req.PageSize),
  588. })
  589. if err != nil {
  590. return "", nil, 0, err
  591. }
  592. return reply.Statement, client.FormEventInfoBatch(reply.Infos), reply.TotalCount, nil
  593. }
  594. func (c *Client) EventHistoryQueryOnlyByKeys(req *client.EventQueryByKeysRequest) (string, []client.EventInfo, error) {
  595. selectJsonBytes, err := json.Marshal(req.Select)
  596. if err != nil {
  597. return "", nil, err
  598. }
  599. reply, err := c.eventQueryServiceClient.EventHistoryQueryOnlyByKeys(context.Background(), &request.EventQueryByKeysRequest{
  600. DatabaseID: c.databaseID,
  601. TablePrefixWithSchema: req.TablePrefixWithSchema,
  602. Select: selectJsonBytes,
  603. KeyValues: req.KeyValues,
  604. PageNo: int32(req.PageNo),
  605. PageSize: int32(req.PageSize),
  606. })
  607. if err != nil {
  608. return "", nil, err
  609. }
  610. return reply.Statement, client.FormEventInfoBatch(reply.Infos), nil
  611. }
  612. func (c *Client) CommonEventHistoryQueryOnly(req *client.CommonEventQueryRequest) (string, []client.EventInfo, error) {
  613. selectJsonBytes, err := json.Marshal(req.Select)
  614. if err != nil {
  615. return "", nil, err
  616. }
  617. reply, err := c.eventQueryServiceClient.CommonEventHistoryQueryOnly(context.Background(), &request.CommonEventQueryRequest{
  618. DatabaseID: c.databaseID,
  619. TablePrefixWithSchema: req.TablePrefixWithSchema,
  620. Select: selectJsonBytes,
  621. KeyValues: req.KeyValues,
  622. Version: req.Version,
  623. Operation: req.Operation,
  624. CreatorID: req.CreatorID,
  625. StartCreatedTime: req.StartCreatedTime,
  626. EndCreatedTime: req.EndCreatedTime,
  627. PageNo: int32(req.PageNo),
  628. PageSize: int32(req.PageSize),
  629. })
  630. if err != nil {
  631. return "", nil, err
  632. }
  633. return reply.Statement, client.FormEventInfoBatch(reply.Infos), nil
  634. }
  635. func (c *Client) CountEventHistoryByKeys(req *client.CountEventByKeysRequest) (string, int64, error) {
  636. reply, err := c.eventQueryServiceClient.CountEventHistoryByKeys(context.Background(), &request.CountEventByKeysRequest{
  637. DatabaseID: c.databaseID,
  638. TablePrefixWithSchema: req.TablePrefixWithSchema,
  639. KeyValues: req.KeyValues,
  640. })
  641. if err != nil {
  642. return "", 0, err
  643. }
  644. return reply.Statement, reply.Count, nil
  645. }
  646. func (c *Client) CommonCountEventHistory(req *client.CommonCountEventRequest) (string, int64, error) {
  647. reply, err := c.eventQueryServiceClient.CommonCountEventHistory(context.Background(), &request.CommonCountEventRequest{
  648. DatabaseID: c.databaseID,
  649. TablePrefixWithSchema: req.TablePrefixWithSchema,
  650. KeyValues: req.KeyValues,
  651. Version: req.Version,
  652. Operation: req.Operation,
  653. CreatorID: req.CreatorID,
  654. StartCreatedTime: req.StartCreatedTime,
  655. EndCreatedTime: req.EndCreatedTime,
  656. })
  657. if err != nil {
  658. return "", 0, err
  659. }
  660. return reply.Statement, reply.Count, nil
  661. }
  662. func (c *Client) formTableRow(tableRow map[string]any) (*request.TableRow, error) {
  663. if tableRow == nil || len(tableRow) == 0 {
  664. return &request.TableRow{Columns: make([]*request.Column, 0)}, nil
  665. }
  666. columns := make([]*request.Column, 0)
  667. for columnName, value := range tableRow {
  668. valueType := reflect.TypeOf(value)
  669. if valueType.Kind() == reflect.Ptr {
  670. reflectValue := reflect.ValueOf(value).Elem()
  671. valueType = reflectValue.Type()
  672. value = reflectValue.Interface()
  673. }
  674. typedValue := new(request.ColumnValue)
  675. typedValue.Kind = int32(valueType.Kind())
  676. typedValue.Type = valueType.Name()
  677. switch valueType.Name() {
  678. case "Time":
  679. timeObj := value.(time.Time)
  680. typedValue.TypedValue = &request.ColumnValue_Uint64Value{Uint64Value: uint64(timeObj.UnixNano())}
  681. case "string":
  682. typedValue.TypedValue = &request.ColumnValue_StringValue{StringValue: value.(string)}
  683. case "bool":
  684. typedValue.TypedValue = &request.ColumnValue_BoolValue{BoolValue: value.(bool)}
  685. case "int":
  686. typedValue.TypedValue = &request.ColumnValue_Uint64Value{Uint64Value: uint64(value.(int))}
  687. case "int8":
  688. typedValue.TypedValue = &request.ColumnValue_Uint64Value{Uint64Value: uint64(value.(int8))}
  689. case "int16":
  690. typedValue.TypedValue = &request.ColumnValue_Uint64Value{Uint64Value: uint64(value.(int16))}
  691. case "int32":
  692. typedValue.TypedValue = &request.ColumnValue_Uint64Value{Uint64Value: uint64(value.(int32))}
  693. case "int64":
  694. typedValue.TypedValue = &request.ColumnValue_Uint64Value{Uint64Value: uint64(value.(int64))}
  695. case "uint":
  696. typedValue.TypedValue = &request.ColumnValue_Uint64Value{Uint64Value: uint64(value.(uint))}
  697. case "uint8":
  698. typedValue.TypedValue = &request.ColumnValue_Uint64Value{Uint64Value: uint64(value.(uint8))}
  699. case "uint16":
  700. typedValue.TypedValue = &request.ColumnValue_Uint64Value{Uint64Value: uint64(value.(uint16))}
  701. case "uint32":
  702. typedValue.TypedValue = &request.ColumnValue_Uint64Value{Uint64Value: uint64(value.(uint32))}
  703. case "uint64":
  704. typedValue.TypedValue = &request.ColumnValue_Uint64Value{Uint64Value: value.(uint64)}
  705. case "float32":
  706. typedValue.TypedValue = &request.ColumnValue_Float64Value{Float64Value: float64(value.(float32))}
  707. case "float64":
  708. typedValue.TypedValue = &request.ColumnValue_Float64Value{Float64Value: value.(float64)}
  709. default:
  710. return nil, errors.New("不支持的数据类型" + valueType.Name())
  711. }
  712. columns = append(columns, &request.Column{
  713. Name: columnName,
  714. Value: typedValue,
  715. })
  716. }
  717. return &request.TableRow{Columns: columns}, nil
  718. }
  719. func (c *Client) infoDataToInfoMap(infoData *response.InfoData) (map[string]any, error) {
  720. retInfoMap := make(map[string]any)
  721. for _, column := range infoData.Columns {
  722. typeStr := column.Value.Type
  723. if typeStr == "Time" {
  724. timeObj := time.Unix(0, int64(column.Value.GetUint64Value()))
  725. retInfoMap[column.Name] = timeObj
  726. } else if typeStr == "string" {
  727. retInfoMap[column.Name] = column.Value.GetStringValue()
  728. } else if typeStr == "bool" {
  729. retInfoMap[column.Name] = column.Value.GetBoolValue()
  730. } else if strings.Contains(typeStr, "int") {
  731. retInfoMap[column.Name] = column.Value.GetUint64Value()
  732. } else if strings.Contains(typeStr, "float") {
  733. retInfoMap[column.Name] = column.Value.GetFloat64Value()
  734. } else {
  735. return nil, errors.New("不支持的数据类型" + column.Value.Type)
  736. }
  737. }
  738. return retInfoMap, nil
  739. }
  740. func (c *Client) infoDataToInfoMapBatch(infosData []*response.InfoData) ([]map[string]any, error) {
  741. retInfosDataMap := make([]map[string]any, 0)
  742. for _, infoData := range infosData {
  743. retInfoMap, err := c.infoDataToInfoMap(infoData)
  744. if err != nil {
  745. return nil, err
  746. }
  747. retInfosDataMap = append(retInfosDataMap, retInfoMap)
  748. }
  749. return retInfosDataMap, nil
  750. }