|
@@ -1,8 +1,8 @@
|
|
|
-package task_manager
|
|
|
+package task_runner
|
|
|
|
|
|
import (
|
|
|
"encoding/json"
|
|
|
- "git.sxidc.com/go-framework/baize/convenient/domain/task_manager/task"
|
|
|
+ "git.sxidc.com/go-framework/baize/convenient/domain/task_runner/task"
|
|
|
"git.sxidc.com/go-framework/baize/framework/core/domain"
|
|
|
"git.sxidc.com/go-framework/baize/framework/core/domain/entity"
|
|
|
"git.sxidc.com/go-framework/baize/framework/core/infrastructure/database"
|
|
@@ -43,7 +43,7 @@ func (params *RunTaskParams) check() error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func RunTask(runner task.Runner, params *RunTaskParams) (string, error) {
|
|
|
+func RunTask(runner Runner, params *RunTaskParams) (string, error) {
|
|
|
err := params.check()
|
|
|
if err != nil {
|
|
|
return "", err
|
|
@@ -69,9 +69,9 @@ func RunTask(runner task.Runner, params *RunTaskParams) (string, error) {
|
|
|
}
|
|
|
|
|
|
loaded, _ := runnerRegister.LoadOrStore(params.Group, runner)
|
|
|
- loadedRunner := loaded.(task.Runner)
|
|
|
+ loadedRunner := loaded.(Runner)
|
|
|
|
|
|
- go runTask(taskEntity, loadedRunner, params.DBSchema, params.DBExecutor, func(ctx map[string]any, runner task.Runner) error {
|
|
|
+ go runTask(taskEntity, loadedRunner, params.DBSchema, params.DBExecutor, func(ctx map[string]any, runner Runner) error {
|
|
|
return runner.Run(ctx)
|
|
|
})
|
|
|
|
|
@@ -100,7 +100,7 @@ func (params *RestartTaskParams) check() error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func RestartTask(runner task.Runner, params *RestartTaskParams) error {
|
|
|
+func RestartTask(runner Runner, params *RestartTaskParams) error {
|
|
|
err := params.check()
|
|
|
if err != nil {
|
|
|
return err
|
|
@@ -125,9 +125,9 @@ func RestartTask(runner task.Runner, params *RestartTaskParams) error {
|
|
|
|
|
|
for _, taskEntity := range taskEntities {
|
|
|
loaded, _ := runnerRegister.LoadOrStore(params.Group, runner)
|
|
|
- loadedRunner := loaded.(task.Runner)
|
|
|
+ loadedRunner := loaded.(Runner)
|
|
|
|
|
|
- go runTask(&taskEntity, loadedRunner, params.DBSchema, params.DBExecutor, func(ctx map[string]any, runner task.Runner) error {
|
|
|
+ go runTask(&taskEntity, loadedRunner, params.DBSchema, params.DBExecutor, func(ctx map[string]any, runner Runner) error {
|
|
|
return runner.Restart(ctx)
|
|
|
})
|
|
|
}
|
|
@@ -196,7 +196,7 @@ func StopTask(params *StopTaskParams) error {
|
|
|
return errors.New("没有找到任务对应的执行器")
|
|
|
}
|
|
|
|
|
|
- loadedRunner := loaded.(task.Runner)
|
|
|
+ loadedRunner := loaded.(Runner)
|
|
|
|
|
|
err = database.Transaction(params.DBExecutor, func(tx database.Executor) error {
|
|
|
err := updateTaskStatusStopDBTx(taskEntity, params.DBSchema, tx)
|
|
@@ -278,7 +278,7 @@ func updateTaskStatusErrorDB(taskEntity *task.Entity, errMsg string, dbSchema st
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func runTask(taskEntity *task.Entity, runner task.Runner, dbSchema string, dbExecutor database.Executor, executeFunc func(ctx map[string]any, runner task.Runner) error) {
|
|
|
+func runTask(taskEntity *task.Entity, runner Runner, dbSchema string, dbExecutor database.Executor, executeFunc func(ctx map[string]any, runner Runner) error) {
|
|
|
ctx, err := taskEntity.GetMapContext()
|
|
|
if err != nil {
|
|
|
err = updateTaskStatusErrorDB(taskEntity, err.Error(), dbSchema, dbExecutor)
|