transaction.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. package dpsv1
  2. import (
  3. "git.sxidc.com/service-supports/dps-sdk/client"
  4. "git.sxidc.com/service-supports/dps-sdk/pb/v1"
  5. "git.sxidc.com/service-supports/dps-sdk/pb/v1/request"
  6. )
  7. type Transaction struct {
  8. stream v1.CommandService_TransactionClient
  9. client *Client
  10. }
  11. func (tx *Transaction) InsertTx(req *client.InsertRequest) (string, error) {
  12. tx.client.transactionMutex.Lock()
  13. defer tx.client.transactionMutex.Unlock()
  14. if tx.client.conn == nil {
  15. return "", nil
  16. }
  17. var err error
  18. defer func() {
  19. if err != nil {
  20. innerErr := tx.stream.CloseSend()
  21. if innerErr != nil {
  22. panic(innerErr)
  23. }
  24. }
  25. }()
  26. reqTableRow, err := tx.client.formTableRow(req.TableRow)
  27. if err != nil {
  28. return "", err
  29. }
  30. err = tx.stream.Send(&request.TransactionOperation{
  31. Request: &request.TransactionOperation_InsertTxRequest{
  32. InsertTxRequest: &request.InsertTxRequest{
  33. TablePrefixWithSchema: req.TablePrefixWithSchema,
  34. Version: req.Version,
  35. KeyColumns: req.KeyColumns,
  36. TableRow: reqTableRow,
  37. UserID: req.UserID,
  38. },
  39. }})
  40. if err != nil {
  41. return "", err
  42. }
  43. txResponse, err := tx.stream.Recv()
  44. if err != nil {
  45. return "", err
  46. }
  47. return txResponse.Statement, nil
  48. }
  49. func (tx *Transaction) InsertBatchTx(req *client.InsertBatchRequest) (string, error) {
  50. tx.client.transactionMutex.Lock()
  51. defer tx.client.transactionMutex.Unlock()
  52. if tx.client.conn == nil {
  53. return "", nil
  54. }
  55. var err error
  56. defer func() {
  57. if err != nil {
  58. innerErr := tx.stream.CloseSend()
  59. if innerErr != nil {
  60. panic(innerErr)
  61. }
  62. }
  63. }()
  64. tableRowItems := make([]*request.InsertTableRowItem, 0)
  65. for _, reqTableItem := range req.Items {
  66. tableRows := make([]*request.TableRow, 0)
  67. for _, reqTableRow := range reqTableItem.TableRows {
  68. tableRow, err := tx.client.formTableRow(reqTableRow)
  69. if err != nil {
  70. return "", err
  71. }
  72. tableRows = append(tableRows, tableRow)
  73. }
  74. tableRowItems = append(tableRowItems, &request.InsertTableRowItem{
  75. TablePrefixWithSchema: reqTableItem.TablePrefixWithSchema,
  76. Version: reqTableItem.Version,
  77. KeyColumns: reqTableItem.KeyColumns,
  78. TableRows: tableRows,
  79. })
  80. }
  81. err = tx.stream.Send(&request.TransactionOperation{
  82. Request: &request.TransactionOperation_InsertBatchTxRequest{
  83. InsertBatchTxRequest: &request.InsertBatchTxRequest{
  84. Items: tableRowItems,
  85. UserID: req.UserID,
  86. },
  87. }})
  88. if err != nil {
  89. return "", err
  90. }
  91. txResponse, err := tx.stream.Recv()
  92. if err != nil {
  93. return "", err
  94. }
  95. return txResponse.Statement, nil
  96. }
  97. func (tx *Transaction) DeleteTx(req *client.DeleteRequest) (string, error) {
  98. tx.client.transactionMutex.Lock()
  99. defer tx.client.transactionMutex.Unlock()
  100. if tx.client.conn == nil {
  101. return "", nil
  102. }
  103. var err error
  104. defer func() {
  105. if err != nil {
  106. innerErr := tx.stream.CloseSend()
  107. if innerErr != nil {
  108. panic(innerErr)
  109. }
  110. }
  111. }()
  112. err = tx.stream.Send(&request.TransactionOperation{
  113. Request: &request.TransactionOperation_DeleteTxRequest{
  114. DeleteTxRequest: &request.DeleteTxRequest{
  115. TablePrefixWithSchema: req.TablePrefixWithSchema,
  116. Version: req.Version,
  117. KeyValues: req.KeyValues,
  118. UserID: req.UserID,
  119. },
  120. }})
  121. if err != nil {
  122. return "", err
  123. }
  124. txResponse, err := tx.stream.Recv()
  125. if err != nil {
  126. return "", err
  127. }
  128. return txResponse.Statement, nil
  129. }
  130. func (tx *Transaction) DeleteBatchTx(req *client.DeleteBatchRequest) (string, error) {
  131. tx.client.transactionMutex.Lock()
  132. defer tx.client.transactionMutex.Unlock()
  133. if tx.client.conn == nil {
  134. return "", nil
  135. }
  136. var err error
  137. defer func() {
  138. if err != nil {
  139. innerErr := tx.stream.CloseSend()
  140. if innerErr != nil {
  141. panic(innerErr)
  142. }
  143. }
  144. }()
  145. tableRowItems := make([]*request.DeleteTableRowItem, 0)
  146. for _, reqTableItem := range req.Items {
  147. items := make([]*request.DeleteItem, 0)
  148. for _, reqKeyValues := range reqTableItem.KeyValues {
  149. items = append(items, &request.DeleteItem{
  150. KeyValues: reqKeyValues,
  151. })
  152. }
  153. tableRowItems = append(tableRowItems, &request.DeleteTableRowItem{
  154. TablePrefixWithSchema: reqTableItem.TablePrefixWithSchema,
  155. Version: reqTableItem.Version,
  156. Items: items,
  157. })
  158. }
  159. err = tx.stream.Send(&request.TransactionOperation{
  160. Request: &request.TransactionOperation_DeleteBatchTxRequest{
  161. DeleteBatchTxRequest: &request.DeleteBatchTxRequest{
  162. Items: tableRowItems,
  163. UserID: req.UserID,
  164. },
  165. }})
  166. if err != nil {
  167. return "", err
  168. }
  169. txResponse, err := tx.stream.Recv()
  170. if err != nil {
  171. return "", err
  172. }
  173. return txResponse.Statement, nil
  174. }
  175. func (tx *Transaction) UpdateTx(req *client.UpdateRequest) (string, error) {
  176. tx.client.transactionMutex.Lock()
  177. defer tx.client.transactionMutex.Unlock()
  178. if tx.client.conn == nil {
  179. return "", nil
  180. }
  181. var err error
  182. defer func() {
  183. if err != nil {
  184. innerErr := tx.stream.CloseSend()
  185. if innerErr != nil {
  186. panic(innerErr)
  187. }
  188. }
  189. }()
  190. reqNewTableRowMap, err := tx.client.formTableRow(req.NewTableRow)
  191. if err != nil {
  192. return "", err
  193. }
  194. err = tx.stream.Send(&request.TransactionOperation{
  195. Request: &request.TransactionOperation_UpdateTxRequest{
  196. UpdateTxRequest: &request.UpdateTxRequest{
  197. TablePrefixWithSchema: req.TablePrefixWithSchema,
  198. Version: req.Version,
  199. KeyValues: req.KeyValues,
  200. NewTableRow: reqNewTableRowMap,
  201. UserID: req.UserID,
  202. },
  203. }})
  204. if err != nil {
  205. return "", err
  206. }
  207. txResponse, err := tx.stream.Recv()
  208. if err != nil {
  209. return "", err
  210. }
  211. return txResponse.Statement, nil
  212. }