operations.go 6.0 KB


  1. package operations
  2. import (
  3. "database/sql"
  4. "git.sxidc.com/service-supports/fslog"
  5. "gorm.io/gorm"
  6. "strings"
  7. "time"
  8. )
  9. type Operations struct {
  10. initDB *gorm.DB
  11. processDB *gorm.DB
  12. stopPingChan chan any
  13. }
  14. func NewOperations(dbConfig *DBConfig) (*Operations, error) {
  15. gormDB, err := newGormDB(dbConfig)
  16. if err != nil {
  17. return nil, err
  18. }
  19. sqlDB, err := gormDB.DB()
  20. if err != nil {
  21. return nil, err
  22. }
  23. if dbConfig.MaxConnections == 0 {
  24. dbConfig.MaxConnections = 50
  25. }
  26. if dbConfig.MaxIdleConnections == 0 {
  27. dbConfig.MaxIdleConnections = 10
  28. }
  29. sqlDB.SetMaxOpenConns(dbConfig.MaxConnections)
  30. sqlDB.SetMaxIdleConns(dbConfig.MaxIdleConnections)
  31. op := &Operations{
  32. initDB: gormDB,
  33. processDB: gormDB,
  34. stopPingChan: make(chan any),
  35. }
  36. op.startBeatHeart(sqlDB)
  37. return op, nil
  38. }
  39. func DestroyOperation(op *Operations) error {
  40. if op == nil {
  41. return nil
  42. }
  43. if op.initDB == nil {
  44. return nil
  45. }
  46. op.stopBeatHeart()
  47. return destroyGormDB(op.initDB)
  48. }
  49. func (op *Operations) startBeatHeart(sqlDB *sql.DB) {
  50. go func() {
  51. pingTicker := time.NewTicker(time.Minute * 1)
  52. defer pingTicker.Stop()
  53. for {
  54. select {
  55. case <-op.stopPingChan:
  56. return
  57. case <-pingTicker.C:
  58. err := sqlDB.Ping()
  59. if err != nil {
  60. fslog.Error(err)
  61. }
  62. }
  63. }
  64. }()
  65. }
  66. func (op *Operations) stopBeatHeart() {
  67. if op.stopPingChan != nil {
  68. close(op.stopPingChan)
  69. op.stopPingChan = nil
  70. }
  71. }
  72. func (op *Operations) BeginTransaction() TransactionDBOperations {
  73. tx := op.initDB.Begin()
  74. return &TransactionOperations{
  75. Operations{
  76. initDB: tx,
  77. processDB: tx,
  78. },
  79. }
  80. }
  81. func (op *Operations) NewSession() DBOperations {
  82. return &Operations{
  83. initDB: op.initDB,
  84. processDB: op.initDB,
  85. }
  86. }
  87. func (op *Operations) AutoMigrate(tables ...Table) error {
  88. tx := op.processDB.Begin()
  89. for _, table := range tables {
  90. dbModel, err := table.ToDBModel()
  91. if err != nil {
  92. tx.Rollback()
  93. return err
  94. }
  95. err = tx.Table(table.TableName).AutoMigrate(dbModel)
  96. if err != nil {
  97. tx.Rollback()
  98. return err
  99. }
  100. }
  101. tx.Commit()
  102. return nil
  103. }
  104. func (op *Operations) Table(name string, args ...any) DBOperations {
  105. op.processDB = op.initDB.Table(name, args...)
  106. return op
  107. }
  108. func (op *Operations) Raw(sql string, values ...any) DBOperations {
  109. op.processDB = op.processDB.Raw(sql, values...)
  110. return op
  111. }
  112. func (op *Operations) Select(query string, args ...any) DBOperations {
  113. op.processDB = op.processDB.Select(query, args...)
  114. return op
  115. }
  116. func (op *Operations) Joins(query string, args ...any) DBOperations {
  117. op.processDB = op.processDB.Joins(query, args...)
  118. return op
  119. }
  120. func (op *Operations) Where(conditions *Conditions) DBOperations {
  121. op.processDB = conditions.where(op.processDB)
  122. return op
  123. }
  124. func (op *Operations) Or(conditions *Conditions) DBOperations {
  125. op.processDB = conditions.or(op.processDB)
  126. return op
  127. }
  128. func (op *Operations) Having(conditions *Conditions) DBOperations {
  129. op.processDB = conditions.having(op.processDB)
  130. return op
  131. }
  132. func (op *Operations) GroupBy(groupBy string) DBOperations {
  133. op.processDB = op.processDB.Group(groupBy)
  134. return op
  135. }
  136. func (op *Operations) OrderBy(orderBy string) DBOperations {
  137. op.processDB = op.processDB.Order(orderBy)
  138. return op
  139. }
  140. func (op *Operations) Paging(pageNo int, pageSize int) DBOperations {
  141. if pageNo != 0 && pageSize != 0 {
  142. offset := -1
  143. if pageNo == -1 || pageSize == -1 {
  144. offset = -1
  145. pageSize = -1
  146. } else {
  147. offset = (pageNo - 1) * pageSize
  148. }
  149. op.processDB = op.processDB.Offset(offset).Limit(pageSize)
  150. }
  151. return op
  152. }
  153. func (op *Operations) Create(tableRow *TableRow) error {
  154. err := op.processDB.Create(tableRow.ToMap()).Error
  155. if err != nil {
  156. if strings.Contains(err.Error(), "SQLSTATE 23505") {
  157. return ErrDBRecordHasExist
  158. }
  159. return err
  160. }
  161. return nil
  162. }
  163. func (op *Operations) CreateBatch(tableRows []TableRow) error {
  164. tableRowMaps := make([]map[string]any, 0)
  165. for _, tableRow := range tableRows {
  166. tableRowMaps = append(tableRowMaps, tableRow.ToMap())
  167. }
  168. err := op.processDB.Create(tableRowMaps).Error
  169. if err != nil {
  170. if strings.Contains(err.Error(), "SQLSTATE 23505") {
  171. return ErrDBRecordHasExist
  172. }
  173. return err
  174. }
  175. return nil
  176. }
  177. func (op *Operations) Delete() error {
  178. return op.processDB.Delete(make(map[string]any)).Error
  179. }
  180. func (op *Operations) Updates(newTableRow *TableRow) error {
  181. err := op.processDB.Updates(newTableRow.ToMap()).Error
  182. if err != nil {
  183. if strings.Contains(err.Error(), "SQLSTATE 23505") {
  184. return ErrDBRecordHasExist
  185. }
  186. return err
  187. }
  188. return nil
  189. }
  190. func (op *Operations) UpdatesWithRowsAffected(newTableRow *TableRow) (int64, error) {
  191. op.processDB = op.processDB.Updates(newTableRow.ToMap())
  192. if op.processDB.Error != nil {
  193. return 0, op.processDB.Error
  194. }
  195. return op.processDB.RowsAffected, nil
  196. }
  197. func (op *Operations) Rows(pageNo int, pageSize int) ([]TableRow, error) {
  198. if pageNo != 0 && pageSize != 0 {
  199. offset := (pageNo - 1) * pageSize
  200. op.processDB = op.processDB.Offset(offset).Limit(pageSize)
  201. }
  202. defer func() {
  203. op.processDB = op.processDB.Offset(-1).Limit(-1)
  204. }()
  205. tableRowMaps := make([]map[string]any, 0)
  206. err := op.processDB.Scan(&tableRowMaps).Error
  207. if err != nil {
  208. return nil, err
  209. }
  210. tableRows := make([]TableRow, 0)
  211. for _, tableRowMap := range tableRowMaps {
  212. tableRows = append(tableRows, *NewTableRowFromMap(tableRowMap))
  213. }
  214. return tableRows, nil
  215. }
  216. func (op *Operations) Row() (*TableRow, error) {
  217. valueMap := make(map[string]any)
  218. err := op.processDB.Scan(&valueMap).Error
  219. if err != nil {
  220. return nil, err
  221. }
  222. if valueMap == nil || len(valueMap) == 0 {
  223. return nil, ErrDBRecordNotExist
  224. }
  225. return NewTableRowFromMap(valueMap), nil
  226. }
  227. func (op *Operations) Count(count *int64) error {
  228. return op.processDB.Count(count).Error
  229. }
  230. func (op *Operations) CheckExist() (bool, error) {
  231. var count int64
  232. err := op.processDB.Count(&count).Error
  233. if err != nil {
  234. return false, err
  235. }
  236. return count > 0, nil
  237. }
  238. func (op *Operations) CheckHasOnlyOne() (bool, error) {
  239. var count int64
  240. err := op.processDB.Count(&count).Error
  241. if err != nil {
  242. return false, err
  243. }
  244. return count == 1, nil
  245. }