|
|
@@ -0,0 +1,296 @@
|
|
|
+package db_operations
|
|
|
+
|
|
|
+import (
|
|
|
+ "database/sql"
|
|
|
+ "errors"
|
|
|
+ "git.sxidc.com/service-supports/ds-sdk/db_operations/dberr"
|
|
|
+ "git.sxidc.com/service-supports/fslog"
|
|
|
+ "github.com/mitchellh/mapstructure"
|
|
|
+ "gorm.io/gorm"
|
|
|
+ "strings"
|
|
|
+ "time"
|
|
|
+)
|
|
|
+
|
|
|
+type TransactionOperations struct {
|
|
|
+ Operations
|
|
|
+}
|
|
|
+
|
|
|
+func (op *TransactionOperations) RollbackTransaction() {
|
|
|
+ defer func() {
|
|
|
+ op.processDB = op.initDB
|
|
|
+ }()
|
|
|
+
|
|
|
+ op.processDB.Rollback()
|
|
|
+}
|
|
|
+
|
|
|
+func (op *TransactionOperations) CommitTransaction() {
|
|
|
+ defer func() {
|
|
|
+ op.processDB = op.initDB
|
|
|
+ }()
|
|
|
+
|
|
|
+ op.processDB.Commit()
|
|
|
+}
|
|
|
+
|
|
|
+type Operations struct {
|
|
|
+ initDB *gorm.DB
|
|
|
+ processDB *gorm.DB
|
|
|
+ stopPingChan chan any
|
|
|
+}
|
|
|
+
|
|
|
+func NewOperationsFromMap(configMap map[string]any) (*Operations, error) {
|
|
|
+ dbConfig := new(DBConfig)
|
|
|
+ err := mapstructure.Decode(configMap, dbConfig)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ return NewOperations(dbConfig)
|
|
|
+}
|
|
|
+
|
|
|
+func NewOperations(dbConfig *DBConfig) (*Operations, error) {
|
|
|
+ gormDB, err := newGormDB(dbConfig)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ sqlDB, err := gormDB.DB()
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ if dbConfig.MaxConnections == 0 {
|
|
|
+ dbConfig.MaxConnections = 50
|
|
|
+ }
|
|
|
+
|
|
|
+ if dbConfig.MaxIdleConnections == 0 {
|
|
|
+ dbConfig.MaxIdleConnections = 10
|
|
|
+ }
|
|
|
+
|
|
|
+ sqlDB.SetMaxOpenConns(dbConfig.MaxConnections)
|
|
|
+ sqlDB.SetMaxIdleConns(dbConfig.MaxIdleConnections)
|
|
|
+
|
|
|
+ op := &Operations{
|
|
|
+ initDB: gormDB,
|
|
|
+ processDB: gormDB,
|
|
|
+ stopPingChan: make(chan any),
|
|
|
+ }
|
|
|
+
|
|
|
+ op.startBeatHeart(sqlDB)
|
|
|
+
|
|
|
+ return op, nil
|
|
|
+}
|
|
|
+
|
|
|
+func DestroyOperation(op *Operations) error {
|
|
|
+ if op == nil {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ if op.initDB == nil {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ op.stopBeatHeart()
|
|
|
+
|
|
|
+ return destroyGormDB(op.initDB)
|
|
|
+}
|
|
|
+
|
|
|
+func (op *Operations) startBeatHeart(sqlDB *sql.DB) {
|
|
|
+ go func() {
|
|
|
+ pingTicker := time.NewTicker(time.Minute * 1)
|
|
|
+ defer pingTicker.Stop()
|
|
|
+
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case <-op.stopPingChan:
|
|
|
+ return
|
|
|
+ case <-pingTicker.C:
|
|
|
+ err := sqlDB.Ping()
|
|
|
+ if err != nil {
|
|
|
+ fslog.Error(err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }()
|
|
|
+}
|
|
|
+
|
|
|
+func (op *Operations) stopBeatHeart() {
|
|
|
+ if op.stopPingChan != nil {
|
|
|
+ close(op.stopPingChan)
|
|
|
+ op.stopPingChan = nil
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (op *Operations) BeginTransaction() TransactionDBOperations {
|
|
|
+ tx := op.initDB.Begin()
|
|
|
+ return &TransactionOperations{
|
|
|
+ Operations{
|
|
|
+ initDB: tx,
|
|
|
+ processDB: tx,
|
|
|
+ },
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (op *Operations) NewSession() DBOperations {
|
|
|
+ return &Operations{
|
|
|
+ initDB: op.initDB,
|
|
|
+ processDB: op.initDB,
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (op *Operations) Table(name string, args ...any) DBOperations {
|
|
|
+ op.processDB = op.initDB.Table(name, args...)
|
|
|
+ return op
|
|
|
+}
|
|
|
+
|
|
|
+func (op *Operations) Raw(sql string, values ...any) DBOperations {
|
|
|
+ op.processDB = op.initDB.Raw(sql, values...)
|
|
|
+ return op
|
|
|
+}
|
|
|
+
|
|
|
+func (op *Operations) Select(query string, args ...any) DBOperations {
|
|
|
+ op.processDB = op.processDB.Select(query, args...)
|
|
|
+ return op
|
|
|
+}
|
|
|
+
|
|
|
+func (op *Operations) Where(query string, args ...any) DBOperations {
|
|
|
+ op.processDB = op.processDB.Where(query, args...)
|
|
|
+ return op
|
|
|
+}
|
|
|
+
|
|
|
+func (op *Operations) Or(query string, args ...any) DBOperations {
|
|
|
+ op.processDB = op.processDB.Or(query, args...)
|
|
|
+ return op
|
|
|
+}
|
|
|
+
|
|
|
+func (op *Operations) GroupBy(groupBy string) DBOperations {
|
|
|
+ op.processDB = op.processDB.Group(groupBy)
|
|
|
+ return op
|
|
|
+}
|
|
|
+
|
|
|
+func (op *Operations) OrderBy(orderBy string) DBOperations {
|
|
|
+ op.processDB = op.processDB.Order(orderBy)
|
|
|
+ return op
|
|
|
+}
|
|
|
+
|
|
|
+func (op *Operations) Joins(query string, args ...any) DBOperations {
|
|
|
+ op.processDB = op.processDB.Joins(query, args...)
|
|
|
+ return op
|
|
|
+}
|
|
|
+
|
|
|
+func (op *Operations) Having(query string, args ...any) DBOperations {
|
|
|
+ op.processDB = op.processDB.Having(query, args...)
|
|
|
+ return op
|
|
|
+}
|
|
|
+
|
|
|
+func (op *Operations) Paging(pageNo int, pageSize int) DBOperations {
|
|
|
+ if pageNo != 0 && pageSize != 0 {
|
|
|
+ offset := -1
|
|
|
+
|
|
|
+ if pageNo == -1 || pageSize == -1 {
|
|
|
+ offset = -1
|
|
|
+ pageSize = -1
|
|
|
+ } else {
|
|
|
+ offset = (pageNo - 1) * pageSize
|
|
|
+ }
|
|
|
+
|
|
|
+ op.processDB = op.processDB.Offset(offset).Limit(pageSize)
|
|
|
+ }
|
|
|
+
|
|
|
+ return op
|
|
|
+}
|
|
|
+
|
|
|
+func (op *Operations) Create(model interface{}) error {
|
|
|
+ err := op.processDB.Create(model).Error
|
|
|
+ if err != nil {
|
|
|
+ if strings.Contains(err.Error(), "SQLSTATE 23505") {
|
|
|
+ return dberr.ErrDBRecordHasExist
|
|
|
+ }
|
|
|
+
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (op *Operations) Delete(idModel interface{}) error {
|
|
|
+ return op.processDB.Delete(idModel).Error
|
|
|
+}
|
|
|
+
|
|
|
+func (op *Operations) Updates(idModel interface{}, updateData map[string]any) error {
|
|
|
+ err := op.processDB.Model(idModel).Updates(updateData).Error
|
|
|
+ if err != nil {
|
|
|
+ if strings.Contains(err.Error(), "SQLSTATE 23505") {
|
|
|
+ return dberr.ErrDBRecordHasExist
|
|
|
+ }
|
|
|
+
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (op *Operations) UpdatesWithRowsAffected(idModel interface{}, updateData map[string]any) (int64, error) {
|
|
|
+ op.processDB = op.processDB.Model(idModel).Updates(updateData)
|
|
|
+ if op.processDB.Error != nil {
|
|
|
+ return 0, op.processDB.Error
|
|
|
+ }
|
|
|
+
|
|
|
+ return op.processDB.RowsAffected, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (op *Operations) Query(models interface{}, pageNo int, pageSize int) error {
|
|
|
+ if pageNo != 0 && pageSize != 0 {
|
|
|
+ offset := (pageNo - 1) * pageSize
|
|
|
+ op.processDB = op.processDB.Offset(offset).Limit(pageSize)
|
|
|
+ }
|
|
|
+
|
|
|
+ err := op.processDB.Find(models).Error
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ op.processDB = op.processDB.Offset(-1).Limit(-1)
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (op *Operations) QueryOne(model interface{}) error {
|
|
|
+ err := op.processDB.First(model).Error
|
|
|
+ if err != nil {
|
|
|
+ if errors.Is(err, gorm.ErrRecordNotFound) {
|
|
|
+ return dberr.ErrDBRecordNotExist
|
|
|
+ }
|
|
|
+
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (op *Operations) Row() (map[string]any, error) {
|
|
|
+ valueMap := make(map[string]any)
|
|
|
+ err := op.processDB.Scan(&valueMap).Error
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ if valueMap == nil || len(valueMap) == 0 {
|
|
|
+ return nil, dberr.ErrDBRecordNotExist
|
|
|
+ }
|
|
|
+
|
|
|
+ return valueMap, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (op *Operations) Rows() ([]map[string]any, error) {
|
|
|
+ valueMaps := make([]map[string]any, 0)
|
|
|
+ err := op.processDB.Scan(&valueMaps).Error
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ return valueMaps, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (op *Operations) Count(count *int64) error {
|
|
|
+ return op.processDB.Count(count).Error
|
|
|
+}
|