operations.go 6.1 KB


  1. package operations
  2. import (
  3. "database/sql"
  4. "git.sxidc.com/service-supports/fslog"
  5. "gorm.io/gorm"
  6. "strings"
  7. "time"
  8. )
  9. type Operations struct {
  10. initDB *gorm.DB
  11. processDB *gorm.DB
  12. stopPingChan chan any
  13. }
  14. func NewOperations(dbConfig *DBConfig) (*Operations, error) {
  15. gormDB, err := newGormDB(dbConfig)
  16. if err != nil {
  17. return nil, err
  18. }
  19. sqlDB, err := gormDB.DB()
  20. if err != nil {
  21. return nil, err
  22. }
  23. if dbConfig.MaxConnections == 0 {
  24. dbConfig.MaxConnections = 50
  25. }
  26. if dbConfig.MaxIdleConnections == 0 {
  27. dbConfig.MaxIdleConnections = 10
  28. }
  29. sqlDB.SetMaxOpenConns(dbConfig.MaxConnections)
  30. sqlDB.SetMaxIdleConns(dbConfig.MaxIdleConnections)
  31. op := &Operations{
  32. initDB: gormDB,
  33. processDB: gormDB,
  34. stopPingChan: make(chan any),
  35. }
  36. op.startBeatHeart(sqlDB)
  37. return op, nil
  38. }
  39. func DestroyOperation(op *Operations) error {
  40. if op == nil {
  41. return nil
  42. }
  43. if op.initDB == nil {
  44. return nil
  45. }
  46. op.stopBeatHeart()
  47. return destroyGormDB(op.initDB)
  48. }
  49. func (op *Operations) startBeatHeart(sqlDB *sql.DB) {
  50. go func() {
  51. pingTicker := time.NewTicker(time.Minute * 1)
  52. defer pingTicker.Stop()
  53. for {
  54. select {
  55. case <-op.stopPingChan:
  56. return
  57. case <-pingTicker.C:
  58. err := sqlDB.Ping()
  59. if err != nil {
  60. fslog.Error(err)
  61. }
  62. }
  63. }
  64. }()
  65. }
  66. func (op *Operations) stopBeatHeart() {
  67. if op.stopPingChan != nil {
  68. close(op.stopPingChan)
  69. op.stopPingChan = nil
  70. }
  71. }
  72. func (op *Operations) BeginTransaction() TransactionDBOperations {
  73. tx := op.initDB.Begin()
  74. return &TransactionOperations{
  75. Operations{
  76. initDB: tx,
  77. processDB: tx,
  78. },
  79. }
  80. }
  81. func (op *Operations) NewSession() DBOperations {
  82. return &Operations{
  83. initDB: op.initDB,
  84. processDB: op.initDB,
  85. }
  86. }
  87. func (op *Operations) AutoMigrate(tables ...Table) error {
  88. tx := op.processDB.Begin()
  89. for _, table := range tables {
  90. dbModel, err := table.ToDBModel()
  91. if err != nil {
  92. tx.Rollback()
  93. return err
  94. }
  95. err = tx.Table(table.TableName).AutoMigrate(dbModel)
  96. if err != nil {
  97. tx.Rollback()
  98. return err
  99. }
  100. }
  101. tx.Commit()
  102. return nil
  103. }
  104. func (op *Operations) Table(name string, args ...any) DBOperations {
  105. return &Operations{
  106. initDB: op.initDB,
  107. processDB: op.initDB.Table(name, args...),
  108. }
  109. }
  110. func (op *Operations) Raw(sql string, values ...any) DBOperations {
  111. op.processDB = op.processDB.Raw(sql, values...)
  112. return op
  113. }
  114. func (op *Operations) Select(query string, args ...any) DBOperations {
  115. op.processDB = op.processDB.Select(query, args...)
  116. return op
  117. }
  118. func (op *Operations) Joins(query string, args ...any) DBOperations {
  119. op.processDB = op.processDB.Joins(query, args...)
  120. return op
  121. }
  122. func (op *Operations) Where(conditions *Conditions) DBOperations {
  123. op.processDB = conditions.where(op.processDB)
  124. return op
  125. }
  126. func (op *Operations) Or(conditions *Conditions) DBOperations {
  127. op.processDB = conditions.or(op.processDB)
  128. return op
  129. }
  130. func (op *Operations) Having(conditions *Conditions) DBOperations {
  131. op.processDB = conditions.having(op.processDB)
  132. return op
  133. }
  134. func (op *Operations) GroupBy(groupBy string) DBOperations {
  135. op.processDB = op.processDB.Group(groupBy)
  136. return op
  137. }
  138. func (op *Operations) OrderBy(orderBy string) DBOperations {
  139. op.processDB = op.processDB.Order(orderBy)
  140. return op
  141. }
  142. func (op *Operations) Paging(pageNo int, pageSize int) DBOperations {
  143. if pageNo != 0 && pageSize != 0 {
  144. offset := -1
  145. if pageNo == -1 || pageSize == -1 {
  146. offset = -1
  147. pageSize = -1
  148. } else {
  149. offset = (pageNo - 1) * pageSize
  150. }
  151. op.processDB = op.processDB.Offset(offset).Limit(pageSize)
  152. }
  153. return op
  154. }
  155. func (op *Operations) Create(tableRow *TableRow) error {
  156. err := op.processDB.Create(tableRow.ToMap()).Error
  157. if err != nil {
  158. if strings.Contains(err.Error(), "SQLSTATE 23505") {
  159. return ErrDBRecordHasExist
  160. }
  161. return err
  162. }
  163. return nil
  164. }
  165. func (op *Operations) CreateBatch(tableRows []TableRow) error {
  166. tableRowMaps := make([]map[string]any, 0)
  167. for _, tableRow := range tableRows {
  168. tableRowMaps = append(tableRowMaps, tableRow.ToMap())
  169. }
  170. err := op.processDB.Create(tableRowMaps).Error
  171. if err != nil {
  172. if strings.Contains(err.Error(), "SQLSTATE 23505") {
  173. return ErrDBRecordHasExist
  174. }
  175. return err
  176. }
  177. return nil
  178. }
  179. func (op *Operations) Delete() error {
  180. return op.processDB.Delete(make(map[string]any)).Error
  181. }
  182. func (op *Operations) Updates(newTableRow *TableRow) error {
  183. err := op.processDB.Updates(newTableRow.ToMap()).Error
  184. if err != nil {
  185. if strings.Contains(err.Error(), "SQLSTATE 23505") {
  186. return ErrDBRecordHasExist
  187. }
  188. return err
  189. }
  190. return nil
  191. }
  192. func (op *Operations) UpdatesWithRowsAffected(newTableRow *TableRow) (int64, error) {
  193. op.processDB = op.processDB.Updates(newTableRow.ToMap())
  194. if op.processDB.Error != nil {
  195. return 0, op.processDB.Error
  196. }
  197. return op.processDB.RowsAffected, nil
  198. }
  199. func (op *Operations) Rows(pageNo int, pageSize int) ([]TableRow, error) {
  200. if pageNo != 0 && pageSize != 0 {
  201. offset := (pageNo - 1) * pageSize
  202. op.processDB = op.processDB.Offset(offset).Limit(pageSize)
  203. }
  204. defer func() {
  205. op.processDB = op.processDB.Offset(-1).Limit(-1)
  206. }()
  207. tableRowMaps := make([]map[string]any, 0)
  208. err := op.processDB.Scan(&tableRowMaps).Error
  209. if err != nil {
  210. return nil, err
  211. }
  212. tableRows := make([]TableRow, 0)
  213. for _, tableRowMap := range tableRowMaps {
  214. tableRows = append(tableRows, *NewTableRowFromMap(tableRowMap))
  215. }
  216. return tableRows, nil
  217. }
  218. func (op *Operations) Row() (*TableRow, error) {
  219. valueMap := make(map[string]any)
  220. err := op.processDB.Scan(&valueMap).Error
  221. if err != nil {
  222. return nil, err
  223. }
  224. if valueMap == nil || len(valueMap) == 0 {
  225. return nil, ErrDBRecordNotExist
  226. }
  227. return NewTableRowFromMap(valueMap), nil
  228. }
  229. func (op *Operations) Count() (int64, error) {
  230. var count int64
  231. err := op.processDB.Count(&count).Error
  232. if err != nil {
  233. return 0, err
  234. }
  235. return count, nil
  236. }
  237. func (op *Operations) CheckExist() (bool, error) {
  238. var count int64
  239. err := op.processDB.Count(&count).Error
  240. if err != nil {
  241. return false, err
  242. }
  243. return count > 0, nil
  244. }
  245. func (op *Operations) CheckHasOnlyOne() (bool, error) {
  246. var count int64
  247. err := op.processDB.Count(&count).Error
  248. if err != nil {
  249. return false, err
  250. }
  251. return count == 1, nil
  252. }