transaction.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. package sdk
import (
	"encoding/json"
	"fmt"

	v1 "git.sxidc.com/service-supports/ds-sdk/grpc_client/v1"
	"git.sxidc.com/service-supports/ds-sdk/grpc_client/v1/request"
)
  7. type TxFunc func(tx *Transaction) error
  8. type Transaction struct {
  9. stream v1.SqlService_TransactionClient
  10. }
  11. func (tx *Transaction) ExecuteRawSql(sql string, executeParams map[string]any) ([]map[string]any, error) {
  12. var retErr error
  13. defer func() {
  14. if retErr != nil {
  15. innerErr := tx.stream.CloseSend()
  16. if innerErr != nil {
  17. panic(innerErr)
  18. }
  19. }
  20. }()
  21. executeParamsJsonBytes, err := json.Marshal(executeParams)
  22. if err != nil {
  23. retErr = err
  24. return nil, retErr
  25. }
  26. err = tx.stream.SendMsg(&request.TransactionOperation{
  27. Request: &request.TransactionOperation_ExecuteRawSqlRequest{
  28. ExecuteRawSqlRequest: &request.ExecuteRawSqlRequest{
  29. SQL: sql,
  30. ExecuteParams: string(executeParamsJsonBytes),
  31. },
  32. },
  33. })
  34. if err != nil {
  35. retErr = err
  36. return nil, retErr
  37. }
  38. resp, err := tx.stream.Recv()
  39. if err != nil {
  40. retErr = err
  41. return nil, retErr
  42. }
  43. tableRows := make([]map[string]any, 0)
  44. err = json.Unmarshal([]byte(resp.Results), &tableRows)
  45. if err != nil {
  46. retErr = err
  47. return nil, retErr
  48. }
  49. return tableRows, nil
  50. }
  51. func (tx *Transaction) ExecuteSql(name string, executeParams map[string]any) ([]map[string]any, error) {
  52. var retErr error
  53. defer func() {
  54. if retErr != nil {
  55. innerErr := tx.stream.CloseSend()
  56. if innerErr != nil {
  57. panic(innerErr)
  58. }
  59. }
  60. }()
  61. executeParamsJsonBytes, err := json.Marshal(executeParams)
  62. if err != nil {
  63. retErr = err
  64. return nil, retErr
  65. }
  66. err = tx.stream.Send(&request.TransactionOperation{
  67. Request: &request.TransactionOperation_ExecuteSqlRequest{
  68. ExecuteSqlRequest: &request.ExecuteSqlRequest{
  69. Name: name,
  70. ExecuteParams: string(executeParamsJsonBytes),
  71. },
  72. },
  73. })
  74. if err != nil {
  75. retErr = err
  76. return nil, retErr
  77. }
  78. resp, err := tx.stream.Recv()
  79. if err != nil {
  80. retErr = err
  81. return nil, retErr
  82. }
  83. tableRows := make([]map[string]any, 0)
  84. err = json.Unmarshal([]byte(resp.Results), &tableRows)
  85. if err != nil {
  86. retErr = err
  87. return nil, retErr
  88. }
  89. return tableRows, nil
  90. }