transaction.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. package sdk
  2. import (
  3. "encoding/json"
  4. "errors"
  5. v1 "git.sxidc.com/service-supports/ds-sdk/grpc_client/v1"
  6. "git.sxidc.com/service-supports/ds-sdk/grpc_client/v1/request"
  7. )
  8. type TxFunc func(tx *Transaction) error
  9. type Transaction struct {
  10. stream v1.SqlService_TransactionClient
  11. }
  12. func (tx *Transaction) ExecuteRawSql(sql string, executeParams map[string]any) ([]SqlResult, error) {
  13. var retErr error
  14. defer func() {
  15. if retErr != nil {
  16. innerErr := tx.stream.CloseSend()
  17. if innerErr != nil {
  18. panic(innerErr)
  19. }
  20. }
  21. }()
  22. executeParamsJsonBytes, err := json.Marshal(executeParams)
  23. if err != nil {
  24. retErr = err
  25. return nil, retErr
  26. }
  27. err = tx.stream.SendMsg(&request.TransactionOperation{
  28. Request: &request.TransactionOperation_ExecuteRawSqlRequest{
  29. ExecuteRawSqlRequest: &request.ExecuteRawSqlRequest{
  30. SQL: sql,
  31. ExecuteParams: string(executeParamsJsonBytes),
  32. },
  33. },
  34. })
  35. if err != nil {
  36. retErr = err
  37. return nil, retErr
  38. }
  39. resp, err := tx.stream.Recv()
  40. if err != nil {
  41. retErr = err
  42. return nil, retErr
  43. }
  44. if !resp.Success {
  45. retErr = errors.New(resp.Msg)
  46. return nil, retErr
  47. }
  48. tableRows := make([]map[string]any, 0)
  49. err = json.Unmarshal([]byte(resp.Results), &tableRows)
  50. if err != nil {
  51. retErr = err
  52. return nil, retErr
  53. }
  54. results := make([]SqlResult, len(tableRows))
  55. for i, row := range tableRows {
  56. results[i] = row
  57. }
  58. return results, nil
  59. }
  60. func (tx *Transaction) ExecuteSql(name string, executeParams map[string]any) ([]SqlResult, error) {
  61. var retErr error
  62. defer func() {
  63. if retErr != nil {
  64. innerErr := tx.stream.CloseSend()
  65. if innerErr != nil {
  66. panic(innerErr)
  67. }
  68. }
  69. }()
  70. executeParamsJsonBytes, err := json.Marshal(executeParams)
  71. if err != nil {
  72. retErr = err
  73. return nil, retErr
  74. }
  75. err = tx.stream.Send(&request.TransactionOperation{
  76. Request: &request.TransactionOperation_ExecuteSqlRequest{
  77. ExecuteSqlRequest: &request.ExecuteSqlRequest{
  78. Name: name,
  79. ExecuteParams: string(executeParamsJsonBytes),
  80. },
  81. },
  82. })
  83. if err != nil {
  84. retErr = err
  85. return nil, retErr
  86. }
  87. resp, err := tx.stream.Recv()
  88. if err != nil {
  89. retErr = err
  90. return nil, retErr
  91. }
  92. if !resp.Success {
  93. retErr = errors.New(resp.Msg)
  94. return nil, retErr
  95. }
  96. tableRows := make([]map[string]any, 0)
  97. err = json.Unmarshal([]byte(resp.Results), &tableRows)
  98. if err != nil {
  99. retErr = err
  100. return nil, retErr
  101. }
  102. results := make([]SqlResult, len(tableRows))
  103. for i, row := range tableRows {
  104. results[i] = row
  105. }
  106. return results, nil
  107. }