operations.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. package operations
  2. import (
  3. "database/sql"
  4. "git.sxidc.com/service-supports/fslog"
  5. "gorm.io/gorm"
  6. "strings"
  7. "time"
  8. )
  9. type Operations struct {
  10. initDB *gorm.DB
  11. processDB *gorm.DB
  12. stopPingChan chan any
  13. }
  14. func NewOperations(dbConfig *DBConfig) (*Operations, error) {
  15. gormDB, err := newGormDB(dbConfig)
  16. if err != nil {
  17. return nil, err
  18. }
  19. sqlDB, err := gormDB.DB()
  20. if err != nil {
  21. return nil, err
  22. }
  23. if dbConfig.MaxConnections == 0 {
  24. dbConfig.MaxConnections = 50
  25. }
  26. if dbConfig.MaxIdleConnections == 0 {
  27. dbConfig.MaxIdleConnections = 10
  28. }
  29. sqlDB.SetMaxOpenConns(dbConfig.MaxConnections)
  30. sqlDB.SetMaxIdleConns(dbConfig.MaxIdleConnections)
  31. op := &Operations{
  32. initDB: gormDB,
  33. processDB: gormDB,
  34. stopPingChan: make(chan any),
  35. }
  36. op.startBeatHeart(sqlDB)
  37. return op, nil
  38. }
  39. func DestroyOperation(op *Operations) error {
  40. if op == nil {
  41. return nil
  42. }
  43. if op.initDB == nil {
  44. return nil
  45. }
  46. op.stopBeatHeart()
  47. return destroyGormDB(op.initDB)
  48. }
  49. func (op *Operations) startBeatHeart(sqlDB *sql.DB) {
  50. go func() {
  51. pingTicker := time.NewTicker(time.Minute * 1)
  52. defer pingTicker.Stop()
  53. for {
  54. select {
  55. case <-op.stopPingChan:
  56. return
  57. case <-pingTicker.C:
  58. err := sqlDB.Ping()
  59. if err != nil {
  60. fslog.Error(err)
  61. }
  62. }
  63. }
  64. }()
  65. }
  66. func (op *Operations) stopBeatHeart() {
  67. if op.stopPingChan != nil {
  68. close(op.stopPingChan)
  69. op.stopPingChan = nil
  70. }
  71. }
  72. func (op *Operations) BeginTransaction() TransactionDBOperations {
  73. tx := op.initDB.Begin()
  74. return &TransactionOperations{
  75. Operations{
  76. initDB: tx,
  77. processDB: tx,
  78. },
  79. }
  80. }
  81. func (op *Operations) NewSession() DBOperations {
  82. return &Operations{
  83. initDB: op.initDB,
  84. processDB: op.initDB,
  85. }
  86. }
  87. func (op *Operations) AutoMigrate(tables ...Table) error {
  88. dbModels := make([]any, 0)
  89. for _, table := range tables {
  90. dbModel, err := table.ToDBModel()
  91. if err != nil {
  92. return err
  93. }
  94. dbModels = append(dbModels, dbModel)
  95. }
  96. return op.processDB.AutoMigrate(dbModels...)
  97. }
  98. func (op *Operations) Table(name string, args ...any) DBOperations {
  99. op.processDB = op.initDB.Table(name, args...)
  100. return op
  101. }
  102. func (op *Operations) Raw(sql string, values ...any) DBOperations {
  103. op.processDB = op.processDB.Raw(sql, values...)
  104. return op
  105. }
  106. func (op *Operations) Select(query string, args ...any) DBOperations {
  107. op.processDB = op.processDB.Select(query, args...)
  108. return op
  109. }
  110. func (op *Operations) Joins(query string, args ...any) DBOperations {
  111. op.processDB = op.processDB.Joins(query, args...)
  112. return op
  113. }
  114. func (op *Operations) Where(conditions *Conditions) DBOperations {
  115. op.processDB = conditions.where(op.processDB)
  116. return op
  117. }
  118. func (op *Operations) Or(conditions *Conditions) DBOperations {
  119. op.processDB = conditions.or(op.processDB)
  120. return op
  121. }
  122. func (op *Operations) Having(conditions *Conditions) DBOperations {
  123. op.processDB = conditions.having(op.processDB)
  124. return op
  125. }
  126. func (op *Operations) GroupBy(groupBy string) DBOperations {
  127. op.processDB = op.processDB.Group(groupBy)
  128. return op
  129. }
  130. func (op *Operations) OrderBy(orderBy string) DBOperations {
  131. op.processDB = op.processDB.Order(orderBy)
  132. return op
  133. }
  134. func (op *Operations) Paging(pageNo int, pageSize int) DBOperations {
  135. if pageNo != 0 && pageSize != 0 {
  136. offset := -1
  137. if pageNo == -1 || pageSize == -1 {
  138. offset = -1
  139. pageSize = -1
  140. } else {
  141. offset = (pageNo - 1) * pageSize
  142. }
  143. op.processDB = op.processDB.Offset(offset).Limit(pageSize)
  144. }
  145. return op
  146. }
  147. func (op *Operations) Create(tableRow *TableRow) error {
  148. err := op.processDB.Create(tableRow.ToMap()).Error
  149. if err != nil {
  150. if strings.Contains(err.Error(), "SQLSTATE 23505") {
  151. return ErrDBRecordHasExist
  152. }
  153. return err
  154. }
  155. return nil
  156. }
  157. func (op *Operations) CreateBatch(tableRows []TableRow) error {
  158. tableRowMaps := make([]map[string]any, 0)
  159. for _, tableRow := range tableRows {
  160. tableRowMaps = append(tableRowMaps, tableRow.ToMap())
  161. }
  162. err := op.processDB.Create(tableRowMaps).Error
  163. if err != nil {
  164. if strings.Contains(err.Error(), "SQLSTATE 23505") {
  165. return ErrDBRecordHasExist
  166. }
  167. return err
  168. }
  169. return nil
  170. }
  171. func (op *Operations) Delete() error {
  172. return op.processDB.Delete(make(map[string]any)).Error
  173. }
  174. func (op *Operations) Updates(newTableRow *TableRow) error {
  175. err := op.processDB.Updates(newTableRow.ToMap()).Error
  176. if err != nil {
  177. if strings.Contains(err.Error(), "SQLSTATE 23505") {
  178. return ErrDBRecordHasExist
  179. }
  180. return err
  181. }
  182. return nil
  183. }
  184. func (op *Operations) UpdatesWithRowsAffected(newTableRow *TableRow) (int64, error) {
  185. op.processDB = op.processDB.Updates(newTableRow.ToMap())
  186. if op.processDB.Error != nil {
  187. return 0, op.processDB.Error
  188. }
  189. return op.processDB.RowsAffected, nil
  190. }
  191. func (op *Operations) Rows(pageNo int, pageSize int) ([]TableRow, error) {
  192. if pageNo != 0 && pageSize != 0 {
  193. offset := (pageNo - 1) * pageSize
  194. op.processDB = op.processDB.Offset(offset).Limit(pageSize)
  195. }
  196. defer func() {
  197. op.processDB = op.processDB.Offset(-1).Limit(-1)
  198. }()
  199. tableRowMaps := make([]map[string]any, 0)
  200. err := op.processDB.Scan(&tableRowMaps).Error
  201. if err != nil {
  202. return nil, err
  203. }
  204. tableRows := make([]TableRow, 0)
  205. for _, tableRowMap := range tableRowMaps {
  206. tableRows = append(tableRows, *NewTableRowFromMap(tableRowMap))
  207. }
  208. return tableRows, nil
  209. }
  210. func (op *Operations) Row() (*TableRow, error) {
  211. valueMap := make(map[string]any)
  212. err := op.processDB.Scan(&valueMap).Error
  213. if err != nil {
  214. return nil, err
  215. }
  216. if valueMap == nil || len(valueMap) == 0 {
  217. return nil, ErrDBRecordNotExist
  218. }
  219. return NewTableRowFromMap(valueMap), nil
  220. }
  221. func (op *Operations) Count(count *int64) error {
  222. return op.processDB.Count(count).Error
  223. }
  224. func (op *Operations) CheckExist() (bool, error) {
  225. var count int64
  226. err := op.processDB.Count(&count).Error
  227. if err != nil {
  228. return false, err
  229. }
  230. return count > 0, nil
  231. }
  232. func (op *Operations) CheckHasOnlyOne() (bool, error) {
  233. var count int64
  234. err := op.processDB.Count(&count).Error
  235. if err != nil {
  236. return false, err
  237. }
  238. return count == 1, nil
  239. }