6 Commits 3aefcad5b6 ... 0e65a0444f

Author SHA1 Message Date
  yjp 0e65a0444f 修改任务的bug 1 month ago
  yjp d2f369bd81 修改bug 1 month ago
  yjp 91dfb3921a 完成查询接口 1 month ago
  yjp da1c50ea25 添加停止任务接口 1 month ago
  yjp fca76369ec 添加任务领域 1 month ago
  yjp 99a2b428ab 添加任务领域 1 month ago

+ 39 - 0
convenient/data_containers/task_runner/task.yaml

@@ -0,0 +1,39 @@
+kind: DataContainer
+spec:
+  namespace: # 替换
+  data_source: # 替换
+  name: # 替换.sql_execute_logs
+  spec:
+    table_name: # 替换.sql_execute_logs
+    columns:
+      - name: id
+        type: varchar(32)
+        comment: id
+        primary_key: true
+      - name: group
+        type: varchar(256)
+        comment: 任务组
+        index: true
+      - name: context
+        type: text
+        comment: 上下文
+      - name: status_code
+        type: integer
+        comment: 状态码
+        index: true
+      - name: err_msg
+        type: text
+        comment: 错误信息
+      - name: create_user_id
+        type: varchar(32)
+        comment: 创建者ID
+        index: true
+        not_null: true
+      - name: created_time
+        type: "timestamp with time zone"
+        comment: 创建时间
+        not_null: true
+      - name: last_updated_time
+        type: "timestamp with time zone"
+        comment: 最近更新时间
+        not_null: true

+ 7 - 0
convenient/domain/task_runner/runner.go

@@ -0,0 +1,7 @@
+package task_runner
+
+type Runner interface {
+	Run(taskID string, ctx map[string]any) (bool, error)
+	Restart(taskID string, ctx map[string]any) (bool, error)
+	Stop(taskID string, ctx map[string]any) error
+}

+ 93 - 0
convenient/domain/task_runner/simple.go

@@ -0,0 +1,93 @@
+package task_runner
+
+import (
+	"git.sxidc.com/go-framework/baize/convenient/domain/task_runner/task"
+	"git.sxidc.com/go-framework/baize/framework/binding"
+	"git.sxidc.com/go-framework/baize/framework/core/api"
+	"git.sxidc.com/go-framework/baize/framework/core/api/request"
+	"git.sxidc.com/go-framework/baize/framework/core/api/response"
+	"git.sxidc.com/go-framework/baize/framework/core/application"
+	"git.sxidc.com/go-framework/baize/framework/core/domain"
+	"git.sxidc.com/go-framework/baize/framework/core/domain/entity"
+	"git.sxidc.com/go-framework/baize/framework/core/infrastructure"
+	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/database"
+	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/database/sql"
+	"git.sxidc.com/go-tools/utils/strutils"
+)
+
+// Simple Bind参数
+type Simple struct {
+	// schema
+	Schema string
+}
+
+func (simple *Simple) bind(binder *binding.Binder) {
+	binding.GetBind(binder, &binding.SimpleBindItem[response.InfosData[task.Info]]{
+		Path:             "/task/get",
+		SendResponseFunc: response.SendInfosResponse[task.Info],
+		RequestParams:    &task.GetTaskQueryParams{},
+		Objects:          []domain.Object{&task.Entity{}},
+		ServiceFunc: func(c *api.Context, params request.Params, objects []domain.Object, i *infrastructure.Infrastructure) (response.InfosData[task.Info], error) {
+			errResponse := response.InfosData[task.Info]{
+				Infos:      make([]task.Info, 0),
+				TotalCount: 0,
+				PageNo:     0,
+			}
+
+			queryParams, err := request.ToConcrete[*task.GetTaskQueryParams](params)
+			if err != nil {
+				return errResponse, err
+			}
+
+			conditions := sql.NewConditions()
+
+			if strutils.IsStringNotEmpty(queryParams.Group) {
+				conditions.Equal(task.ColumnGroup, queryParams.Group)
+			}
+
+			if strutils.IsStringNotEmpty(queryParams.Status) {
+				e := task.Entity{Status: queryParams.Status}
+				err := e.TransferStatus2StatusCode()
+				if err != nil {
+					return errResponse, err
+				}
+
+				conditions.Equal(task.ColumnStatusCode, e.StatusCode)
+			}
+
+			if strutils.IsStringNotEmpty(queryParams.StartCreatedTime) {
+				conditions.GreaterThanAndEqual(entity.ColumnCreatedTime, queryParams.StartCreatedTime)
+			}
+
+			if strutils.IsStringNotEmpty(queryParams.EndCreatedTime) {
+				conditions.LessThanAndEqual(entity.ColumnCreatedTime, queryParams.EndCreatedTime)
+			}
+
+			results, totalCount, err := database.Query(i.DBExecutor(), &sql.QueryExecuteParams{
+				TableName:  domain.TableName(simple.Schema, &task.Entity{}),
+				Conditions: conditions,
+				PageNo:     queryParams.PageNo,
+				PageSize:   queryParams.PageSize,
+			})
+			if err != nil {
+				return errResponse, err
+			}
+
+			infos, err := task.FormInfos(results)
+			if err != nil {
+				return errResponse, err
+			}
+
+			return response.InfosData[task.Info]{
+				Infos:      infos,
+				TotalCount: totalCount,
+				PageNo:     queryParams.PageNo,
+			}, nil
+		},
+	})
+}
+
+func Bind(app *application.App, simple *Simple) {
+	binder := binding.NewBinder(app.Api().ChooseRouter(api.RouterPrefix, ""), app.Infrastructure())
+	simple.bind(binder)
+}

