operations.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. package db_operations
  2. import (
  3. "database/sql"
  4. "git.sxidc.com/service-supports/ds-sdk/db_operations/dberr"
  5. "git.sxidc.com/service-supports/fslog"
  6. "github.com/mitchellh/mapstructure"
  7. "gorm.io/gorm"
  8. "strings"
  9. "time"
  10. )
  11. type Operations struct {
  12. initDB *gorm.DB
  13. processDB *gorm.DB
  14. stopPingChan chan any
  15. }
  16. func NewOperationsFromMap(configMap map[string]any) (*Operations, error) {
  17. dbConfig := new(DBConfig)
  18. err := mapstructure.Decode(configMap, dbConfig)
  19. if err != nil {
  20. return nil, err
  21. }
  22. return NewOperations(dbConfig)
  23. }
  24. func NewOperations(dbConfig *DBConfig) (*Operations, error) {
  25. gormDB, err := newGormDB(dbConfig)
  26. if err != nil {
  27. return nil, err
  28. }
  29. sqlDB, err := gormDB.DB()
  30. if err != nil {
  31. return nil, err
  32. }
  33. if dbConfig.MaxConnections == 0 {
  34. dbConfig.MaxConnections = 50
  35. }
  36. if dbConfig.MaxIdleConnections == 0 {
  37. dbConfig.MaxIdleConnections = 10
  38. }
  39. sqlDB.SetMaxOpenConns(dbConfig.MaxConnections)
  40. sqlDB.SetMaxIdleConns(dbConfig.MaxIdleConnections)
  41. op := &Operations{
  42. initDB: gormDB,
  43. processDB: gormDB,
  44. stopPingChan: make(chan any),
  45. }
  46. op.startBeatHeart(sqlDB)
  47. return op, nil
  48. }
  49. func DestroyOperation(op *Operations) error {
  50. if op == nil {
  51. return nil
  52. }
  53. if op.initDB == nil {
  54. return nil
  55. }
  56. op.stopBeatHeart()
  57. return destroyGormDB(op.initDB)
  58. }
  59. func (op *Operations) startBeatHeart(sqlDB *sql.DB) {
  60. go func() {
  61. pingTicker := time.NewTicker(time.Minute * 1)
  62. defer pingTicker.Stop()
  63. for {
  64. select {
  65. case <-op.stopPingChan:
  66. return
  67. case <-pingTicker.C:
  68. err := sqlDB.Ping()
  69. if err != nil {
  70. fslog.Error(err)
  71. }
  72. }
  73. }
  74. }()
  75. }
  76. func (op *Operations) stopBeatHeart() {
  77. if op.stopPingChan != nil {
  78. close(op.stopPingChan)
  79. op.stopPingChan = nil
  80. }
  81. }
  82. func (op *Operations) BeginTransaction() TransactionDBOperations {
  83. tx := op.initDB.Begin()
  84. return &TransactionOperations{
  85. Operations{
  86. initDB: tx,
  87. processDB: tx,
  88. },
  89. }
  90. }
  91. func (op *Operations) NewSession() DBOperations {
  92. return &Operations{
  93. initDB: op.initDB,
  94. processDB: op.initDB,
  95. }
  96. }
  97. func (op *Operations) Table(name string, args ...any) DBOperations {
  98. op.processDB = op.initDB.Table(name, args...)
  99. return op
  100. }
  101. func (op *Operations) Raw(sql string, values ...any) DBOperations {
  102. op.processDB = op.processDB.Raw(sql, values...)
  103. return op
  104. }
  105. func (op *Operations) Select(query string, args ...any) DBOperations {
  106. op.processDB = op.processDB.Select(query, args...)
  107. return op
  108. }
  109. func (op *Operations) Where(query string, args ...any) DBOperations {
  110. op.processDB = op.processDB.Where(query, args...)
  111. return op
  112. }
  113. func (op *Operations) Or(query string, args ...any) DBOperations {
  114. op.processDB = op.processDB.Or(query, args...)
  115. return op
  116. }
  117. func (op *Operations) GroupBy(groupBy string) DBOperations {
  118. op.processDB = op.processDB.Group(groupBy)
  119. return op
  120. }
  121. func (op *Operations) OrderBy(orderBy string) DBOperations {
  122. op.processDB = op.processDB.Order(orderBy)
  123. return op
  124. }
  125. func (op *Operations) Joins(query string, args ...any) DBOperations {
  126. op.processDB = op.processDB.Joins(query, args...)
  127. return op
  128. }
  129. func (op *Operations) Having(query string, args ...any) DBOperations {
  130. op.processDB = op.processDB.Having(query, args...)
  131. return op
  132. }
  133. func (op *Operations) Paging(pageNo int, pageSize int) DBOperations {
  134. if pageNo != 0 && pageSize != 0 {
  135. offset := -1
  136. if pageNo == -1 || pageSize == -1 {
  137. offset = -1
  138. pageSize = -1
  139. } else {
  140. offset = (pageNo - 1) * pageSize
  141. }
  142. op.processDB = op.processDB.Offset(offset).Limit(pageSize)
  143. }
  144. return op
  145. }
  146. func (op *Operations) Create(tableRow map[string]any) error {
  147. err := op.processDB.Create(tableRow).Error
  148. if err != nil {
  149. if strings.Contains(err.Error(), "SQLSTATE 23505") {
  150. return dberr.ErrDBRecordHasExist
  151. }
  152. return err
  153. }
  154. return nil
  155. }
  156. func (op *Operations) CreateBatch(tableRows []map[string]any) error {
  157. err := op.processDB.Create(tableRows).Error
  158. if err != nil {
  159. if strings.Contains(err.Error(), "SQLSTATE 23505") {
  160. return dberr.ErrDBRecordHasExist
  161. }
  162. return err
  163. }
  164. return nil
  165. }
  166. func (op *Operations) Delete() error {
  167. return op.processDB.Delete(make(map[string]any)).Error
  168. }
  169. func (op *Operations) Updates(newTableRow map[string]any) error {
  170. err := op.processDB.Updates(newTableRow).Error
  171. if err != nil {
  172. if strings.Contains(err.Error(), "SQLSTATE 23505") {
  173. return dberr.ErrDBRecordHasExist
  174. }
  175. return err
  176. }
  177. return nil
  178. }
  179. func (op *Operations) UpdatesWithRowsAffected(newTableRow map[string]any) (int64, error) {
  180. op.processDB = op.processDB.Updates(newTableRow)
  181. if op.processDB.Error != nil {
  182. return 0, op.processDB.Error
  183. }
  184. return op.processDB.RowsAffected, nil
  185. }
  186. func (op *Operations) Rows(pageNo int, pageSize int) ([]map[string]any, error) {
  187. if pageNo != 0 && pageSize != 0 {
  188. offset := (pageNo - 1) * pageSize
  189. op.processDB = op.processDB.Offset(offset).Limit(pageSize)
  190. }
  191. defer func() {
  192. op.processDB = op.processDB.Offset(-1).Limit(-1)
  193. }()
  194. valueMaps := make([]map[string]any, 0)
  195. err := op.processDB.Scan(&valueMaps).Error
  196. if err != nil {
  197. return nil, err
  198. }
  199. return valueMaps, nil
  200. }
  201. func (op *Operations) Row() (map[string]any, error) {
  202. valueMap := make(map[string]any)
  203. err := op.processDB.Scan(&valueMap).Error
  204. if err != nil {
  205. return nil, err
  206. }
  207. if valueMap == nil || len(valueMap) == 0 {
  208. return nil, dberr.ErrDBRecordNotExist
  209. }
  210. return valueMap, nil
  211. }
  212. func (op *Operations) Count(count *int64) error {
  213. return op.processDB.Count(count).Error
  214. }