operations.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. package db_operations
  2. import (
  3. "database/sql"
  4. "git.sxidc.com/service-supports/ds-sdk/db_operations/dberr"
  5. "git.sxidc.com/service-supports/fslog"
  6. "github.com/mitchellh/mapstructure"
  7. "gorm.io/gorm"
  8. "strings"
  9. "time"
  10. )
  11. type TransactionOperations struct {
  12. Operations
  13. }
  14. func (op *TransactionOperations) RollbackTransaction() {
  15. defer func() {
  16. op.processDB = op.initDB
  17. }()
  18. op.processDB.Rollback()
  19. }
  20. func (op *TransactionOperations) CommitTransaction() {
  21. defer func() {
  22. op.processDB = op.initDB
  23. }()
  24. op.processDB.Commit()
  25. }
  26. type Operations struct {
  27. initDB *gorm.DB
  28. processDB *gorm.DB
  29. stopPingChan chan any
  30. }
  31. func NewOperationsFromMap(configMap map[string]any) (*Operations, error) {
  32. dbConfig := new(DBConfig)
  33. err := mapstructure.Decode(configMap, dbConfig)
  34. if err != nil {
  35. return nil, err
  36. }
  37. return NewOperations(dbConfig)
  38. }
  39. func NewOperations(dbConfig *DBConfig) (*Operations, error) {
  40. gormDB, err := newGormDB(dbConfig)
  41. if err != nil {
  42. return nil, err
  43. }
  44. sqlDB, err := gormDB.DB()
  45. if err != nil {
  46. return nil, err
  47. }
  48. if dbConfig.MaxConnections == 0 {
  49. dbConfig.MaxConnections = 50
  50. }
  51. if dbConfig.MaxIdleConnections == 0 {
  52. dbConfig.MaxIdleConnections = 10
  53. }
  54. sqlDB.SetMaxOpenConns(dbConfig.MaxConnections)
  55. sqlDB.SetMaxIdleConns(dbConfig.MaxIdleConnections)
  56. op := &Operations{
  57. initDB: gormDB,
  58. processDB: gormDB,
  59. stopPingChan: make(chan any),
  60. }
  61. op.startBeatHeart(sqlDB)
  62. return op, nil
  63. }
  64. func DestroyOperation(op *Operations) error {
  65. if op == nil {
  66. return nil
  67. }
  68. if op.initDB == nil {
  69. return nil
  70. }
  71. op.stopBeatHeart()
  72. return destroyGormDB(op.initDB)
  73. }
  74. func (op *Operations) startBeatHeart(sqlDB *sql.DB) {
  75. go func() {
  76. pingTicker := time.NewTicker(time.Minute * 1)
  77. defer pingTicker.Stop()
  78. for {
  79. select {
  80. case <-op.stopPingChan:
  81. return
  82. case <-pingTicker.C:
  83. err := sqlDB.Ping()
  84. if err != nil {
  85. fslog.Error(err)
  86. }
  87. }
  88. }
  89. }()
  90. }
  91. func (op *Operations) stopBeatHeart() {
  92. if op.stopPingChan != nil {
  93. close(op.stopPingChan)
  94. op.stopPingChan = nil
  95. }
  96. }
  97. func (op *Operations) BeginTransaction() TransactionDBOperations {
  98. tx := op.initDB.Begin()
  99. return &TransactionOperations{
  100. Operations{
  101. initDB: tx,
  102. processDB: tx,
  103. },
  104. }
  105. }
  106. func (op *Operations) NewSession() DBOperations {
  107. return &Operations{
  108. initDB: op.initDB,
  109. processDB: op.initDB,
  110. }
  111. }
  112. func (op *Operations) Table(name string, args ...any) DBOperations {
  113. op.processDB = op.initDB.Table(name, args...)
  114. return op
  115. }
  116. func (op *Operations) Raw(sql string, values ...any) DBOperations {
  117. op.processDB = op.processDB.Raw(sql, values...)
  118. return op
  119. }
  120. func (op *Operations) Select(query string, args ...any) DBOperations {
  121. op.processDB = op.processDB.Select(query, args...)
  122. return op
  123. }
  124. func (op *Operations) Where(query string, args ...any) DBOperations {
  125. op.processDB = op.processDB.Where(query, args...)
  126. return op
  127. }
  128. func (op *Operations) Or(query string, args ...any) DBOperations {
  129. op.processDB = op.processDB.Or(query, args...)
  130. return op
  131. }
  132. func (op *Operations) GroupBy(groupBy string) DBOperations {
  133. op.processDB = op.processDB.Group(groupBy)
  134. return op
  135. }
  136. func (op *Operations) OrderBy(orderBy string) DBOperations {
  137. op.processDB = op.processDB.Order(orderBy)
  138. return op
  139. }
  140. func (op *Operations) Joins(query string, args ...any) DBOperations {
  141. op.processDB = op.processDB.Joins(query, args...)
  142. return op
  143. }
  144. func (op *Operations) Having(query string, args ...any) DBOperations {
  145. op.processDB = op.processDB.Having(query, args...)
  146. return op
  147. }
  148. func (op *Operations) Paging(pageNo int, pageSize int) DBOperations {
  149. if pageNo != 0 && pageSize != 0 {
  150. offset := -1
  151. if pageNo == -1 || pageSize == -1 {
  152. offset = -1
  153. pageSize = -1
  154. } else {
  155. offset = (pageNo - 1) * pageSize
  156. }
  157. op.processDB = op.processDB.Offset(offset).Limit(pageSize)
  158. }
  159. return op
  160. }
  161. func (op *Operations) Create(tableRow map[string]any) error {
  162. err := op.processDB.Create(tableRow).Error
  163. if err != nil {
  164. if strings.Contains(err.Error(), "SQLSTATE 23505") {
  165. return dberr.ErrDBRecordHasExist
  166. }
  167. return err
  168. }
  169. return nil
  170. }
  171. func (op *Operations) CreateBatch(tableRows []map[string]any) error {
  172. err := op.processDB.Create(tableRows).Error
  173. if err != nil {
  174. if strings.Contains(err.Error(), "SQLSTATE 23505") {
  175. return dberr.ErrDBRecordHasExist
  176. }
  177. return err
  178. }
  179. return nil
  180. }
  181. func (op *Operations) Delete() error {
  182. return op.processDB.Delete(make(map[string]any)).Error
  183. }
  184. func (op *Operations) Updates(updateData map[string]any) error {
  185. err := op.processDB.Updates(updateData).Error
  186. if err != nil {
  187. if strings.Contains(err.Error(), "SQLSTATE 23505") {
  188. return dberr.ErrDBRecordHasExist
  189. }
  190. return err
  191. }
  192. return nil
  193. }
  194. func (op *Operations) UpdatesWithRowsAffected(updateData map[string]any) (int64, error) {
  195. op.processDB = op.processDB.Updates(updateData)
  196. if op.processDB.Error != nil {
  197. return 0, op.processDB.Error
  198. }
  199. return op.processDB.RowsAffected, nil
  200. }
  201. func (op *Operations) Rows(pageNo int, pageSize int) ([]map[string]any, error) {
  202. if pageNo != 0 && pageSize != 0 {
  203. offset := (pageNo - 1) * pageSize
  204. op.processDB = op.processDB.Offset(offset).Limit(pageSize)
  205. }
  206. defer func() {
  207. op.processDB = op.processDB.Offset(-1).Limit(-1)
  208. }()
  209. valueMaps := make([]map[string]any, 0)
  210. err := op.processDB.Scan(&valueMaps).Error
  211. if err != nil {
  212. return nil, err
  213. }
  214. return valueMaps, nil
  215. }
  216. func (op *Operations) Row() (map[string]any, error) {
  217. valueMap := make(map[string]any)
  218. err := op.processDB.Scan(&valueMap).Error
  219. if err != nil {
  220. return nil, err
  221. }
  222. if valueMap == nil || len(valueMap) == 0 {
  223. return nil, dberr.ErrDBRecordNotExist
  224. }
  225. return valueMap, nil
  226. }
  227. func (op *Operations) Count(count *int64) error {
  228. return op.processDB.Count(count).Error
  229. }