package operations import ( "database/sql" "git.sxidc.com/service-supports/fslog" "gorm.io/gorm" "strings" "time" ) type Operations struct { initDB *gorm.DB processDB *gorm.DB stopPingChan chan any } func NewOperations(dbConfig *DBConfig) (*Operations, error) { gormDB, err := newGormDB(dbConfig) if err != nil { return nil, err } sqlDB, err := gormDB.DB() if err != nil { return nil, err } if dbConfig.MaxConnections == 0 { dbConfig.MaxConnections = 50 } if dbConfig.MaxIdleConnections == 0 { dbConfig.MaxIdleConnections = 10 } sqlDB.SetMaxOpenConns(dbConfig.MaxConnections) sqlDB.SetMaxIdleConns(dbConfig.MaxIdleConnections) op := &Operations{ initDB: gormDB, processDB: gormDB, stopPingChan: make(chan any), } op.startBeatHeart(sqlDB) return op, nil } func DestroyOperation(op *Operations) error { if op == nil { return nil } if op.initDB == nil { return nil } op.stopBeatHeart() return destroyGormDB(op.initDB) } func (op *Operations) startBeatHeart(sqlDB *sql.DB) { go func() { pingTicker := time.NewTicker(time.Minute * 1) defer pingTicker.Stop() for { select { case <-op.stopPingChan: return case <-pingTicker.C: err := sqlDB.Ping() if err != nil { fslog.Error(err) } } } }() } func (op *Operations) stopBeatHeart() { if op.stopPingChan != nil { close(op.stopPingChan) op.stopPingChan = nil } } func (op *Operations) BeginTransaction() TransactionDBOperations { tx := op.initDB.Begin() return &TransactionOperations{ Operations{ initDB: tx, processDB: tx, }, } } func (op *Operations) NewSession() DBOperations { return &Operations{ initDB: op.initDB, processDB: op.initDB, } } func (op *Operations) AutoMigrate(tables ...Table) error { tx := op.processDB.Begin() for _, table := range tables { dbModel, err := table.ToDBModel() if err != nil { tx.Rollback() return err } err = tx.Table(table.TableName).AutoMigrate(dbModel) if err != nil { tx.Rollback() return err } } tx.Commit() return nil } func (op *Operations) Table(name string, args ...any) DBOperations { return &Operations{ initDB: op.initDB, processDB: op.initDB.Table(name, args...), } } func (op *Operations) Raw(sql string, values ...any) DBOperations { op.processDB = op.processDB.Raw(sql, values...) return op } func (op *Operations) Select(query string, args ...any) DBOperations { op.processDB = op.processDB.Select(query, args...) return op } func (op *Operations) Joins(query string, args ...any) DBOperations { op.processDB = op.processDB.Joins(query, args...) return op } func (op *Operations) Where(conditions *Conditions) DBOperations { op.processDB = conditions.where(op.processDB) return op } func (op *Operations) Or(conditions *Conditions) DBOperations { op.processDB = conditions.or(op.processDB) return op } func (op *Operations) Having(conditions *Conditions) DBOperations { op.processDB = conditions.having(op.processDB) return op } func (op *Operations) GroupBy(groupBy string) DBOperations { op.processDB = op.processDB.Group(groupBy) return op } func (op *Operations) OrderBy(orderBy string) DBOperations { op.processDB = op.processDB.Order(orderBy) return op } func (op *Operations) Paging(pageNo int, pageSize int) DBOperations { if pageNo != 0 && pageSize != 0 { offset := -1 if pageNo == -1 || pageSize == -1 { offset = -1 pageSize = -1 } else { offset = (pageNo - 1) * pageSize } op.processDB = op.processDB.Offset(offset).Limit(pageSize) } return op } func (op *Operations) Create(tableRow *TableRow) error { err := op.processDB.Create(tableRow.ToMap()).Error if err != nil { if strings.Contains(err.Error(), "SQLSTATE 23505") { return ErrDBRecordHasExist } return err } return nil } func (op *Operations) CreateBatch(tableRows []TableRow) error { tableRowMaps := make([]map[string]any, 0) for _, tableRow := range tableRows { tableRowMaps = append(tableRowMaps, tableRow.ToMap()) } err := op.processDB.Create(tableRowMaps).Error if err != nil { if strings.Contains(err.Error(), "SQLSTATE 23505") { return ErrDBRecordHasExist } return err } return nil } func (op *Operations) Delete() error { return op.processDB.Delete(make(map[string]any)).Error } func (op *Operations) Updates(newTableRow *TableRow) error { err := op.processDB.Updates(newTableRow.ToMap()).Error if err != nil { if strings.Contains(err.Error(), "SQLSTATE 23505") { return ErrDBRecordHasExist } return err } return nil } func (op *Operations) UpdatesWithRowsAffected(newTableRow *TableRow) (int64, error) { op.processDB = op.processDB.Updates(newTableRow.ToMap()) if op.processDB.Error != nil { return 0, op.processDB.Error } return op.processDB.RowsAffected, nil } func (op *Operations) Rows(pageNo int, pageSize int) ([]TableRow, error) { if pageNo != 0 && pageSize != 0 { offset := (pageNo - 1) * pageSize op.processDB = op.processDB.Offset(offset).Limit(pageSize) } defer func() { op.processDB = op.processDB.Offset(-1).Limit(-1) }() tableRowMaps := make([]map[string]any, 0) err := op.processDB.Scan(&tableRowMaps).Error if err != nil { return nil, err } tableRows := make([]TableRow, 0) for _, tableRowMap := range tableRowMaps { tableRows = append(tableRows, *NewTableRowFromMap(tableRowMap)) } return tableRows, nil } func (op *Operations) Row() (*TableRow, error) { valueMap := make(map[string]any) err := op.processDB.Scan(&valueMap).Error if err != nil { return nil, err } if valueMap == nil || len(valueMap) == 0 { return nil, ErrDBRecordNotExist } return NewTableRowFromMap(valueMap), nil } func (op *Operations) Count(count *int64) error { return op.processDB.Count(count).Error } func (op *Operations) CheckExist() (bool, error) { var count int64 err := op.processDB.Count(&count).Error if err != nil { return false, err } return count > 0, nil } func (op *Operations) CheckHasOnlyOne() (bool, error) { var count int64 err := op.processDB.Count(&count).Error if err != nil { return false, err } return count == 1, nil }