transaction.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. package sdk
import (
	"encoding/json"
	"errors"
	"fmt"

	v1 "git.sxidc.com/service-supports/ds-sdk/grpc_client/v1"
	"git.sxidc.com/service-supports/ds-sdk/grpc_client/v1/request"
)
  8. type TxFunc func(tx *Transaction) error
  9. type Transaction struct {
  10. stream v1.SqlService_TransactionClient
  11. }
  12. func (tx *Transaction) ExecuteRawSql(sql string, executeParams map[string]any) ([]map[string]any, error) {
  13. var retErr error
  14. defer func() {
  15. if retErr != nil {
  16. innerErr := tx.stream.CloseSend()
  17. if innerErr != nil {
  18. panic(innerErr)
  19. }
  20. }
  21. }()
  22. executeParamsJsonBytes, err := json.Marshal(executeParams)
  23. if err != nil {
  24. retErr = err
  25. return nil, retErr
  26. }
  27. err = tx.stream.SendMsg(&request.TransactionOperation{
  28. Request: &request.TransactionOperation_ExecuteRawSqlRequest{
  29. ExecuteRawSqlRequest: &request.ExecuteRawSqlRequest{
  30. SQL: sql,
  31. ExecuteParams: string(executeParamsJsonBytes),
  32. },
  33. },
  34. })
  35. if err != nil {
  36. retErr = err
  37. return nil, retErr
  38. }
  39. resp, err := tx.stream.Recv()
  40. if err != nil {
  41. retErr = err
  42. return nil, retErr
  43. }
  44. if !resp.Success {
  45. retErr = errors.New(resp.Msg)
  46. return nil, retErr
  47. }
  48. tableRows := make([]map[string]any, 0)
  49. err = json.Unmarshal([]byte(resp.Results), &tableRows)
  50. if err != nil {
  51. retErr = err
  52. return nil, retErr
  53. }
  54. return tableRows, nil
  55. }
  56. func (tx *Transaction) ExecuteSql(name string, executeParams map[string]any) ([]map[string]any, error) {
  57. var retErr error
  58. defer func() {
  59. if retErr != nil {
  60. innerErr := tx.stream.CloseSend()
  61. if innerErr != nil {
  62. panic(innerErr)
  63. }
  64. }
  65. }()
  66. executeParamsJsonBytes, err := json.Marshal(executeParams)
  67. if err != nil {
  68. retErr = err
  69. return nil, retErr
  70. }
  71. err = tx.stream.Send(&request.TransactionOperation{
  72. Request: &request.TransactionOperation_ExecuteSqlRequest{
  73. ExecuteSqlRequest: &request.ExecuteSqlRequest{
  74. Name: name,
  75. ExecuteParams: string(executeParamsJsonBytes),
  76. },
  77. },
  78. })
  79. if err != nil {
  80. retErr = err
  81. return nil, retErr
  82. }
  83. resp, err := tx.stream.Recv()
  84. if err != nil {
  85. retErr = err
  86. return nil, retErr
  87. }
  88. if !resp.Success {
  89. retErr = errors.New(resp.Msg)
  90. return nil, retErr
  91. }
  92. tableRows := make([]map[string]any, 0)
  93. err = json.Unmarshal([]byte(resp.Results), &tableRows)
  94. if err != nil {
  95. retErr = err
  96. return nil, retErr
  97. }
  98. return tableRows, nil
  99. }