package database import ( "git.sxidc.com/go-framework/baize/framework/core/infrastructure/database/operations" "git.sxidc.com/go-framework/baize/framework/core/infrastructure/database/sql" "git.sxidc.com/go-framework/baize/framework/core/tag/sql/sql_mapping" "git.sxidc.com/go-tools/utils/reflectutils" "git.sxidc.com/go-tools/utils/strutils" "github.com/pkg/errors" "reflect" "strings" "time" ) // Executor 数据库基础设施接口 type Executor interface { // ExecuteRawSql SQL执行接口 // 参数: // - sql: SQL语句,可以使用预编译,需要填充的值用?占位 // - values: 预编译填充值 // 返回值: // - SQL执行结果 // - 错误 ExecuteRawSql(sql string, args ...any) ([]sql.Result, error) // ExecuteRawSqlTemplate SQL模板执行接口 // 参数: // - sql: SQL语句模板,可以使用预编译,需要填充的值用?占位,可以使用Go模板构造SQL语句 // - template: 渲染SQL语句模板的模板参数 // - values: 预编译填充值 // 返回值: // - SQL执行结果 // - 错误 ExecuteRawSqlTemplate(sql string, template map[string]any, args ...any) ([]sql.Result, error) // ExecuteRawSqlWithRowsAffected SQL执行接口,返回影响行数 // 参数: // - sql: SQL语句,可以使用预编译,需要填充的值用?占位 // - values: 预编译填充值 // 返回值: // - SQL执行结果 // - 影响行数 // - 错误 ExecuteRawSqlWithRowsAffected(sql string, args ...any) ([]sql.Result, int64, error) // ExecuteRawSqlTemplateWithRowsAffected SQL模板执行接口,返回影响行数 // 参数: // - sql: SQL语句模板,可以使用预编译,需要填充的值用?占位,可以使用Go模板构造SQL语句 // - template: 渲染SQL语句模板的模板参数 // - values: 预编译填充值 // 返回值: // - SQL执行结果 // - 影响行数 // - 错误 ExecuteRawSqlTemplateWithRowsAffected(sql string, template map[string]any, args ...any) ([]sql.Result, int64, error) } const ( createdTimeFieldName = "CreatedTime" lastUpdatedTimeFieldName = "LastUpdatedTime" ) // Transaction 执行事务 // 参数: // - executor: 数据库基础设施接口 // - txFunc: 事务函数 // 返回值: // - 错误 func Transaction(executor Executor, txFunc func(tx Executor) error) (err error) { if executor == nil { return nil } if txFunc == nil { return nil } switch e := executor.(type) { case *operations.Operations: tx := e.BeginTransaction() defer func() { if r := recover(); r != nil { tx.RollbackTransaction() err = errors.Errorf("panic: %v", r) } }() err = txFunc(tx) if err != nil { tx.RollbackTransaction() return err } tx.CommitTransaction() default: return nil } return nil } // InsertEntity 通过结构插入数据 // 参数: // - executor: 数据库基础设施接口 // - tableName: 表名 // - es: 结构或结构slice(批量插入),结构字段需要使用sqlmapping标注 // 返回值: // - 错误 func InsertEntity(executor Executor, tableName string, es any) error { if executor == nil { return errors.New("没有传递执行器") } if strutils.IsStringEmpty(tableName) { return errors.New("没有传递表名") } if es == nil { return nil } entityType := reflect.TypeOf(es) entityElemType := reflectutils.PointerTypeElem(entityType) if entityElemType.Kind() == reflect.Struct { return insertEntitySingle(executor, tableName, es) } else if entityElemType.Kind() == reflect.Slice { return insertEntityBatch(executor, tableName, es) } else { return errors.New("实体可以是结构,结构指针,结构Slice,结构指针的Slice或Slice的指针") } } func insertEntitySingle(executor Executor, tableName string, e any) error { fields, err := sql_mapping.DefaultUsage(e) if err != nil { return err } executeParams := sql.InsertExecuteParams{ TableName: tableName, TableRow: formInsertTableRow(fields, time.Now().Local()), } executeParamsMap, err := executeParams.Map() if err != nil { return err } _, err = executor.ExecuteRawSqlTemplate(sql.InsertTpl, executeParamsMap, executeParams.TableRow.Values()...) if err != nil { if strings.Contains(err.Error(), "SQLSTATE 23505") { return ErrDBRecordHasExist } return err } return nil } func insertEntityBatch(executor Executor, tableName string, es any) error { now := time.Now().Local() tableRowBatch := make([]sql.TableRow, 0) entitiesValue := reflectutils.PointerValueElem(reflect.ValueOf(es)) for i := 0; i < entitiesValue.Len(); i++ { entityValue := entitiesValue.Index(i) if !entityValue.IsValid() || entityValue.IsZero() { continue } e := entityValue.Interface() entityType := reflect.TypeOf(e) if !reflectutils.IsTypeStructOrStructPointer(entityType) { return errors.New("实体参数不是结构或结构指针") } fields, err := sql_mapping.DefaultUsage(e) if err != nil { return err } tableRowBatch = append(tableRowBatch, *formInsertTableRow(fields, now)) } executeParams := sql.InsertBatchExecuteParams{ TableName: tableName, TableRowBatch: tableRowBatch, } executeParamsMap, err := executeParams.Map() if err != nil { return err } values := make([]any, 0) for _, tableRow := range executeParams.TableRowBatch { values = append(values, tableRow.Values()...) } _, err = executor.ExecuteRawSqlTemplate(sql.InsertTpl, executeParamsMap, values...) if err != nil { if strings.Contains(err.Error(), "SQLSTATE 23505") { return ErrDBRecordHasExist } return err } return nil } func formInsertTableRow(fields []sql_mapping.Field, createTime time.Time) *sql.TableRow { tableRow := sql.NewTableRow() for _, field := range fields { fieldValue := reflect.ValueOf(field.Value) if (field.FieldName == createdTimeFieldName || field.FieldName == lastUpdatedTimeFieldName) && reflectutils.IsValueTime(fieldValue) && fieldValue.IsZero() { field.Value = createTime } if field.FieldName != createdTimeFieldName && field.FieldName != lastUpdatedTimeFieldName && reflectutils.IsValueTime(fieldValue) && fieldValue.IsZero() { field.Value = nil } tableRow.Add(field.ColumnName, field.Value) } return tableRow } // DeleteEntity 通过结构删除数据 // 参数: // - executor: 数据库基础设施接口 // - tableName: 表名 // - e: 结构,结构字段需要使用sqlmapping标注 // 返回值: // - 错误 func DeleteEntity(executor Executor, tableName string, e any) error { if executor == nil { return errors.New("没有传递执行器") } if strutils.IsStringEmpty(tableName) { return errors.New("没有传递表名") } if e == nil { return nil } entityType := reflect.TypeOf(e) if !reflectutils.IsTypeStructOrStructPointer(entityType) { return errors.New("实体参数不是结构或结构指针") } fields, err := sql_mapping.DefaultUsage(e) if err != nil { return err } conditions := sql.NewConditions() for _, field := range fields { // 不是键,字段跳过 if !field.IsKey { continue } conditions.Equal(field.ColumnName, field.Value) } executeParams := sql.DeleteExecuteParams{ TableName: tableName, Conditions: conditions, } executeParamsMap, err := executeParams.Map() if err != nil { return err } _, err = executor.ExecuteRawSqlTemplate(sql.DeleteTpl, executeParamsMap, executeParams.Conditions.Args()...) if err != nil { return err } return nil } // UpdateEntity 通过结构更新数据 // 参数: // - executor: 数据库基础设施接口 // - tableName: 表名 // - e: 结构,结构字段需要使用sqlmapping标注 // 返回值: // - 错误 func UpdateEntity(executor Executor, tableName string, e any) error { _, err := UpdateEntityWithRowsAffected(executor, tableName, e) if err != nil { return err } return nil } // UpdateEntityWithRowsAffected 通过结构更新数据,返回影响行数 // 参数: // - executor: 数据库基础设施接口 // - tableName: 表名 // - e: 结构,结构字段需要使用sqlmapping标注 // 返回值: // - 影响行数 // - 错误 func UpdateEntityWithRowsAffected(executor Executor, tableName string, e any) (int64, error) { if executor == nil { return 0, errors.New("没有传递执行器") } if strutils.IsStringEmpty(tableName) { return 0, errors.New("没有传递表名") } if e == nil { return 0, nil } entityType := reflect.TypeOf(e) if !reflectutils.IsTypeStructOrStructPointer(entityType) { return 0, errors.New("实体参数不是结构或结构指针") } fields, err := sql_mapping.DefaultUsage(e) if err != nil { return 0, err } now := time.Now().Local() tableRow := sql.NewTableRow() conditions := sql.NewConditions() for _, field := range fields { // 不是键字段 // 不是更新时间字段 // 不更新的字段或者字段为零值且不能清空,跳过 if !field.IsKey && field.FieldName != lastUpdatedTimeFieldName && (!field.CanUpdate || (reflect.ValueOf(field.Value).IsZero() && !field.CanUpdateClear)) { continue } fieldValue := reflect.ValueOf(field.Value) if field.FieldName == lastUpdatedTimeFieldName && reflectutils.IsValueTime(fieldValue) && fieldValue.IsZero() { field.Value = now } if field.FieldName != lastUpdatedTimeFieldName && reflectutils.IsValueTime(fieldValue) && fieldValue.IsZero() { field.Value = nil } if field.IsKey { conditions.Equal(field.ColumnName, field.Value) } else { if (field.Value == nil || reflect.ValueOf(field.Value).IsZero()) && !field.CanUpdateClear { continue } tableRow.Add(field.ColumnName, field.Value) } } executeParams := sql.UpdateExecuteParams{ TableName: tableName, TableRow: tableRow, Conditions: conditions, } executeParamsMap, err := executeParams.Map() if err != nil { return 0, err } args := make([]any, 0) args = append(args, executeParams.TableRow.Values()...) args = append(args, executeParams.Conditions.Args()...) _, rowsAffected, err := executor.ExecuteRawSqlTemplateWithRowsAffected(sql.UpdateTpl, executeParamsMap, args...) if err != nil { return 0, err } return rowsAffected, nil } // Insert 插入数据 // 参数: // - executor: 数据库基础设施接口 // - executeParams: 插入数据参数 // 返回值: // - 错误 func Insert(executor Executor, executeParams *sql.InsertExecuteParams) error { if executor == nil { return errors.New("没有传递执行器") } if executeParams == nil { return errors.New("没有传递执行参数") } executeParamsMap, err := executeParams.Map() if err != nil { return err } _, err = executor.ExecuteRawSqlTemplate(sql.InsertTpl, executeParamsMap, executeParams.TableRow.Values()...) if err != nil { return err } return nil } // InsertBatch 批量插入数据 // 参数: // - executor: 数据库基础设施接口 // - executeParams: 批量插入数据参数 // 返回值: // - 错误 func InsertBatch(executor Executor, executeParams *sql.InsertBatchExecuteParams) error { if executor == nil { return errors.New("没有传递执行器") } if executeParams == nil { return errors.New("没有传递执行参数") } executeParamsMap, err := executeParams.Map() if err != nil { return err } values := make([]any, 0) for _, tableRow := range executeParams.TableRowBatch { values = append(values, tableRow.Values()...) } _, err = executor.ExecuteRawSqlTemplate(sql.InsertTpl, executeParamsMap, values...) if err != nil { return err } return nil } // Delete 删除数据 // 参数: // - executor: 数据库基础设施接口 // - executeParams: 删除数据参数 // 返回值: // - 错误 func Delete(executor Executor, executeParams *sql.DeleteExecuteParams) error { if executor == nil { return errors.New("没有传递执行器") } if executeParams == nil { return errors.New("没有传递执行参数") } executeParamsMap, err := executeParams.Map() if err != nil { return err } _, err = executor.ExecuteRawSqlTemplate(sql.DeleteTpl, executeParamsMap, executeParams.Conditions.Args()...) if err != nil { return err } return nil } // Update 更新数据 // 参数: // - executor: 数据库基础设施接口 // - executeParams: 更新数据参数 // 返回值: // - 错误 func Update(executor Executor, executeParams *sql.UpdateExecuteParams) error { _, err := UpdateWithRowsAffected(executor, executeParams) if err != nil { return err } return nil } // UpdateWithRowsAffected 更新数据,返回影响行数 // 参数: // - executor: 数据库基础设施接口 // - executeParams: 更新数据参数 // 返回值: // - 影响行数 // - 错误 func UpdateWithRowsAffected(executor Executor, executeParams *sql.UpdateExecuteParams) (int64, error) { if executor == nil { return 0, errors.New("没有传递执行器") } if executeParams == nil { return 0, errors.New("没有传递执行参数") } if executeParams.Conditions == nil { executeParams.Conditions = sql.NewConditions() } executeParamsMap, err := executeParams.Map() if err != nil { return 0, err } args := make([]any, 0) args = append(args, executeParams.TableRow.Values()...) args = append(args, executeParams.Conditions.Args()...) _, rowsAffected, err := executor.ExecuteRawSqlTemplateWithRowsAffected(sql.UpdateTpl, executeParamsMap, args...) if err != nil { return 0, err } return rowsAffected, nil } // Query 查询数据 // 参数: // - executor: 数据库基础设施接口 // - executeParams: 查询数据参数 // 返回值: // - 查询结果 // - 总数 // - 错误 func Query(executor Executor, executeParams *sql.QueryExecuteParams) ([]sql.Result, int64, error) { if executor == nil { return nil, 0, errors.New("没有传递执行器") } if executeParams == nil { return nil, 0, errors.New("没有传递执行参数") } if executeParams.Conditions == nil { executeParams.Conditions = sql.NewConditions() } queryExecuteParamsMap, err := executeParams.Map() if err != nil { return nil, 0, err } countExecuteParams := sql.CountExecuteParams{ TableName: executeParams.TableName, Conditions: executeParams.Conditions, } countExecuteParamsMap, err := countExecuteParams.Map() if err != nil { return nil, 0, err } tableRows, err := executor.ExecuteRawSqlTemplate(sql.QueryTpl, queryExecuteParamsMap, executeParams.Conditions.Args()...) if err != nil { return nil, 0, err } countTableRow, err := executor.ExecuteRawSqlTemplate(sql.CountTpl, countExecuteParamsMap, countExecuteParams.Conditions.Args()...) if err != nil { return nil, 0, err } results := make([]sql.Result, len(tableRows)) for i, row := range tableRows { results[i] = row } totalCount, err := reflectutils.ToInt64(countTableRow[0]["count"]) if err != nil { return nil, 0, err } return results, totalCount, nil } // QueryOne 查询单条数据 // 参数: // - executor: 数据库基础设施接口 // - executeParams: 查询单条数据参数 // 返回值: // - 查询结果 // - 错误 func QueryOne(executor Executor, executeParams *sql.QueryOneExecuteParams) (sql.Result, error) { if executor == nil { return nil, errors.New("没有传递执行器") } if executeParams == nil { return nil, errors.New("没有传递执行参数") } if executeParams.Conditions == nil { executeParams.Conditions = sql.NewConditions() } executeParamsMap, err := executeParams.Map() if err != nil { return nil, err } tableRows, err := executor.ExecuteRawSqlTemplate(sql.QueryTpl, executeParamsMap, executeParams.Conditions.Args()...) if err != nil { return nil, err } if tableRows == nil || len(tableRows) == 0 { return nil, ErrDBRecordNotExist } return tableRows[0], nil } // Count 数据计数 // 参数: // - executor: 数据库基础设施接口 // - executeParams: 数据计数参数 // 返回值: // - 数量 // - 错误 func Count(executor Executor, executeParams *sql.CountExecuteParams) (int64, error) { if executor == nil { return 0, errors.New("没有传递执行器") } if executeParams == nil { return 0, errors.New("没有传递执行参数") } if executeParams.Conditions == nil { executeParams.Conditions = sql.NewConditions() } executeParamsMap, err := executeParams.Map() if err != nil { return 0, err } tableRows, err := executor.ExecuteRawSqlTemplate(sql.CountTpl, executeParamsMap, executeParams.Conditions.Args()...) if err != nil { return 0, err } count, err := reflectutils.ToInt64(tableRows[0]["count"]) if err != nil { return 0, err } return count, nil } // CheckExist 数据存在性检查 // 参数: // - executor: 数据库基础设施接口 // - executeParams: 数据存在性检查参数 // 返回值: // - 是否存在 // - 错误 func CheckExist(executor Executor, executeParams *sql.CheckExistExecuteParams) (bool, error) { if executor == nil { return false, errors.New("没有传递执行器") } if executeParams == nil { return false, errors.New("没有传递执行参数") } if executeParams.Conditions == nil { executeParams.Conditions = sql.NewConditions() } executeParamsMap, err := executeParams.Map() if err != nil { return false, err } tableRows, err := executor.ExecuteRawSqlTemplate(sql.CountTpl, executeParamsMap, executeParams.Conditions.Args()...) if err != nil { return false, err } count, err := reflectutils.ToInt64(tableRows[0]["count"]) if err != nil { return false, err } return count > 0, nil } // CheckHasOnlyOne 数据唯一性检查 // 参数: // - executor: 数据库基础设施接口 // - executeParams: 数据唯一性检查参数 // 返回值: // - 是否唯一 // - 错误 func CheckHasOnlyOne(executor Executor, executeParams *sql.CheckHasOnlyOneExecuteParams) (bool, error) { if executor == nil { return false, errors.New("没有传递执行器") } if executeParams == nil { return false, errors.New("没有传递执行参数") } if executeParams.Conditions == nil { executeParams.Conditions = sql.NewConditions() } executeParamsMap, err := executeParams.Map() if err != nil { return false, err } tableRows, err := executor.ExecuteRawSqlTemplate(sql.CountTpl, executeParamsMap, executeParams.Conditions.Args()...) if err != nil { return false, err } count, err := reflectutils.ToInt64(tableRows[0]["count"]) if err != nil { return false, err } return count == 1, nil } // ExecuteRawSql SQL执行接口 // 参数: // - executor: 数据库基础设施接口 // - sql: SQL语句,可以使用预编译,需要填充的值用?占位 // - args: 预编译填充值 // 返回值: // - SQL执行结果 // - 错误 func ExecuteRawSql(executor Executor, sql string, args ...any) ([]sql.Result, error) { return ExecuteRawSqlTemplate(executor, sql, nil, args...) } // ExecuteRawSqlTemplate SQL模板执行接口 // 参数: // - executor: 数据库基础设施接口 // - sql: SQL语句模板,可以使用预编译,需要填充的值用?占位,可以使用Go模板构造SQL语句 // - template: 渲染SQL语句模板的模板参数 // - args: 预编译填充值 // 返回值: // - SQL执行结果 // - 错误 func ExecuteRawSqlTemplate(executor Executor, sql string, executeParams map[string]any, args ...any) ([]sql.Result, error) { if executor == nil { return nil, errors.New("没有传递执行器") } if strutils.IsStringEmpty(sql) { return nil, errors.New("没有sql") } tableRows, err := executor.ExecuteRawSqlTemplate(sql, executeParams, args...) if err != nil { return nil, err } return tableRows, nil }