operations.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. package operations
  2. import (
  3. "database/sql"
  4. "git.sxidc.com/service-supports/fslog"
  5. "gorm.io/gorm"
  6. "strings"
  7. "time"
  8. )
  9. type Operations struct {
  10. initDB *gorm.DB
  11. processDB *gorm.DB
  12. stopPingChan chan any
  13. }
  14. func NewOperations(dbConfig *DBConfig) (*Operations, error) {
  15. gormDB, err := newGormDB(dbConfig)
  16. if err != nil {
  17. return nil, err
  18. }
  19. sqlDB, err := gormDB.DB()
  20. if err != nil {
  21. return nil, err
  22. }
  23. if dbConfig.MaxConnections == 0 {
  24. dbConfig.MaxConnections = 50
  25. }
  26. if dbConfig.MaxIdleConnections == 0 {
  27. dbConfig.MaxIdleConnections = 10
  28. }
  29. sqlDB.SetMaxOpenConns(dbConfig.MaxConnections)
  30. sqlDB.SetMaxIdleConns(dbConfig.MaxIdleConnections)
  31. op := &Operations{
  32. initDB: gormDB,
  33. processDB: gormDB,
  34. stopPingChan: make(chan any),
  35. }
  36. op.startBeatHeart(sqlDB)
  37. return op, nil
  38. }
  39. func DestroyOperation(op *Operations) error {
  40. if op == nil {
  41. return nil
  42. }
  43. if op.initDB == nil {
  44. return nil
  45. }
  46. op.stopBeatHeart()
  47. return destroyGormDB(op.initDB)
  48. }
  49. func (op *Operations) startBeatHeart(sqlDB *sql.DB) {
  50. go func() {
  51. pingTicker := time.NewTicker(time.Minute * 1)
  52. defer pingTicker.Stop()
  53. for {
  54. select {
  55. case <-op.stopPingChan:
  56. return
  57. case <-pingTicker.C:
  58. err := sqlDB.Ping()
  59. if err != nil {
  60. fslog.Error(err)
  61. }
  62. }
  63. }
  64. }()
  65. }
  66. func (op *Operations) stopBeatHeart() {
  67. if op.stopPingChan != nil {
  68. close(op.stopPingChan)
  69. op.stopPingChan = nil
  70. }
  71. }
  72. func (op *Operations) BeginTransaction() TransactionDBOperations {
  73. tx := op.initDB.Begin()
  74. return &TransactionOperations{
  75. Operations{
  76. initDB: tx,
  77. processDB: tx,
  78. },
  79. }
  80. }
  81. func (op *Operations) NewSession() DBOperations {
  82. return &Operations{
  83. initDB: op.initDB,
  84. processDB: op.initDB,
  85. }
  86. }
  87. func (op *Operations) Table(name string, args ...any) DBOperations {
  88. op.processDB = op.initDB.Table(name, args...)
  89. return op
  90. }
  91. func (op *Operations) Raw(sql string, values ...any) DBOperations {
  92. op.processDB = op.processDB.Raw(sql, values...)
  93. return op
  94. }
  95. func (op *Operations) Select(query string, args ...any) DBOperations {
  96. op.processDB = op.processDB.Select(query, args...)
  97. return op
  98. }
  99. func (op *Operations) Joins(query string, args ...any) DBOperations {
  100. op.processDB = op.processDB.Joins(query, args...)
  101. return op
  102. }
  103. func (op *Operations) Where(conditions *Conditions) DBOperations {
  104. op.processDB = conditions.where(op.processDB)
  105. return op
  106. }
  107. func (op *Operations) Or(conditions *Conditions) DBOperations {
  108. op.processDB = conditions.or(op.processDB)
  109. return op
  110. }
  111. func (op *Operations) Having(conditions *Conditions) DBOperations {
  112. op.processDB = conditions.having(op.processDB)
  113. return op
  114. }
  115. func (op *Operations) GroupBy(groupBy string) DBOperations {
  116. op.processDB = op.processDB.Group(groupBy)
  117. return op
  118. }
  119. func (op *Operations) OrderBy(orderBy string) DBOperations {
  120. op.processDB = op.processDB.Order(orderBy)
  121. return op
  122. }
  123. func (op *Operations) Paging(pageNo int, pageSize int) DBOperations {
  124. if pageNo != 0 && pageSize != 0 {
  125. offset := -1
  126. if pageNo == -1 || pageSize == -1 {
  127. offset = -1
  128. pageSize = -1
  129. } else {
  130. offset = (pageNo - 1) * pageSize
  131. }
  132. op.processDB = op.processDB.Offset(offset).Limit(pageSize)
  133. }
  134. return op
  135. }
  136. func (op *Operations) Create(tableRow *TableRow) error {
  137. err := op.processDB.Create(tableRow.ToMap()).Error
  138. if err != nil {
  139. if strings.Contains(err.Error(), "SQLSTATE 23505") {
  140. return ErrDBRecordHasExist
  141. }
  142. return err
  143. }
  144. return nil
  145. }
  146. func (op *Operations) CreateBatch(tableRows []TableRow) error {
  147. tableRowMaps := make([]map[string]any, 0)
  148. for _, tableRow := range tableRows {
  149. tableRowMaps = append(tableRowMaps, tableRow.ToMap())
  150. }
  151. err := op.processDB.Create(tableRowMaps).Error
  152. if err != nil {
  153. if strings.Contains(err.Error(), "SQLSTATE 23505") {
  154. return ErrDBRecordHasExist
  155. }
  156. return err
  157. }
  158. return nil
  159. }
  160. func (op *Operations) Delete() error {
  161. return op.processDB.Delete(make(map[string]any)).Error
  162. }
  163. func (op *Operations) Updates(newTableRow *TableRow) error {
  164. err := op.processDB.Updates(newTableRow.ToMap()).Error
  165. if err != nil {
  166. if strings.Contains(err.Error(), "SQLSTATE 23505") {
  167. return ErrDBRecordHasExist
  168. }
  169. return err
  170. }
  171. return nil
  172. }
  173. func (op *Operations) UpdatesWithRowsAffected(newTableRow *TableRow) (int64, error) {
  174. op.processDB = op.processDB.Updates(newTableRow.ToMap())
  175. if op.processDB.Error != nil {
  176. return 0, op.processDB.Error
  177. }
  178. return op.processDB.RowsAffected, nil
  179. }
  180. func (op *Operations) Rows(pageNo int, pageSize int) ([]TableRow, error) {
  181. if pageNo != 0 && pageSize != 0 {
  182. offset := (pageNo - 1) * pageSize
  183. op.processDB = op.processDB.Offset(offset).Limit(pageSize)
  184. }
  185. defer func() {
  186. op.processDB = op.processDB.Offset(-1).Limit(-1)
  187. }()
  188. tableRowMaps := make([]map[string]any, 0)
  189. err := op.processDB.Scan(&tableRowMaps).Error
  190. if err != nil {
  191. return nil, err
  192. }
  193. tableRows := make([]TableRow, 0)
  194. for _, tableRowMap := range tableRowMaps {
  195. tableRows = append(tableRows, *NewTableRowFromMap(tableRowMap))
  196. }
  197. return tableRows, nil
  198. }
  199. func (op *Operations) Row() (*TableRow, error) {
  200. valueMap := make(map[string]any)
  201. err := op.processDB.Scan(&valueMap).Error
  202. if err != nil {
  203. return nil, err
  204. }
  205. if valueMap == nil || len(valueMap) == 0 {
  206. return nil, ErrDBRecordNotExist
  207. }
  208. return NewTableRowFromMap(valueMap), nil
  209. }
  210. func (op *Operations) Count(count *int64) error {
  211. return op.processDB.Count(count).Error
  212. }
  213. func (op *Operations) CheckExist() (bool, error) {
  214. var count int64
  215. err := op.processDB.Count(&count).Error
  216. if err != nil {
  217. return false, err
  218. }
  219. return count > 0, nil
  220. }
  221. func (op *Operations) CheckHasOnlyOne() (bool, error) {
  222. var count int64
  223. err := op.processDB.Count(&count).Error
  224. if err != nil {
  225. return false, err
  226. }
  227. return count == 1, nil
  228. }