client.go 19 KB


  1. package dpsv1
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "git.sxidc.com/service-supports/dps-sdk/client"
  7. "git.sxidc.com/service-supports/dps-sdk/pb/v1"
  8. "git.sxidc.com/service-supports/dps-sdk/pb/v1/request"
  9. "git.sxidc.com/service-supports/dps-sdk/pb/v1/response"
  10. "google.golang.org/grpc"
  11. "google.golang.org/grpc/credentials/insecure"
  12. "reflect"
  13. "time"
  14. )
  15. type Client struct {
  16. conn *grpc.ClientConn
  17. commandServiceClient v1.CommandServiceClient
  18. queryServiceClient v1.QueryServiceClient
  19. eventQueryServiceClient v1.EventQueryServiceClient
  20. }
  21. func NewClient(address string) (*Client, error) {
  22. conn, err := grpc.DialContext(context.Background(), address,
  23. grpc.WithTransportCredentials(insecure.NewCredentials()))
  24. if err != nil {
  25. return nil, err
  26. }
  27. return &Client{
  28. conn: conn,
  29. commandServiceClient: v1.NewCommandServiceClient(conn),
  30. queryServiceClient: v1.NewQueryServiceClient(conn),
  31. eventQueryServiceClient: v1.NewEventQueryServiceClient(conn),
  32. }, nil
  33. }
  34. func DestroyClient(client *Client) error {
  35. if client == nil {
  36. return nil
  37. }
  38. err := client.conn.Close()
  39. if err != nil {
  40. return err
  41. }
  42. client.conn = nil
  43. client.commandServiceClient = nil
  44. client.queryServiceClient = nil
  45. client = nil
  46. return nil
  47. }
  48. func (c *Client) AutoMigrate(databaseID string, req *client.AutoMigrateRequest) error {
  49. tableModelDescribeJsonBytes, err := json.Marshal(req.TableModelDescribe)
  50. if err != nil {
  51. return err
  52. }
  53. _, err = c.commandServiceClient.AutoMigrate(context.Background(), &request.AutoMigrateRequest{
  54. DatabaseID: databaseID,
  55. TablePrefixWithSchema: req.TablePrefixWithSchema,
  56. Version: req.Version,
  57. TableModelDescribe: tableModelDescribeJsonBytes,
  58. })
  59. if err != nil {
  60. return err
  61. }
  62. return nil
  63. }
  64. func (c *Client) AutoMigrateBatch(databaseID string, req *client.AutoMigrateBatchRequest) error {
  65. items := make([]*request.AutoMigrateItem, 0)
  66. for _, reqItem := range req.Items {
  67. tableModelDescribeJsonBytes, err := json.Marshal(reqItem.TableModelDescribe)
  68. if err != nil {
  69. return err
  70. }
  71. items = append(items, &request.AutoMigrateItem{
  72. TablePrefixWithSchema: reqItem.TablePrefixWithSchema,
  73. Version: reqItem.Version,
  74. TableModelDescribe: tableModelDescribeJsonBytes,
  75. })
  76. }
  77. _, err := c.commandServiceClient.AutoMigrateBatch(context.Background(), &request.AutoMigrateBatchRequest{
  78. DatabaseID: databaseID,
  79. Items: items,
  80. })
  81. if err != nil {
  82. return err
  83. }
  84. return nil
  85. }
  86. func (c *Client) Transaction(databaseID string, txFunc client.TransactionFunc) error {
  87. stream, err := c.commandServiceClient.Transaction(context.Background())
  88. if err != nil {
  89. return err
  90. }
  91. err = stream.Send(&request.TransactionOperation{
  92. Request: &request.TransactionOperation_TransactionBeginRequest{
  93. TransactionBeginRequest: &request.TransactionBeginRequest{DatabaseID: databaseID},
  94. }})
  95. if err != nil {
  96. return err
  97. }
  98. err = txFunc(&Transaction{
  99. stream: stream,
  100. client: c,
  101. })
  102. if err != nil {
  103. return err
  104. }
  105. return nil
  106. }
  107. func (c *Client) Insert(databaseID string, req *client.InsertRequest) (string, error) {
  108. tableRowJsonBytes, err := c.formTableRow(req.TableRow)
  109. if err != nil {
  110. return "", err
  111. }
  112. reply, err := c.commandServiceClient.Insert(context.Background(), &request.InsertRequest{
  113. DatabaseID: databaseID,
  114. TablePrefixWithSchema: req.TablePrefixWithSchema,
  115. Version: req.Version,
  116. KeyColumns: req.KeyColumns,
  117. TableRow: tableRowJsonBytes,
  118. UserID: req.UserID,
  119. })
  120. if err != nil {
  121. return "", err
  122. }
  123. return reply.Statement, nil
  124. }
  125. func (c *Client) InsertBatch(databaseID string, req *client.InsertBatchRequest) (string, error) {
  126. tableItems := make([]*request.InsertTableItem, 0)
  127. for _, reqTableItem := range req.Items {
  128. items := make([]*request.InsertItem, 0)
  129. for _, reqItem := range reqTableItem.Items {
  130. tableRow, err := c.formTableRow(reqItem.TableRow)
  131. if err != nil {
  132. return "", err
  133. }
  134. items = append(items, &request.InsertItem{
  135. KeyColumns: reqItem.KeyColumns,
  136. TableRow: tableRow,
  137. })
  138. }
  139. tableItems = append(tableItems, &request.InsertTableItem{
  140. TablePrefixWithSchema: reqTableItem.TablePrefixWithSchema,
  141. Version: reqTableItem.Version,
  142. Items: items,
  143. })
  144. }
  145. reply, err := c.commandServiceClient.InsertBatch(context.Background(), &request.InsertBatchRequest{
  146. DatabaseID: databaseID,
  147. Items: tableItems,
  148. UserID: req.UserID,
  149. })
  150. if err != nil {
  151. return "", err
  152. }
  153. return reply.Statement, nil
  154. }
  155. func (c *Client) Delete(databaseID string, req *client.DeleteRequest) (string, error) {
  156. reply, err := c.commandServiceClient.Delete(context.Background(), &request.DeleteRequest{
  157. DatabaseID: databaseID,
  158. TablePrefixWithSchema: req.TablePrefixWithSchema,
  159. Version: req.Version,
  160. KeyValues: req.KeyValues,
  161. UserID: req.UserID,
  162. })
  163. if err != nil {
  164. return "", err
  165. }
  166. return reply.Statement, nil
  167. }
  168. func (c *Client) DeleteBatch(databaseID string, req *client.DeleteBatchRequest) (string, error) {
  169. tableItems := make([]*request.DeleteTableItem, 0)
  170. for _, reqTableItem := range req.Items {
  171. items := make([]*request.DeleteItem, 0)
  172. for _, reqItem := range reqTableItem.Items {
  173. items = append(items, &request.DeleteItem{
  174. KeyValues: reqItem.KeyValues,
  175. })
  176. }
  177. tableItems = append(tableItems, &request.DeleteTableItem{
  178. TablePrefixWithSchema: reqTableItem.TablePrefixWithSchema,
  179. Version: reqTableItem.Version,
  180. Items: items,
  181. })
  182. }
  183. reply, err := c.commandServiceClient.DeleteBatch(context.Background(), &request.DeleteBatchRequest{
  184. DatabaseID: databaseID,
  185. Items: tableItems,
  186. UserID: req.UserID,
  187. })
  188. if err != nil {
  189. return "", err
  190. }
  191. return reply.Statement, nil
  192. }
  193. func (c *Client) Update(databaseID string, req *client.UpdateRequest) (string, error) {
  194. newTableRowJsonByte, err := c.formTableRow(req.NewTableRow)
  195. if err != nil {
  196. return "", err
  197. }
  198. reply, err := c.commandServiceClient.Update(context.Background(), &request.UpdateRequest{
  199. DatabaseID: databaseID,
  200. TablePrefixWithSchema: req.TablePrefixWithSchema,
  201. Version: req.Version,
  202. KeyValues: req.KeyValues,
  203. NewTableRow: newTableRowJsonByte,
  204. UserID: req.UserID,
  205. })
  206. if err != nil {
  207. return "", err
  208. }
  209. return reply.Statement, nil
  210. }
  211. func (c *Client) Replay(databaseID string, req *client.ReplayRequest) (string, error) {
  212. reply, err := c.commandServiceClient.Replay(context.Background(), &request.ReplayRequest{
  213. DatabaseID: databaseID,
  214. TablePrefixWithSchema: req.TablePrefixWithSchema,
  215. Version: req.Version,
  216. KeyValues: req.KeyValues,
  217. UserID: req.UserID,
  218. })
  219. if err != nil {
  220. return "", err
  221. }
  222. return reply.Statement, nil
  223. }
  224. func (c *Client) QueryByWhereAndOrderBy(databaseID string, req *client.QueryByWhereAndOrderByRequest) (string, []map[string]any, int64, error) {
  225. whereJsonBytes, err := json.Marshal(req.Where)
  226. if err != nil {
  227. return "", nil, 0, err
  228. }
  229. reply, err := c.queryServiceClient.QueryByWhereAndOrderBy(context.Background(), &request.QueryByWhereAndOrderByRequest{
  230. DatabaseID: databaseID,
  231. TablePrefixWithSchema: req.TablePrefixWithSchema,
  232. Version: req.Version,
  233. Select: req.Select,
  234. Where: whereJsonBytes,
  235. OrderBy: req.OrderBy,
  236. PageNo: int32(req.PageNo),
  237. PageSize: int32(req.PageSize),
  238. })
  239. if err != nil {
  240. return "", nil, 0, err
  241. }
  242. infosMap, err := c.infoDataToInfoMapBatch(reply.Infos)
  243. if err != nil {
  244. return "", nil, 0, err
  245. }
  246. return reply.Statement, infosMap, reply.TotalCount, nil
  247. }
  248. func (c *Client) CommonQuery(databaseID string, req *client.CommonQueryRequest) (string, []map[string]any, int64, error) {
  249. whereJsonBytes, err := json.Marshal(req.Where)
  250. if err != nil {
  251. return "", nil, 0, err
  252. }
  253. orJsonBytes, err := json.Marshal(req.Or)
  254. if err != nil {
  255. return "", nil, 0, err
  256. }
  257. havingJsonBytes, err := json.Marshal(req.Having)
  258. if err != nil {
  259. return "", nil, 0, err
  260. }
  261. reply, err := c.queryServiceClient.CommonQuery(context.Background(), &request.CommonQueryRequest{
  262. DatabaseID: databaseID,
  263. TablePrefixWithSchema: req.TablePrefixWithSchema,
  264. Version: req.Version,
  265. Select: req.Select,
  266. Where: whereJsonBytes,
  267. OrderBy: req.OrderBy,
  268. Or: orJsonBytes,
  269. GroupBy: req.GroupBy,
  270. Join: req.Join,
  271. Having: havingJsonBytes,
  272. PageNo: int32(req.PageNo),
  273. PageSize: int32(req.PageSize),
  274. })
  275. if err != nil {
  276. return "", nil, 0, err
  277. }
  278. infosMap, err := c.infoDataToInfoMapBatch(reply.Infos)
  279. if err != nil {
  280. return "", nil, 0, err
  281. }
  282. return reply.Statement, infosMap, reply.TotalCount, nil
  283. }
  284. func (c *Client) QueryByKeys(databaseID string, req *client.QueryByKeysRequest) (string, map[string]any, error) {
  285. reply, err := c.queryServiceClient.QueryByKeys(context.Background(), &request.QueryByKeysRequest{
  286. DatabaseID: databaseID,
  287. TablePrefixWithSchema: req.TablePrefixWithSchema,
  288. Version: req.Version,
  289. Select: req.Select,
  290. KeyValues: req.KeyValues,
  291. })
  292. if err != nil {
  293. return "", nil, err
  294. }
  295. infoMap, err := c.infoDataToInfoMap(reply.Info)
  296. if err != nil {
  297. return "", nil, err
  298. }
  299. return reply.Statement, infoMap, nil
  300. }
  301. func (c *Client) CountWhere(databaseID string, req *client.CountWhereRequest) (string, int64, error) {
  302. whereJsonBytes, err := json.Marshal(req.Where)
  303. if err != nil {
  304. return "", 0, err
  305. }
  306. reply, err := c.queryServiceClient.CountWhere(context.Background(), &request.CountWhereRequest{
  307. DatabaseID: databaseID,
  308. TablePrefixWithSchema: req.TablePrefixWithSchema,
  309. Version: req.Version,
  310. Where: whereJsonBytes,
  311. })
  312. if err != nil {
  313. return "", 0, err
  314. }
  315. return reply.Statement, reply.Count, nil
  316. }
  317. func (c *Client) CommonCount(databaseID string, req *client.CommonCountRequest) (string, int64, error) {
  318. whereJsonBytes, err := json.Marshal(req.Where)
  319. if err != nil {
  320. return "", 0, err
  321. }
  322. orJsonBytes, err := json.Marshal(req.Or)
  323. if err != nil {
  324. return "", 0, err
  325. }
  326. havingJsonBytes, err := json.Marshal(req.Having)
  327. if err != nil {
  328. return "", 0, err
  329. }
  330. reply, err := c.queryServiceClient.CommonCount(context.Background(), &request.CommonCountRequest{
  331. DatabaseID: databaseID,
  332. TablePrefixWithSchema: req.TablePrefixWithSchema,
  333. Version: req.Version,
  334. Where: whereJsonBytes,
  335. Or: orJsonBytes,
  336. GroupBy: req.GroupBy,
  337. Join: req.Join,
  338. Having: havingJsonBytes,
  339. })
  340. if err != nil {
  341. return "", 0, err
  342. }
  343. return reply.Statement, reply.Count, nil
  344. }
  345. func (c *Client) EventQueryByKeys(databaseID string, req *client.EventQueryByKeysRequest) (string, []client.EventInfo, int64, error) {
  346. reply, err := c.eventQueryServiceClient.EventQueryByKeys(context.Background(), &request.EventQueryByKeysRequest{
  347. DatabaseID: databaseID,
  348. TablePrefixWithSchema: req.TablePrefixWithSchema,
  349. Select: req.Select,
  350. KeyValues: req.KeyValues,
  351. PageNo: int32(req.PageNo),
  352. PageSize: int32(req.PageSize),
  353. })
  354. if err != nil {
  355. return "", nil, 0, err
  356. }
  357. return reply.Statement, client.FormEventInfoBatch(reply.Infos), reply.TotalCount, nil
  358. }
  359. func (c *Client) CommonEventQuery(databaseID string, req *client.CommonEventQueryRequest) (string, []client.EventInfo, int64, error) {
  360. reply, err := c.eventQueryServiceClient.CommonEventQuery(context.Background(), &request.CommonEventQueryRequest{
  361. DatabaseID: databaseID,
  362. TablePrefixWithSchema: req.TablePrefixWithSchema,
  363. Select: req.Select,
  364. KeyValues: req.KeyValues,
  365. Version: req.Version,
  366. Operation: req.Operation,
  367. CreatorID: req.CreatorID,
  368. StartCreatedTime: req.StartCreatedTime,
  369. EndCreatedTime: req.EndCreatedTime,
  370. PageNo: int32(req.PageNo),
  371. PageSize: int32(req.PageSize),
  372. })
  373. if err != nil {
  374. return "", nil, 0, err
  375. }
  376. return reply.Statement, client.FormEventInfoBatch(reply.Infos), reply.TotalCount, nil
  377. }
  378. func (c *Client) CountEventByKeys(databaseID string, req *client.CountEventByKeysRequest) (string, int64, error) {
  379. reply, err := c.eventQueryServiceClient.CountEventByKeys(context.Background(), &request.CountEventByKeysRequest{
  380. DatabaseID: databaseID,
  381. TablePrefixWithSchema: req.TablePrefixWithSchema,
  382. KeyValues: req.KeyValues,
  383. })
  384. if err != nil {
  385. return "", 0, err
  386. }
  387. return reply.Statement, reply.Count, nil
  388. }
  389. func (c *Client) CommonCountEvent(databaseID string, req *client.CommonCountEventRequest) (string, int64, error) {
  390. reply, err := c.eventQueryServiceClient.CommonCountEvent(context.Background(), &request.CommonCountEventRequest{
  391. DatabaseID: databaseID,
  392. TablePrefixWithSchema: req.TablePrefixWithSchema,
  393. KeyValues: req.KeyValues,
  394. Version: req.Version,
  395. Operation: req.Operation,
  396. CreatorID: req.CreatorID,
  397. StartCreatedTime: req.StartCreatedTime,
  398. EndCreatedTime: req.EndCreatedTime,
  399. })
  400. if err != nil {
  401. return "", 0, err
  402. }
  403. return reply.Statement, reply.Count, nil
  404. }
  405. func (c *Client) EventHistoryQueryByKeys(databaseID string, req *client.EventQueryByKeysRequest) (string, []client.EventInfo, int64, error) {
  406. reply, err := c.eventQueryServiceClient.EventHistoryQueryByKeys(context.Background(), &request.EventQueryByKeysRequest{
  407. DatabaseID: databaseID,
  408. TablePrefixWithSchema: req.TablePrefixWithSchema,
  409. Select: req.Select,
  410. KeyValues: req.KeyValues,
  411. PageNo: int32(req.PageNo),
  412. PageSize: int32(req.PageSize),
  413. })
  414. if err != nil {
  415. return "", nil, 0, err
  416. }
  417. return reply.Statement, client.FormEventInfoBatch(reply.Infos), reply.TotalCount, nil
  418. }
  419. func (c *Client) CommonEventHistoryQuery(databaseID string, req *client.CommonEventQueryRequest) (string, []client.EventInfo, int64, error) {
  420. reply, err := c.eventQueryServiceClient.CommonEventHistoryQuery(context.Background(), &request.CommonEventQueryRequest{
  421. DatabaseID: databaseID,
  422. TablePrefixWithSchema: req.TablePrefixWithSchema,
  423. Select: req.Select,
  424. KeyValues: req.KeyValues,
  425. Version: req.Version,
  426. Operation: req.Operation,
  427. CreatorID: req.CreatorID,
  428. StartCreatedTime: req.StartCreatedTime,
  429. EndCreatedTime: req.EndCreatedTime,
  430. PageNo: int32(req.PageNo),
  431. PageSize: int32(req.PageSize),
  432. })
  433. if err != nil {
  434. return "", nil, 0, err
  435. }
  436. return reply.Statement, client.FormEventInfoBatch(reply.Infos), reply.TotalCount, nil
  437. }
  438. func (c *Client) CountEventHistoryByKeys(databaseID string, req *client.CountEventByKeysRequest) (string, int64, error) {
  439. reply, err := c.eventQueryServiceClient.CountEventHistoryByKeys(context.Background(), &request.CountEventByKeysRequest{
  440. DatabaseID: databaseID,
  441. TablePrefixWithSchema: req.TablePrefixWithSchema,
  442. KeyValues: req.KeyValues,
  443. })
  444. if err != nil {
  445. return "", 0, err
  446. }
  447. return reply.Statement, reply.Count, nil
  448. }
  449. func (c *Client) CommonCountEventHistory(databaseID string, req *client.CommonCountEventRequest) (string, int64, error) {
  450. reply, err := c.eventQueryServiceClient.CommonCountEventHistory(context.Background(), &request.CommonCountEventRequest{
  451. DatabaseID: databaseID,
  452. TablePrefixWithSchema: req.TablePrefixWithSchema,
  453. KeyValues: req.KeyValues,
  454. Version: req.Version,
  455. Operation: req.Operation,
  456. CreatorID: req.CreatorID,
  457. StartCreatedTime: req.StartCreatedTime,
  458. EndCreatedTime: req.EndCreatedTime,
  459. })
  460. if err != nil {
  461. return "", 0, err
  462. }
  463. return reply.Statement, reply.Count, nil
  464. }
  465. func (c *Client) formTableRow(tableRow map[string]any) (*request.TableRow, error) {
  466. if tableRow == nil || len(tableRow) == 0 {
  467. return &request.TableRow{Columns: make([]*request.Column, 0)}, nil
  468. }
  469. columns := make([]*request.Column, 0)
  470. for columnName, value := range tableRow {
  471. valueType := reflect.TypeOf(value)
  472. if valueType.Kind() == reflect.Ptr {
  473. reflectValue := reflect.ValueOf(value).Elem()
  474. valueType = reflectValue.Type()
  475. value = reflectValue.Interface()
  476. }
  477. typedValue := new(request.ColumnValue)
  478. typedValue.Kind = int32(valueType.Kind())
  479. typedValue.Type = valueType.Name()
  480. switch valueType.Name() {
  481. case "Time":
  482. timeObj := value.(time.Time)
  483. typedValue.TypedValue = &request.ColumnValue_Uint64Value{Uint64Value: uint64(timeObj.UnixNano())}
  484. case "string":
  485. typedValue.TypedValue = &request.ColumnValue_StringValue{StringValue: value.(string)}
  486. case "bool":
  487. typedValue.TypedValue = &request.ColumnValue_BoolValue{BoolValue: value.(bool)}
  488. case "int":
  489. typedValue.TypedValue = &request.ColumnValue_Uint32Value{Uint32Value: uint32(value.(int))}
  490. case "int32":
  491. typedValue.TypedValue = &request.ColumnValue_Uint32Value{Uint32Value: uint32(value.(int32))}
  492. case "int64":
  493. typedValue.TypedValue = &request.ColumnValue_Uint64Value{Uint64Value: uint64(value.(int64))}
  494. case "uint32":
  495. typedValue.TypedValue = &request.ColumnValue_Uint32Value{Uint32Value: value.(uint32)}
  496. case "uint64":
  497. typedValue.TypedValue = &request.ColumnValue_Uint64Value{Uint64Value: value.(uint64)}
  498. case "float32":
  499. typedValue.TypedValue = &request.ColumnValue_Float64Value{Float64Value: float64(value.(float32))}
  500. case "float64":
  501. typedValue.TypedValue = &request.ColumnValue_Float64Value{Float64Value: value.(float64)}
  502. default:
  503. return nil, errors.New("不支持的数据类型" + valueType.Name())
  504. }
  505. columns = append(columns, &request.Column{
  506. Name: columnName,
  507. Value: typedValue,
  508. })
  509. }
  510. return &request.TableRow{Columns: columns}, nil
  511. }
  512. func (c *Client) infoDataToInfoMap(infoData *response.InfoData) (map[string]any, error) {
  513. retInfoMap := make(map[string]any)
  514. for _, column := range infoData.Columns {
  515. switch column.Value.Type {
  516. case "Time":
  517. timeObj := time.Unix(0, int64(column.Value.GetUint64Value()))
  518. retInfoMap[column.Name] = timeObj
  519. case "string":
  520. retInfoMap[column.Name] = column.Value.GetStringValue()
  521. case "bool":
  522. retInfoMap[column.Name] = column.Value.GetBoolValue()
  523. case "int":
  524. retInfoMap[column.Name] = int(column.Value.GetUint32Value())
  525. case "int32":
  526. retInfoMap[column.Name] = int(column.Value.GetUint32Value())
  527. case "int64":
  528. retInfoMap[column.Name] = int64(column.Value.GetUint64Value())
  529. case "uint32":
  530. retInfoMap[column.Name] = column.Value.GetUint32Value()
  531. case "uint64":
  532. retInfoMap[column.Name] = column.Value.GetUint64Value()
  533. case "float32":
  534. retInfoMap[column.Name] = float32(column.Value.GetFloat64Value())
  535. case "float64":
  536. retInfoMap[column.Name] = column.Value.GetFloat64Value()
  537. default:
  538. return nil, errors.New("不支持的数据类型" + column.Value.Type)
  539. }
  540. }
  541. return retInfoMap, nil
  542. }
  543. func (c *Client) infoDataToInfoMapBatch(infosData []*response.InfoData) ([]map[string]any, error) {
  544. retInfosDataMap := make([]map[string]any, 0)
  545. for _, infoData := range infosData {
  546. retInfoMap, err := c.infoDataToInfoMap(infoData)
  547. if err != nil {
  548. return nil, err
  549. }
  550. retInfosDataMap = append(retInfosDataMap, retInfoMap)
  551. }
  552. return retInfosDataMap, nil
  553. }