operations.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. package operations
  2. import (
  3. dbSql "database/sql"
  4. "encoding/json"
  5. "git.sxidc.com/go-framework/baize/infrastructure/database/sql"
  6. "git.sxidc.com/go-tools/utils/template"
  7. "git.sxidc.com/service-supports/fslog"
  8. "gorm.io/gorm"
  9. "time"
  10. )
  11. type Operations struct {
  12. db *gorm.DB
  13. stopPingChan chan any
  14. }
  15. func NewOperations(dbConfig *OperationsConfig) (*Operations, error) {
  16. gormDB, err := newGormDB(dbConfig)
  17. if err != nil {
  18. return nil, err
  19. }
  20. sqlDB, err := gormDB.DB()
  21. if err != nil {
  22. return nil, err
  23. }
  24. if dbConfig.MaxConnections == 0 {
  25. dbConfig.MaxConnections = 50
  26. }
  27. if dbConfig.MaxIdleConnections == 0 {
  28. dbConfig.MaxIdleConnections = 10
  29. }
  30. sqlDB.SetMaxOpenConns(dbConfig.MaxConnections)
  31. sqlDB.SetMaxIdleConns(dbConfig.MaxIdleConnections)
  32. op := &Operations{
  33. db: gormDB,
  34. stopPingChan: make(chan any),
  35. }
  36. op.startBeatHeart(sqlDB)
  37. return op, nil
  38. }
  39. func DestroyOperation(op *Operations) error {
  40. if op == nil {
  41. return nil
  42. }
  43. if op.db == nil {
  44. return nil
  45. }
  46. op.stopBeatHeart()
  47. return destroyGormDB(op.db)
  48. }
  49. func (op *Operations) startBeatHeart(sqlDB *dbSql.DB) {
  50. go func() {
  51. pingTicker := time.NewTicker(time.Minute * 1)
  52. defer pingTicker.Stop()
  53. for {
  54. select {
  55. case <-op.stopPingChan:
  56. return
  57. case <-pingTicker.C:
  58. err := sqlDB.Ping()
  59. if err != nil {
  60. fslog.Error(err)
  61. }
  62. }
  63. }
  64. }()
  65. }
  66. func (op *Operations) stopBeatHeart() {
  67. if op.stopPingChan != nil {
  68. close(op.stopPingChan)
  69. op.stopPingChan = nil
  70. }
  71. }
  72. func (op *Operations) BeginTransaction() *TransactionOperations {
  73. tx := op.db.Begin()
  74. return &TransactionOperations{
  75. Operations{
  76. db: tx,
  77. },
  78. }
  79. }
  80. func (op *Operations) AutoMigrate(tables ...Table) error {
  81. tx := op.db.Begin()
  82. for _, table := range tables {
  83. dbModel, err := table.ToDBModel()
  84. if err != nil {
  85. tx.Rollback()
  86. return err
  87. }
  88. err = tx.Table(table.TableName).AutoMigrate(dbModel)
  89. if err != nil {
  90. tx.Rollback()
  91. return err
  92. }
  93. }
  94. tx.Commit()
  95. return nil
  96. }
  97. func (op *Operations) ExecuteRawSql(sql string, executeParams map[string]any) ([]sql.Result, error) {
  98. parsedSql, err := template.ParseTemplateStringToString(sql, executeParams)
  99. if err != nil {
  100. return nil, err
  101. }
  102. tableRows := make([]map[string]any, 0)
  103. err = op.db.Raw(parsedSql).Scan(&tableRows).Error
  104. if err != nil {
  105. return nil, err
  106. }
  107. // 简化一下类型体系,与DataService保持一致
  108. jsonTableRows, err := json.Marshal(tableRows)
  109. if err != nil {
  110. return nil, err
  111. }
  112. err = json.Unmarshal(jsonTableRows, &tableRows)
  113. if err != nil {
  114. return nil, err
  115. }
  116. results := make([]sql.Result, len(tableRows))
  117. for i, row := range tableRows {
  118. results[i] = row
  119. }
  120. return results, nil
  121. }
  122. func (op *Operations) ExecuteSql(sql string, _ map[string]any) ([]sql.Result, error) {
  123. return make([]sql.Result, 0), nil
  124. }