|
- package task_manager
- import (
- "encoding/json"
- "git.sxidc.com/go-framework/baize/convenient/domain/task_manager/task"
- "git.sxidc.com/go-framework/baize/framework/binding"
- "git.sxidc.com/go-framework/baize/framework/core/api"
- "git.sxidc.com/go-framework/baize/framework/core/application"
- "git.sxidc.com/go-framework/baize/framework/core/domain"
- "git.sxidc.com/go-framework/baize/framework/core/infrastructure/database"
- "git.sxidc.com/go-framework/baize/framework/core/infrastructure/database/sql"
- "git.sxidc.com/go-framework/baize/framework/core/infrastructure/logger"
- "git.sxidc.com/go-tools/utils/strutils"
- "github.com/pkg/errors"
- )
- type RunTaskParams struct {
- Group string
- OperatorID string
- Context map[string]any
- DBSchema string
- DBExecutor database.Executor
- }
- func (params *RunTaskParams) check() error {
- if strutils.IsStringEmpty(params.Group) {
- return errors.New("没有传递任务组")
- }
- if strutils.IsStringEmpty(params.OperatorID) {
- return errors.New("没有传递操作人ID")
- }
- if strutils.IsStringEmpty(params.DBSchema) {
- return errors.New("没有传递数据库schema")
- }
- if params.DBExecutor == nil {
- return errors.New("没有传递数据库执行器")
- }
- return nil
- }
- func RunTask(runner task.Runner, params *RunTaskParams) error {
- ctxJsonBytes, err := json.Marshal(params.Context)
- if err != nil {
- return err
- }
- ctxJsonStr := string(ctxJsonBytes)
-
- taskEntity := &task.Entity{
- Group: params.Group,
- Context: ctxJsonStr,
- CreateUserID: params.OperatorID,
- }
- err = createTaskDB(taskEntity, params.DBSchema, params.DBExecutor)
- if err != nil {
- return err
- }
- go runTask(taskEntity, runner, params.DBSchema, params.DBExecutor, func(ctx map[string]any, runner task.Runner) error {
- return runner.Run(ctx)
- })
- return nil
- }
- type RestartTaskParams struct {
- Group string
- DBSchema string
- DBExecutor database.Executor
- }
- func (params *RestartTaskParams) check() error {
- if strutils.IsStringEmpty(params.Group) {
- return errors.New("没有传递任务组")
- }
- if strutils.IsStringEmpty(params.DBSchema) {
- return errors.New("没有传递数据库schema")
- }
- if params.DBExecutor == nil {
- return errors.New("没有传递数据库执行器")
- }
- return nil
- }
- func RestartTask(runner task.Runner, params *RestartTaskParams) error {
- runningResults, _, err := database.Query(params.DBExecutor, &sql.QueryExecuteParams{
- TableName: domain.TableName(params.DBSchema, &task.Entity{}),
- Conditions: sql.NewConditions().Equal(task.ColumnGroup, params.Group).
- Equal(task.ColumnStatus, task.StatusCodeRunning),
- PageNo: 0,
- PageSize: 0,
- })
- if err != nil {
- return err
- }
- taskEntities := make([]task.Entity, 0)
- err = sql.ParseSqlResult(runningResults, &taskEntities)
- if err != nil {
- return err
- }
- for _, taskEntity := range taskEntities {
- go runTask(&taskEntity, runner, params.DBSchema, params.DBExecutor, func(ctx map[string]any, runner task.Runner) error {
- return runner.Restart(ctx)
- })
- }
- return nil
- }
- func createTaskDB(taskEntity *task.Entity, dbSchema string, dbExecutor database.Executor) error {
- taskEntity.SetStatusCreated()
- err := domain.CheckWhen(taskEntity, taskEntity.GetFieldMap(), "run")
- if err != nil {
- return err
- }
- err = database.InsertEntity(dbExecutor, domain.TableName(dbSchema, taskEntity), taskEntity)
- if err != nil {
- return err
- }
- return nil
- }
- func updateTaskStatusRunningDB(taskEntity *task.Entity, dbSchema string, dbExecutor database.Executor) error {
- taskEntity.SetStatusRunning()
- err := database.UpdateEntity(dbExecutor, domain.TableName(dbSchema, taskEntity), taskEntity)
- if err != nil {
- return err
- }
- return nil
- }
- func updateTaskStatusCompleteDB(taskEntity *task.Entity, dbSchema string, dbExecutor database.Executor) error {
- taskEntity.SetStatusCompleted()
- err := database.UpdateEntity(dbExecutor, domain.TableName(dbSchema, taskEntity), taskEntity)
- if err != nil {
- return err
- }
- return nil
- }
- func updateTaskStatusErrorDB(taskEntity *task.Entity, errMsg string, dbSchema string, dbExecutor database.Executor) error {
- taskEntity.SetStatusError(errMsg)
- err := database.UpdateEntity(dbExecutor, domain.TableName(dbSchema, taskEntity), taskEntity)
- if err != nil {
- return err
- }
- return nil
- }
- func runTask(taskEntity *task.Entity, runner task.Runner, dbSchema string, dbExecutor database.Executor, executeFunc func(ctx map[string]any, runner task.Runner) error) {
- ctx := make(map[string]any)
- err := json.Unmarshal([]byte(taskEntity.Context), &ctx)
- if err != nil {
- err = updateTaskStatusErrorDB(taskEntity, err.Error(), dbSchema, dbExecutor)
- if err != nil {
- logger.GetInstance().Error("解析任务上下文失败 ", err)
- return
- }
- return
- }
- err = updateTaskStatusRunningDB(taskEntity, dbSchema, dbExecutor)
- if err != nil {
- logger.GetInstance().Error("更新任务运行状态失败 ", err)
- return
- }
-
- err = executeFunc(ctx, runner)
- if err != nil {
- err = updateTaskStatusErrorDB(taskEntity, err.Error(), dbSchema, dbExecutor)
- if err != nil {
- logger.GetInstance().Error("更新任务错误状态失败 ", err)
- return
- }
- return
- }
- err = updateTaskStatusCompleteDB(taskEntity, dbSchema, dbExecutor)
- if err != nil {
- logger.GetInstance().Error("更新任务完成状态失败 ", err)
- return
- }
- }
// Simple holds the settings handed to Bind.
type Simple struct {
	// Schema is presumably the database schema of the task tables; it is
	// not read anywhere in the visible code — TODO confirm.
	Schema string
}
- func (simple *Simple) bind(binder *binding.Binder) {
-
- }
- func Bind(app *application.App, simple *Simple) {
- binder := binding.NewBinder(app.Api().ChooseRouter(api.RouterPrefix, ""), app.Infrastructure())
- simple.bind(binder)
- }
|