+ 165 - 0
convenient/domain/task_runner/task/entity.go

@@ -0,0 +1,165 @@
+package task
+
+import (
+	"encoding/json"
+	"git.sxidc.com/go-framework/baize/framework/core/domain"
+	"git.sxidc.com/go-framework/baize/framework/core/domain/entity"
+	"github.com/pkg/errors"
+)
+
+const (
+	StatusCodeCreated = iota + 1
+	StatusCodeRunning
+	StatusCodeCompleted
+	StatusCodeStop
+	StatusCodeError
+)
+
+const (
+	statusCreated   = "已创建"
+	statusRunning   = "运行中"
+	statusCompleted = "已完成"
+	statusStop      = "已停止"
+	statusError     = "错误"
+)
+
+var (
+	statusMap = map[int]string{
+		StatusCodeCreated:   statusCreated,
+		StatusCodeRunning:   statusRunning,
+		StatusCodeCompleted: statusCompleted,
+		StatusCodeStop:      statusStop,
+		StatusCodeError:     statusError,
+	}
+
+	statusCodeMap = map[string]int{
+		statusCreated:   StatusCodeCreated,
+		statusRunning:   StatusCodeRunning,
+		statusCompleted: StatusCodeCompleted,
+		statusStop:      StatusCodeStop,
+		statusError:     StatusCodeError,
+	}
+)
+
+const (
+	FieldGroup      = "Group"
+	FieldContext    = "Context"
+	FieldStatus     = "Status"
+	FieldStatusCode = "StatusCode"
+	FieldErrMsg     = "ErrMsg"
+)
+
+var (
+	ColumnGroup      = domain.ColumnName(FieldGroup)
+	ColumnContext    = domain.ColumnName(FieldContext)
+	ColumnStatusCode = domain.ColumnName(FieldStatusCode)
+	ColumnErrMsg     = domain.ColumnName(FieldErrMsg)
+)
+
+var fieldMap = map[string]string{
+	FieldGroup:   "组",
+	FieldContext: "上下文",
+	FieldStatus:  "状态",
+	FieldErrMsg:  "错误信息",
+}
+
+type Entity struct {
+	entity.Base
+	Group        string `sqlmapping:"column:group;notUpdate;" sqlresult:"column:group;" check:"required,lte=256,when=run/restart"`
+	Context      string `sqlmapping:"column:context;notUpdate;" sqlresult:"column:context;"`
+	StatusCode   int    `sqlmapping:"column:status_code;" sqlresult:"column:status_code;" check:"required,when=run/restart"`
+	Status       string `sqlmapping:"-" sqlresult:"-"`
+	ErrMsg       string `sqlmapping:"column:err_msg;" sqlresult:"column:err_msg;"`
+	CreateUserID string `sqlmapping:"column:create_user_id;" sqlresult:"column:create_user_id;" check:"required,len=32,when=run/restart"`
+	entity.TimeFields
+}
+
+func (e *Entity) DomainCNName() string {
+	return "任务"
+}
+
+func (e *Entity) DomainCamelName() string {
+	return "Task"
+}
+
+func (e *Entity) GetFieldMap() map[string]string {
+	return fieldMap
+}
+
+func (e *Entity) GetMapContext() (map[string]any, error) {
+	ctx := make(map[string]any)
+	err := json.Unmarshal([]byte(e.Context), &ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	return ctx, nil
+}
+
+func (e *Entity) SetStatusCreated() {
+	e.StatusCode = StatusCodeCreated
+	e.Status = statusCreated
+}
+
+func (e *Entity) SetStatusRunning() {
+	e.StatusCode = StatusCodeRunning
+	e.Status = statusRunning
+}
+
+func (e *Entity) SetStatusCompleted() {
+	e.StatusCode = StatusCodeCompleted
+	e.Status = statusCompleted
+}
+
+func (e *Entity) SetStatusStop() {
+	e.StatusCode = StatusCodeStop
+	e.Status = statusStop
+}
+
+func (e *Entity) SetStatusError(errMsg string) {
+	e.StatusCode = StatusCodeError
+	e.Status = statusError
+	e.ErrMsg = errMsg
+}
+
+func (e *Entity) IsStatusCreated() bool {
+	return e.StatusCode == StatusCodeCreated
+}
+
+func (e *Entity) IsStatusRunning() bool {
+	return e.StatusCode == StatusCodeRunning
+}
+
+func (e *Entity) IsStatusCompleted() bool {
+	return e.StatusCode == StatusCodeCompleted
+}
+
+func (e *Entity) IsStatusStop() bool {
+	return e.StatusCode == StatusCodeStop
+}
+
+func (e *Entity) IsStatusError() bool {
+	return e.StatusCode == StatusCodeError
+}
+
+func (e *Entity) TransferStatus2StatusCode() error {
+	statusCode, ok := statusCodeMap[e.Status]
+	if !ok {
+		return errors.Errorf("状态对应的状态码不存在: %v\n", e.Status)
+	}
+
+	e.StatusCode = statusCode
+
+	return nil
+}
+
+func (e *Entity) TransferStatusCode2Status() error {
+	status, ok := statusMap[e.StatusCode]
+	if !ok {
+		return errors.Errorf("状态码对应的状态不存在: %v", e.StatusCode)
+	}
+
+	e.Status = status
+
+	return nil
+}

+ 61 - 0
convenient/domain/task_runner/task/info.go

@@ -0,0 +1,61 @@
+package task
+
+import (
+	"git.sxidc.com/go-framework/baize/framework/core/application"
+	"git.sxidc.com/go-framework/baize/framework/core/domain/entity"
+	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/database/sql"
+	"github.com/pkg/errors"
+)
+
+type Info struct {
+	application.InfoIDField
+	Group        string `sqlresult:"column:group;"`
+	Context      string `sqlresult:"column:context;"`
+	Status       string `sqlresult:"-"`
+	ErrMsg       string `sqlresult:"column:err_msg;"`
+	CreateUserID string `json:"createUserId" sqlresult:"column:create_user_id;"`
+	application.InfoTimeFields
+}
+
+func FormInfo(result *sql.Result) (*Info, error) {
+	if result == nil {
+		return new(Info), nil
+	}
+
+	statusCode := result.ColumnValueInt(ColumnStatusCode)
+	statusStr, ok := statusMap[statusCode]
+	if !ok {
+		return nil, errors.Errorf("无效的状态码: %v\n", statusCode)
+	}
+
+	return &Info{
+		InfoIDField:  application.InfoIDField{ID: result.ColumnValueString(entity.ColumnID)},
+		Group:        result.ColumnValueString(ColumnGroup),
+		Context:      result.ColumnValueString(ColumnContext),
+		Status:       statusStr,
+		ErrMsg:       result.ColumnValueString(ColumnErrMsg),
+		CreateUserID: result.ColumnValueString(entity.ColumnCreateUserID),
+		InfoTimeFields: application.InfoTimeFields{
+			CreatedTime:     result.ColumnValueTime(entity.ColumnCreatedTime).Format("2006-01-02 15:04:05"),
+			LastUpdatedTime: result.ColumnValueTime(entity.ColumnLastUpdatedTime).Format("2006-01-02 15:04:05"),
+		},
+	}, nil
+}
+
+func FormInfos(results []sql.Result) ([]Info, error) {
+	if results == nil || len(results) == 0 {
+		return make([]Info, 0), nil
+	}
+
+	infos := make([]Info, 0)
+	for _, result := range results {
+		info, err := FormInfo(&result)
+		if err != nil {
+			return nil, err
+		}
+
+		infos = append(infos, *info)
+	}
+
+	return infos, nil
+}

+ 12 - 0
convenient/domain/task_runner/task/request_params.go

@@ -0,0 +1,12 @@
+package task
+
+import "git.sxidc.com/go-framework/baize/framework/core/api/request"
+
+type GetTaskQueryParams struct {
+	request.BaseQueryParams
+	Group            string `form:"group" assign:"-"`
+	Status           string `form:"status" assign:"-"`
+	CreateUserID     string `form:"createUserId" assign:"-"`
+	StartCreatedTime string `form:"startCreatedTime" assign:"-"`
+	EndCreatedTime   string `form:"endCreatedTime" assign:"-"`
+}

+ 325 - 0
convenient/domain/task_runner/task_runner.go

@@ -0,0 +1,325 @@
+package task_runner
+
+import (
+	"encoding/json"
+	"git.sxidc.com/go-framework/baize/convenient/domain/task_runner/task"
+	"git.sxidc.com/go-framework/baize/framework/core/domain"
+	"git.sxidc.com/go-framework/baize/framework/core/domain/entity"
+	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/database"
+	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/database/sql"
+	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/logger"
+	"git.sxidc.com/go-tools/utils/strutils"
+	"github.com/pkg/errors"
+	"sync"
+)
+
+var runnerRegister = new(sync.Map)
+
+type RunTaskParams struct {
+	Group      string
+	OperatorID string
+	Context    map[string]any
+	DBSchema   string
+	DBExecutor database.Executor
+}
+
+func (params *RunTaskParams) check() error {
+	if strutils.IsStringEmpty(params.Group) {
+		return errors.New("没有传递任务组")
+	}
+
+	if strutils.IsStringEmpty(params.OperatorID) {
+		return errors.New("没有传递操作人ID")
+	}
+
+	if strutils.IsStringEmpty(params.DBSchema) {
+		return errors.New("没有传递数据库schema")
+	}
+
+	if params.DBExecutor == nil {
+		return errors.New("没有传递数据库执行器")
+	}
+
+	return nil
+}
+
+func RunTask(runner Runner, params *RunTaskParams) (string, error) {
+	err := params.check()
+	if err != nil {
+		return "", err
+	}
+
+	ctxJsonBytes, err := json.Marshal(params.Context)
+	if err != nil {
+		return "", err
+	}
+
+	ctxJsonStr := string(ctxJsonBytes)
+
+	// 创建任务,任务状态为已创建
+	taskEntity := &task.Entity{
+		Group:        params.Group,
+		Context:      ctxJsonStr,
+		CreateUserID: params.OperatorID,
+	}
+
+	err = createTaskDB(taskEntity, params.DBSchema, params.DBExecutor)
+	if err != nil {
+		return "", err
+	}
+
+	loaded, _ := runnerRegister.LoadOrStore(params.Group, runner)
+	loadedRunner := loaded.(Runner)
+
+	go runTask(taskEntity, loadedRunner, params.DBSchema, params.DBExecutor, func(ctx map[string]any, runner Runner) (bool, error) {
+		return runner.Run(taskEntity.ID, ctx)
+	})
+
+	return taskEntity.ID, nil
+}
+
+type RestartTaskParams struct {
+	Group      string
+	DBSchema   string
+	DBExecutor database.Executor
+}
+
+func (params *RestartTaskParams) check() error {
+	if strutils.IsStringEmpty(params.Group) {
+		return errors.New("没有传递任务组")
+	}
+
+	if strutils.IsStringEmpty(params.DBSchema) {
+		return errors.New("没有传递数据库schema")
+	}
+
+	if params.DBExecutor == nil {
+		return errors.New("没有传递数据库执行器")
+	}
+
+	return nil
+}
+
+func RestartTask(runner Runner, params *RestartTaskParams) error {
+	err := params.check()
+	if err != nil {
+		return err
+	}
+
+	runningResults, _, err := database.Query(params.DBExecutor, &sql.QueryExecuteParams{
+		TableName: domain.TableName(params.DBSchema, &task.Entity{}),
+		Conditions: sql.NewConditions().Equal(task.ColumnGroup, params.Group).
+			Equal(task.ColumnStatusCode, task.StatusCodeRunning),
+		PageNo:   0,
+		PageSize: 0,
+	})
+	if err != nil {
+		return err
+	}
+
+	taskEntities := make([]task.Entity, 0)
+	err = sql.ParseSqlResult(runningResults, &taskEntities)
+	if err != nil {
+		return err
+	}
+
+	for _, taskEntity := range taskEntities {
+		loaded, _ := runnerRegister.LoadOrStore(params.Group, runner)
+		loadedRunner := loaded.(Runner)
+
+		go runTask(&taskEntity, loadedRunner, params.DBSchema, params.DBExecutor, func(ctx map[string]any, runner Runner) (bool, error) {
+			return runner.Restart(taskEntity.ID, ctx)
+		})
+	}
+
+	return nil
+}
+
+type StopTaskParams struct {
+	ID         string
+	DBSchema   string
+	DBExecutor database.Executor
+}
+
+func (params *StopTaskParams) check() error {
+	if strutils.IsStringEmpty(params.ID) {
+		return errors.New("没有传递任务ID")
+	}
+
+	if strutils.IsStringEmpty(params.DBSchema) {
+		return errors.New("没有传递数据库schema")
+	}
+
+	if params.DBExecutor == nil {
+		return errors.New("没有传递数据库执行器")
+	}
+
+	return nil
+}
+func StopTask(params *StopTaskParams) error {
+	err := params.check()
+	if err != nil {
+		return err
+	}
+
+	result, err := database.QueryOne(params.DBExecutor, &sql.QueryOneExecuteParams{
+		TableName: domain.TableName(params.DBSchema, &task.Entity{}),
+		Conditions: sql.NewConditions().Equal(entity.ColumnID, params.ID).
+			Equal(task.ColumnStatusCode, task.StatusCodeRunning),
+	})
+	if err != nil {
+		if database.IsErrorDBRecordNotExist(err) {
+			return errors.New("任务不存在")
+		}
+
+		return err
+	}
+
+	taskEntity := new(task.Entity)
+	err = sql.ParseSqlResult(result, taskEntity)
+	if err != nil {
+		return err
+	}
+
+	ctx, err := taskEntity.GetMapContext()
+	if err != nil {
+		updateErr := updateTaskStatusErrorDB(taskEntity, err.Error(), params.DBSchema, params.DBExecutor)
+		if updateErr != nil {
+			return updateErr
+		}
+
+		return err
+	}
+
+	loaded, ok := runnerRegister.Load(taskEntity.Group)
+	if !ok {
+		return errors.New("没有找到任务对应的执行器")
+	}
+
+	loadedRunner := loaded.(Runner)
+
+	err = database.Transaction(params.DBExecutor, func(tx database.Executor) error {
+		err := updateTaskStatusStopDBTx(taskEntity, params.DBSchema, tx)
+		if err != nil {
+			return err
+		}
+
+		err = loadedRunner.Stop(taskEntity.ID, ctx)
+		if err != nil {
+			return err
+		}
+
+		return nil
+	})
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func createTaskDB(taskEntity *task.Entity, dbSchema string, dbExecutor database.Executor) error {
+	err := taskEntity.GenerateID()
+	if err != nil {
+		return err
+	}
+
+	taskEntity.SetStatusCreated()
+
+	err = domain.CheckWhen(taskEntity, taskEntity.GetFieldMap(), "run")
+	if err != nil {
+		return err
+	}
+
+	err = database.InsertEntity(dbExecutor, domain.TableName(dbSchema, taskEntity), taskEntity)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func updateTaskStatusRunningDB(taskEntity *task.Entity, dbSchema string, dbExecutor database.Executor) error {
+	taskEntity.SetStatusRunning()
+
+	err := database.UpdateEntity(dbExecutor, domain.TableName(dbSchema, taskEntity), taskEntity)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func updateTaskStatusCompleteDB(taskEntity *task.Entity, dbSchema string, dbExecutor database.Executor) error {
+	taskEntity.SetStatusCompleted()
+
+	err := database.UpdateEntity(dbExecutor, domain.TableName(dbSchema, taskEntity), taskEntity)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func updateTaskStatusStopDBTx(taskEntity *task.Entity, dbSchema string, tx database.Executor) error {
+	taskEntity.SetStatusStop()
+
+	err := database.UpdateEntity(tx, domain.TableName(dbSchema, taskEntity), taskEntity)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func updateTaskStatusErrorDB(taskEntity *task.Entity, errMsg string, dbSchema string, dbExecutor database.Executor) error {
+	taskEntity.SetStatusError(errMsg)
+
+	err := database.UpdateEntity(dbExecutor, domain.TableName(dbSchema, taskEntity), taskEntity)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func runTask(taskEntity *task.Entity, runner Runner, dbSchema string, dbExecutor database.Executor, executeFunc func(ctx map[string]any, runner Runner) (bool, error)) {
+	ctx, err := taskEntity.GetMapContext()
+	if err != nil {
+		err = updateTaskStatusErrorDB(taskEntity, err.Error(), dbSchema, dbExecutor)
+		if err != nil {
+			logger.GetInstance().Error("解析任务上下文失败 ", err)
+			return
+		}
+
+		return
+	}
+
+	err = updateTaskStatusRunningDB(taskEntity, dbSchema, dbExecutor)
+	if err != nil {
+		logger.GetInstance().Error("更新任务运行状态失败 ", err)
+		return
+	}
+
+	// 执行任务
+	stopped, err := executeFunc(ctx, runner)
+	if err != nil {
+		err = updateTaskStatusErrorDB(taskEntity, err.Error(), dbSchema, dbExecutor)
+		if err != nil {
+			logger.GetInstance().Error("更新任务错误状态失败 ", err)
+			return
+		}
+
+		return
+	}
+
+	if stopped {
+		return
+	}
+
+	err = updateTaskStatusCompleteDB(taskEntity, dbSchema, dbExecutor)
+	if err != nil {
+		logger.GetInstance().Error("更新任务完成状态失败 ", err)
+		return
+	}
+}

+ 237 - 0
convenient/domain/task_runner/task_runner_test.go

@@ -0,0 +1,237 @@
+package task_runner
+
+import (
+	"fmt"
+	"git.sxidc.com/go-framework/baize"
+	"git.sxidc.com/go-framework/baize/framework/binding"
+	"git.sxidc.com/go-framework/baize/framework/core/api"
+	"git.sxidc.com/go-framework/baize/framework/core/api/request"
+	"git.sxidc.com/go-framework/baize/framework/core/api/response"
+	"git.sxidc.com/go-framework/baize/framework/core/application"
+	"git.sxidc.com/go-framework/baize/framework/core/domain"
+	"git.sxidc.com/go-framework/baize/framework/core/infrastructure"
+	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/database/operations"
+	"git.sxidc.com/go-tools/utils/strutils"
+	"sync"
+	"testing"
+	"time"
+)
+
+// curl -X POST -H "Content-Type: application/json" -d '{"message": "World"}' "http://localhost:30001/test/api/v1/task/start"
+// curl -X POST -H "Content-Type: application/json" -d '{"id": "0f89bc3ac6364280a535d86ce57a561b"}' "http://localhost:30001/test/api/v1/task/stop"
+// curl -X GET "http://localhost:30001/test/api/task/get?group=test&pageNo=0&pageSize=0"
+
+type StartTaskJsonBody struct {
+	Message string `json:"message"`
+}
+
+type StopTaskJsonBody struct {
+	ID string `json:"id"`
+}
+
+type TestRunner struct {
+	doneChanMap      map[string]chan any
+	doneChanMapMutex *sync.Mutex
+}
+
+func NewTestRunner() *TestRunner {
+	return &TestRunner{
+		doneChanMap:      make(map[string]chan any),
+		doneChanMapMutex: new(sync.Mutex),
+	}
+}
+
+func DestroyTestRunner(runner *TestRunner) {
+	if runner == nil {
+		return
+	}
+
+	runner.doneChanMapMutex.Lock()
+	defer runner.doneChanMapMutex.Unlock()
+
+	for taskID, doneChan := range runner.doneChanMap {
+		doneChan <- nil
+		close(doneChan)
+		delete(runner.doneChanMap, taskID)
+	}
+}
+
+func (runner *TestRunner) Run(taskID string, ctx map[string]any) (bool, error) {
+	messageCtx, ok := ctx["message"]
+	if !ok {
+		return false, fmt.Errorf("message not found")
+	}
+
+	message := messageCtx.(string)
+
+	doneChan := make(chan any)
+	runner.doneChanMapMutex.Lock()
+	runner.doneChanMap[taskID] = doneChan
+	runner.doneChanMapMutex.Unlock()
+
+	ticker := time.NewTicker(time.Second * 1)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			fmt.Println("Hello " + message)
+		case <-doneChan:
+			return true, nil
+		}
+	}
+}
+
+func (runner *TestRunner) Restart(taskID string, ctx map[string]any) (bool, error) {
+	messageCtx, ok := ctx["message"]
+	if !ok {
+		return false, fmt.Errorf("message not found")
+	}
+
+	message := messageCtx.(string)
+
+	doneChan := make(chan any)
+	runner.doneChanMapMutex.Lock()
+	runner.doneChanMap[taskID] = doneChan
+	runner.doneChanMapMutex.Unlock()
+
+	ticker := time.NewTicker(time.Second * 1)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			fmt.Println("Hello " + message)
+		case <-doneChan:
+			return true, nil
+		}
+	}
+}
+
+func (runner *TestRunner) Stop(taskID string, _ map[string]any) error {
+	runner.doneChanMapMutex.Lock()
+
+	doneChan, ok := runner.doneChanMap[taskID]
+	if !ok {
+		runner.doneChanMapMutex.Unlock()
+		return fmt.Errorf("task not found")
+	}
+
+	doneChan <- nil
+	delete(runner.doneChanMap, taskID)
+	close(doneChan)
+	doneChan = nil
+
+	runner.doneChanMapMutex.Unlock()
+
+	return nil
+}
+
+const (
+	taskGroup = "test"
+	dbSchema  = "test"
+)
+
+func TestTaskRunner(t *testing.T) {
+	appInstance := baize.NewApplication(application.Config{
+		ApiConfig: application.ApiConfig{
+			UrlPrefix: "/test/api",
+			Port:      "30001",
+		},
+		InfrastructureConfig: application.InfrastructureConfig{
+			Database: infrastructure.DatabaseConfig{
+				Operations: &operations.Config{
+					UserName:           "test",
+					Password:           "123456",
+					Address:            "10.0.0.84",
+					Port:               "30432",
+					Database:           "test",
+					MaxConnections:     20,
+					MaxIdleConnections: 5,
+				},
+			},
+		},
+	})
+
+	appInstance.Api().PrefixRouter().RegisterVersionedRouter("v1")
+
+	runner := NewTestRunner()
+	defer DestroyTestRunner(runner)
+
+	err := RestartTask(runner, &RestartTaskParams{
+		Group:      taskGroup,
+		DBSchema:   dbSchema,
+		DBExecutor: appInstance.Infrastructure().DBExecutor(),
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	Bind(appInstance, &Simple{
+		Schema: dbSchema,
+	})
+
+	v1Binder := binding.NewBinder(appInstance.ChooseRouter(api.RouterPrefix, "v1"), appInstance.Infrastructure())
+
+	binding.PostBind(v1Binder, &binding.SimpleBindItem[string]{
+		Path:             "/task/start",
+		SendResponseFunc: response.SendIDResponse,
+		RequestParams:    &StartTaskJsonBody{},
+		ServiceFunc: func(c *api.Context, params request.Params, objects []domain.Object, i *infrastructure.Infrastructure) (string, error) {
+			jsonBody, err := request.ToConcrete[*StartTaskJsonBody](params)
+			if err != nil {
+				return "", err
+			}
+
+			taskID, err := RunTask(runner, &RunTaskParams{
+				Group:      taskGroup,
+				OperatorID: strutils.SimpleUUID(),
+				Context: map[string]any{
+					"message": jsonBody.Message,
+				},
+				DBSchema:   dbSchema,
+				DBExecutor: i.DBExecutor(),
+			})
+			if err != nil {
+				return "", err
+			}
+
+			return taskID, nil
+		},
+	})
+
+	binding.PostBind(v1Binder, &binding.SimpleBindItem[any]{
+		Path:             "/task/stop",
+		SendResponseFunc: response.SendMsgResponse,
+		RequestParams:    &StopTaskJsonBody{},
+		ServiceFunc: func(c *api.Context, params request.Params, objects []domain.Object, i *infrastructure.Infrastructure) (any, error) {
+			jsonBody, err := request.ToConcrete[*StopTaskJsonBody](params)
+			if err != nil {
+				return nil, err
+			}
+
+			err = StopTask(&StopTaskParams{
+				ID:         jsonBody.ID,
+				DBSchema:   dbSchema,
+				DBExecutor: i.DBExecutor(),
+			})
+			if err != nil {
+				return nil, err
+			}
+
+			return nil, nil
+		},
+	})
+
+	err = appInstance.Start()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	defer func(appInstance *application.App) {
+		err := appInstance.Finish()
+		if err != nil {
+			t.Fatal(err)
+		}
+	}(appInstance)
+}