transaction.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. package dpsv1
  2. import (
  3. "git.sxidc.com/service-supports/dps-sdk/pb/v1"
  4. "git.sxidc.com/service-supports/dps-sdk/pb/v1/request"
  5. "git.sxidc.com/service-supports/dps-sdk/ports"
  6. "io"
  7. )
  8. type Transaction struct {
  9. stream v1.CommandService_TransactionClient
  10. client *Client
  11. }
  12. func (tx *Transaction) InsertTx(req *ports.InsertRequest) (string, error) {
  13. var err error
  14. defer func() {
  15. if err != nil {
  16. innerErr := tx.stream.CloseSend()
  17. if innerErr != nil {
  18. panic(innerErr)
  19. }
  20. }
  21. }()
  22. reqTableRow, err := tx.client.formTableRow(req.TableRow)
  23. if err != nil {
  24. return "", err
  25. }
  26. err = tx.stream.Send(&request.TransactionOperation{
  27. Request: &request.TransactionOperation_InsertRequest{
  28. InsertRequest: &request.InsertRequest{
  29. DatabaseID: req.DatabaseID,
  30. TablePrefixWithSchema: req.TablePrefixWithSchema,
  31. Version: req.Version,
  32. KeyColumns: req.KeyColumns,
  33. TableRow: reqTableRow,
  34. UserID: req.UserID,
  35. },
  36. }})
  37. if err != nil {
  38. return "", err
  39. }
  40. txResponse, err := tx.stream.Recv()
  41. if err != nil {
  42. return "", err
  43. }
  44. return txResponse.Statement, nil
  45. }
  46. func (tx *Transaction) InsertBatchTx(req *ports.InsertBatchRequest) (string, error) {
  47. var err error
  48. defer func() {
  49. if err != nil {
  50. innerErr := tx.stream.CloseSend()
  51. if innerErr != nil {
  52. panic(innerErr)
  53. }
  54. }
  55. }()
  56. tableItems := make([]*request.InsertTableItem, 0)
  57. for _, reqTableItem := range req.Items {
  58. items := make([]*request.InsertItem, 0)
  59. for _, reqItem := range reqTableItem.Items {
  60. tableRow, err := tx.client.formTableRow(reqItem.TableRow)
  61. if err != nil {
  62. return "", err
  63. }
  64. items = append(items, &request.InsertItem{
  65. KeyColumns: reqItem.KeyColumns,
  66. TableRow: tableRow,
  67. })
  68. }
  69. tableItems = append(tableItems, &request.InsertTableItem{
  70. TablePrefixWithSchema: reqTableItem.TablePrefixWithSchema,
  71. Version: reqTableItem.Version,
  72. Items: items,
  73. })
  74. }
  75. err = tx.stream.Send(&request.TransactionOperation{
  76. Request: &request.TransactionOperation_InsertBatchRequest{
  77. InsertBatchRequest: &request.InsertBatchRequest{
  78. DatabaseID: req.DatabaseID,
  79. Items: tableItems,
  80. UserID: req.UserID,
  81. },
  82. }})
  83. if err != nil {
  84. return "", err
  85. }
  86. txResponse, err := tx.stream.Recv()
  87. if err != nil {
  88. return "", err
  89. }
  90. return txResponse.Statement, nil
  91. }
  92. func (tx *Transaction) DeleteTx(req *ports.DeleteRequest) (string, error) {
  93. var err error
  94. defer func() {
  95. if err != nil {
  96. innerErr := tx.stream.CloseSend()
  97. if innerErr != nil {
  98. panic(innerErr)
  99. }
  100. }
  101. }()
  102. err = tx.stream.Send(&request.TransactionOperation{
  103. Request: &request.TransactionOperation_DeleteRequest{
  104. DeleteRequest: &request.DeleteRequest{
  105. DatabaseID: req.DatabaseID,
  106. TablePrefixWithSchema: req.TablePrefixWithSchema,
  107. Version: req.Version,
  108. KeyValues: req.KeyValues,
  109. UserID: req.UserID,
  110. },
  111. }})
  112. if err != nil {
  113. return "", err
  114. }
  115. txResponse, err := tx.stream.Recv()
  116. if err != nil {
  117. return "", err
  118. }
  119. return txResponse.Statement, nil
  120. }
  121. func (tx *Transaction) DeleteBatchTx(req *ports.DeleteBatchRequest) (string, error) {
  122. var err error
  123. defer func() {
  124. if err != nil {
  125. innerErr := tx.stream.CloseSend()
  126. if innerErr != nil {
  127. panic(innerErr)
  128. }
  129. }
  130. }()
  131. tableItems := make([]*request.DeleteTableItem, 0)
  132. for _, reqTableItem := range req.Items {
  133. items := make([]*request.DeleteItem, 0)
  134. for _, reqItem := range reqTableItem.Items {
  135. items = append(items, &request.DeleteItem{
  136. KeyValues: reqItem.KeyValues,
  137. })
  138. }
  139. tableItems = append(tableItems, &request.DeleteTableItem{
  140. TablePrefixWithSchema: reqTableItem.TablePrefixWithSchema,
  141. Version: reqTableItem.Version,
  142. Items: items,
  143. })
  144. }
  145. err = tx.stream.Send(&request.TransactionOperation{
  146. Request: &request.TransactionOperation_DeleteBatchRequest{
  147. DeleteBatchRequest: &request.DeleteBatchRequest{
  148. DatabaseID: req.DatabaseID,
  149. Items: tableItems,
  150. UserID: req.UserID,
  151. },
  152. }})
  153. if err != nil {
  154. return "", err
  155. }
  156. txResponse, err := tx.stream.Recv()
  157. if err != nil {
  158. return "", err
  159. }
  160. return txResponse.Statement, nil
  161. }
  162. func (tx *Transaction) UpdateTx(req *ports.UpdateRequest) (string, error) {
  163. var err error
  164. defer func() {
  165. if err != nil {
  166. innerErr := tx.stream.CloseSend()
  167. if innerErr != nil {
  168. panic(innerErr)
  169. }
  170. }
  171. }()
  172. reqNewTableRowMap, err := tx.client.formTableRow(req.NewTableRow)
  173. if err != nil {
  174. return "", err
  175. }
  176. err = tx.stream.Send(&request.TransactionOperation{
  177. Request: &request.TransactionOperation_UpdateRequest{
  178. UpdateRequest: &request.UpdateRequest{
  179. DatabaseID: req.DatabaseID,
  180. TablePrefixWithSchema: req.TablePrefixWithSchema,
  181. Version: req.Version,
  182. KeyValues: req.KeyValues,
  183. NewTableRow: reqNewTableRowMap,
  184. UserID: req.UserID,
  185. },
  186. }})
  187. if err != nil {
  188. return "", err
  189. }
  190. txResponse, err := tx.stream.Recv()
  191. if err != nil {
  192. return "", err
  193. }
  194. return txResponse.Statement, nil
  195. }
  196. func (tx *Transaction) End() error {
  197. var err error
  198. defer func() {
  199. innerErr := tx.stream.CloseSend()
  200. if innerErr != nil {
  201. panic(innerErr)
  202. }
  203. }()
  204. err = tx.stream.Send(&request.TransactionOperation{
  205. Request: &request.TransactionOperation_TransactionEndRequest{
  206. TransactionEndRequest: &request.TransactionEndRequest{},
  207. }})
  208. if err != nil {
  209. return err
  210. }
  211. _, err = tx.stream.Recv()
  212. if err != nil && err != io.EOF {
  213. return err
  214. }
  215. return nil
  216. }