operations.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297
  1. package db_operations
  2. import (
  3. "database/sql"
  4. "git.sxidc.com/service-supports/fslog"
  5. "github.com/mitchellh/mapstructure"
  6. "gorm.io/gorm"
  7. "strings"
  8. "time"
  9. )
  10. type Operations struct {
  11. initDB *gorm.DB
  12. processDB *gorm.DB
  13. stopPingChan chan any
  14. }
  15. func NewOperationsFromMap(configMap map[string]any) (*Operations, error) {
  16. dbConfig := new(DBConfig)
  17. err := mapstructure.Decode(configMap, dbConfig)
  18. if err != nil {
  19. return nil, err
  20. }
  21. return NewOperations(dbConfig)
  22. }
  23. func NewOperations(dbConfig *DBConfig) (*Operations, error) {
  24. gormDB, err := newGormDB(dbConfig)
  25. if err != nil {
  26. return nil, err
  27. }
  28. sqlDB, err := gormDB.DB()
  29. if err != nil {
  30. return nil, err
  31. }
  32. if dbConfig.MaxConnections == 0 {
  33. dbConfig.MaxConnections = 50
  34. }
  35. if dbConfig.MaxIdleConnections == 0 {
  36. dbConfig.MaxIdleConnections = 10
  37. }
  38. sqlDB.SetMaxOpenConns(dbConfig.MaxConnections)
  39. sqlDB.SetMaxIdleConns(dbConfig.MaxIdleConnections)
  40. op := &Operations{
  41. initDB: gormDB,
  42. processDB: gormDB,
  43. stopPingChan: make(chan any),
  44. }
  45. op.startBeatHeart(sqlDB)
  46. return op, nil
  47. }
  48. func DestroyOperation(op *Operations) error {
  49. if op == nil {
  50. return nil
  51. }
  52. if op.initDB == nil {
  53. return nil
  54. }
  55. op.stopBeatHeart()
  56. return destroyGormDB(op.initDB)
  57. }
  58. func (op *Operations) startBeatHeart(sqlDB *sql.DB) {
  59. go func() {
  60. pingTicker := time.NewTicker(time.Minute * 1)
  61. defer pingTicker.Stop()
  62. for {
  63. select {
  64. case <-op.stopPingChan:
  65. return
  66. case <-pingTicker.C:
  67. err := sqlDB.Ping()
  68. if err != nil {
  69. fslog.Error(err)
  70. }
  71. }
  72. }
  73. }()
  74. }
  75. func (op *Operations) stopBeatHeart() {
  76. if op.stopPingChan != nil {
  77. close(op.stopPingChan)
  78. op.stopPingChan = nil
  79. }
  80. }
  81. func (op *Operations) BeginTransaction() TransactionDBOperations {
  82. tx := op.initDB.Begin()
  83. return &TransactionOperations{
  84. Operations{
  85. initDB: tx,
  86. processDB: tx,
  87. },
  88. }
  89. }
  90. func (op *Operations) NewSession() DBOperations {
  91. return &Operations{
  92. initDB: op.initDB,
  93. processDB: op.initDB,
  94. }
  95. }
  96. func (op *Operations) Table(name string, args ...any) DBOperations {
  97. op.processDB = op.initDB.Table(name, args...)
  98. return op
  99. }
  100. func (op *Operations) Raw(sql string, values ...any) DBOperations {
  101. op.processDB = op.processDB.Raw(sql, values...)
  102. return op
  103. }
  104. func (op *Operations) Select(query string, args ...any) DBOperations {
  105. op.processDB = op.processDB.Select(query, args...)
  106. return op
  107. }
  108. func (op *Operations) Joins(query string, args ...any) DBOperations {
  109. op.processDB = op.processDB.Joins(query, args...)
  110. return op
  111. }
  112. func (op *Operations) Where(conditions *Conditions) DBOperations {
  113. op.processDB = conditions.where(op.processDB)
  114. return op
  115. }
  116. func (op *Operations) Or(conditions *Conditions) DBOperations {
  117. op.processDB = conditions.or(op.processDB)
  118. return op
  119. }
  120. func (op *Operations) Having(conditions *Conditions) DBOperations {
  121. op.processDB = conditions.having(op.processDB)
  122. return op
  123. }
  124. func (op *Operations) GroupBy(groupBy string) DBOperations {
  125. op.processDB = op.processDB.Group(groupBy)
  126. return op
  127. }
  128. func (op *Operations) OrderBy(orderBy string) DBOperations {
  129. op.processDB = op.processDB.Order(orderBy)
  130. return op
  131. }
  132. func (op *Operations) Paging(pageNo int, pageSize int) DBOperations {
  133. if pageNo != 0 && pageSize != 0 {
  134. offset := -1
  135. if pageNo == -1 || pageSize == -1 {
  136. offset = -1
  137. pageSize = -1
  138. } else {
  139. offset = (pageNo - 1) * pageSize
  140. }
  141. op.processDB = op.processDB.Offset(offset).Limit(pageSize)
  142. }
  143. return op
  144. }
  145. func (op *Operations) Create(tableRow *TableRow) error {
  146. err := op.processDB.Create(tableRow.ToMap()).Error
  147. if err != nil {
  148. if strings.Contains(err.Error(), "SQLSTATE 23505") {
  149. return ErrDBRecordHasExist
  150. }
  151. return err
  152. }
  153. return nil
  154. }
  155. func (op *Operations) CreateBatch(tableRows []TableRow) error {
  156. tableRowMaps := make([]map[string]any, 0)
  157. for _, tableRow := range tableRows {
  158. tableRowMaps = append(tableRowMaps, tableRow.ToMap())
  159. }
  160. err := op.processDB.Create(tableRowMaps).Error
  161. if err != nil {
  162. if strings.Contains(err.Error(), "SQLSTATE 23505") {
  163. return ErrDBRecordHasExist
  164. }
  165. return err
  166. }
  167. return nil
  168. }
  169. func (op *Operations) Delete() error {
  170. return op.processDB.Delete(make(map[string]any)).Error
  171. }
  172. func (op *Operations) Updates(newTableRow *TableRow) error {
  173. err := op.processDB.Updates(newTableRow.ToMap()).Error
  174. if err != nil {
  175. if strings.Contains(err.Error(), "SQLSTATE 23505") {
  176. return ErrDBRecordHasExist
  177. }
  178. return err
  179. }
  180. return nil
  181. }
  182. func (op *Operations) UpdatesWithRowsAffected(newTableRow *TableRow) (int64, error) {
  183. op.processDB = op.processDB.Updates(newTableRow.ToMap())
  184. if op.processDB.Error != nil {
  185. return 0, op.processDB.Error
  186. }
  187. return op.processDB.RowsAffected, nil
  188. }
  189. func (op *Operations) Rows(pageNo int, pageSize int) ([]TableRow, error) {
  190. if pageNo != 0 && pageSize != 0 {
  191. offset := (pageNo - 1) * pageSize
  192. op.processDB = op.processDB.Offset(offset).Limit(pageSize)
  193. }
  194. defer func() {
  195. op.processDB = op.processDB.Offset(-1).Limit(-1)
  196. }()
  197. tableRowMaps := make([]map[string]any, 0)
  198. err := op.processDB.Scan(&tableRowMaps).Error
  199. if err != nil {
  200. return nil, err
  201. }
  202. tableRows := make([]TableRow, 0)
  203. for _, tableRowMap := range tableRowMaps {
  204. tableRows = append(tableRows, *NewTableRowFromMap(tableRowMap))
  205. }
  206. return tableRows, nil
  207. }
  208. func (op *Operations) Row() (*TableRow, error) {
  209. valueMap := make(map[string]any)
  210. err := op.processDB.Scan(&valueMap).Error
  211. if err != nil {
  212. return nil, err
  213. }
  214. if valueMap == nil || len(valueMap) == 0 {
  215. return nil, ErrDBRecordNotExist
  216. }
  217. return NewTableRowFromMap(valueMap), nil
  218. }
  219. func (op *Operations) Count(count *int64) error {
  220. return op.processDB.Count(count).Error
  221. }
  222. func (op *Operations) CheckExist() (bool, error) {
  223. var count int64
  224. err := op.processDB.Count(&count).Error
  225. if err != nil {
  226. return false, err
  227. }
  228. return count > 0, nil
  229. }
  230. func (op *Operations) CheckHasOnlyOne() (bool, error) {
  231. var count int64
  232. err := op.processDB.Count(&count).Error
  233. if err != nil {
  234. return false, err
  235. }
  236. return count == 1, nil
  237. }