Browse Source

添加任务领域

yjp 9 months ago
parent
commit
99a2b428ab

+ 140 - 0
convenient/domain/task_manager/task/entity.go

@@ -0,0 +1,140 @@
+package task
+
+import (
+	"git.sxidc.com/go-framework/baize/framework/core/domain"
+	"git.sxidc.com/go-framework/baize/framework/core/domain/entity"
+	"github.com/pkg/errors"
+)
+
+const (
+	StatusCodeCreated = iota + 1
+	StatusCodeRunning
+	StatusCodeCompleted
+	StatusCodeError
+)
+
+const (
+	statusCreated   = "已创建"
+	statusRunning   = "运行中"
+	statusCompleted = "已完成"
+	statusError     = "错误"
+)
+
+var (
+	statusMap = map[int]string{
+		StatusCodeCreated:   statusCreated,
+		StatusCodeRunning:   statusRunning,
+		StatusCodeCompleted: statusCompleted,
+		StatusCodeError:     statusError,
+	}
+
+	statusCodeMap = map[string]int{
+		statusCreated:   StatusCodeCreated,
+		statusRunning:   StatusCodeRunning,
+		statusCompleted: StatusCodeCompleted,
+		statusError:     StatusCodeError,
+	}
+)
+
+const (
+	FieldGroup   = "Group"
+	FieldContext = "Context"
+	FieldStatus  = "Status"
+	FieldErrMsg  = "ErrMsg"
+)
+
+var (
+	ColumnGroup   = domain.ColumnName(FieldGroup)
+	ColumnContext = domain.ColumnName(FieldContext)
+	ColumnStatus  = domain.ColumnName(FieldStatus)
+	ColumnErrMsg  = domain.ColumnName(FieldErrMsg)
+)
+
+var fieldMap = map[string]string{
+	FieldGroup:   "组",
+	FieldContext: "上下文",
+	FieldStatus:  "状态",
+	FieldErrMsg:  "错误信息",
+}
+
+type Entity struct {
+	entity.Base
+	Group        string `sqlmapping:"column:group;notUpdate;" sqlresult:"column:group;" check:"required,len=256,when=run/restart"`
+	Context      string `sqlmapping:"column:context;notUpdate;" sqlresult:"column:context;"`
+	StatusCode   int    `sqlmapping:"column:status_code;" sqlresult:"column:status_code;" check:"required,when=run/restart"`
+	Status       string `sqlmapping:"-" sqlresult:"-"`
+	ErrMsg       string `sqlmapping:"column:err_msg;" sqlresult:"column:err_msg;"`
+	CreateUserID string `sqlmapping:"column:create_user_id;" sqlresult:"column:create_user_id;" check:"required,len=32,when=run/restart"`
+	entity.TimeFields
+}
+
+func (e *Entity) DomainCNName() string {
+	return "任务"
+}
+
+func (e *Entity) DomainCamelName() string {
+	return "Task"
+}
+
+func (e *Entity) GetFieldMap() map[string]string {
+	return fieldMap
+}
+
+func (e *Entity) SetStatusCreated() {
+	e.StatusCode = StatusCodeCreated
+	e.Status = statusCreated
+}
+
+func (e *Entity) SetStatusRunning() {
+	e.StatusCode = StatusCodeRunning
+	e.Status = statusRunning
+}
+
+func (e *Entity) SetStatusCompleted() {
+	e.StatusCode = StatusCodeCompleted
+	e.Status = statusCompleted
+}
+
+func (e *Entity) SetStatusError(errMsg string) {
+	e.StatusCode = StatusCodeError
+	e.Status = statusError
+	e.ErrMsg = errMsg
+}
+
+func (e *Entity) IsStatusCreated() bool {
+	return e.StatusCode == StatusCodeCreated
+}
+
+func (e *Entity) IsStatusRunning() bool {
+	return e.StatusCode == StatusCodeRunning
+}
+
+func (e *Entity) IsStatusCompleted() bool {
+	return e.StatusCode == StatusCodeCompleted
+}
+
+func (e *Entity) IsStatusError() bool {
+	return e.StatusCode == StatusCodeError
+}
+
+func (e *Entity) TransferStatus2StatusCode() error {
+	statusCode, ok := statusCodeMap[e.Status]
+	if !ok {
+		return errors.New("状态对应的状态码不存在")
+	}
+
+	e.StatusCode = statusCode
+
+	return nil
+}
+
+func (e *Entity) TransferStatusCode2Status() error {
+	status, ok := statusMap[e.StatusCode]
+	if !ok {
+		return errors.New("状态码对应的状态不存在")
+	}
+
+	e.Status = status
+
+	return nil
+}

+ 6 - 0
convenient/domain/task_manager/task/runner.go

@@ -0,0 +1,6 @@
+package task
+
+type Runner interface {
+	Run(ctx map[string]any) error
+	Restart(ctx map[string]any) error
+}

+ 221 - 0
convenient/domain/task_manager/task_manager.go

@@ -0,0 +1,221 @@
+package task_manager
+
+import (
+	"encoding/json"
+	"git.sxidc.com/go-framework/baize/convenient/domain/task_manager/task"
+	"git.sxidc.com/go-framework/baize/framework/binding"
+	"git.sxidc.com/go-framework/baize/framework/core/api"
+	"git.sxidc.com/go-framework/baize/framework/core/application"
+	"git.sxidc.com/go-framework/baize/framework/core/domain"
+	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/database"
+	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/database/sql"
+	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/logger"
+	"git.sxidc.com/go-tools/utils/strutils"
+	"github.com/pkg/errors"
+)
+
+type RunTaskParams struct {
+	Group      string
+	OperatorID string
+	Context    map[string]any
+	DBSchema   string
+	DBExecutor database.Executor
+}
+
+func (params *RunTaskParams) check() error {
+	if strutils.IsStringEmpty(params.Group) {
+		return errors.New("没有传递任务组")
+	}
+
+	if strutils.IsStringEmpty(params.OperatorID) {
+		return errors.New("没有传递操作人ID")
+	}
+
+	if strutils.IsStringEmpty(params.DBSchema) {
+		return errors.New("没有传递数据库schema")
+	}
+
+	if params.DBExecutor == nil {
+		return errors.New("没有传递数据库执行器")
+	}
+
+	return nil
+}
+
+func RunTask(runner task.Runner, params *RunTaskParams) error {
+	ctxJsonBytes, err := json.Marshal(params.Context)
+	if err != nil {
+		return err
+	}
+
+	ctxJsonStr := string(ctxJsonBytes)
+
+	// 创建任务,任务状态为已创建
+	taskEntity := &task.Entity{
+		Group:        params.Group,
+		Context:      ctxJsonStr,
+		CreateUserID: params.OperatorID,
+	}
+
+	err = createTaskDB(taskEntity, params.DBSchema, params.DBExecutor)
+	if err != nil {
+		return err
+	}
+
+	go runTask(taskEntity, runner, params.DBSchema, params.DBExecutor, func(ctx map[string]any, runner task.Runner) error {
+		return runner.Run(ctx)
+	})
+
+	return nil
+}
+
+type RestartTaskParams struct {
+	Group      string
+	DBSchema   string
+	DBExecutor database.Executor
+}
+
+func (params *RestartTaskParams) check() error {
+	if strutils.IsStringEmpty(params.Group) {
+		return errors.New("没有传递任务组")
+	}
+
+	if strutils.IsStringEmpty(params.DBSchema) {
+		return errors.New("没有传递数据库schema")
+	}
+
+	if params.DBExecutor == nil {
+		return errors.New("没有传递数据库执行器")
+	}
+
+	return nil
+}
+
+func RestartTask(runner task.Runner, params *RestartTaskParams) error {
+	runningResults, _, err := database.Query(params.DBExecutor, &sql.QueryExecuteParams{
+		TableName: domain.TableName(params.DBSchema, &task.Entity{}),
+		Conditions: sql.NewConditions().Equal(task.ColumnGroup, params.Group).
+			Equal(task.ColumnStatus, task.StatusCodeRunning),
+		PageNo:   0,
+		PageSize: 0,
+	})
+	if err != nil {
+		return err
+	}
+
+	taskEntities := make([]task.Entity, 0)
+	err = sql.ParseSqlResult(runningResults, &taskEntities)
+	if err != nil {
+		return err
+	}
+
+	for _, taskEntity := range taskEntities {
+		go runTask(&taskEntity, runner, params.DBSchema, params.DBExecutor, func(ctx map[string]any, runner task.Runner) error {
+			return runner.Restart(ctx)
+		})
+	}
+
+	return nil
+}
+
+func createTaskDB(taskEntity *task.Entity, dbSchema string, dbExecutor database.Executor) error {
+	taskEntity.SetStatusCreated()
+
+	err := domain.CheckWhen(taskEntity, taskEntity.GetFieldMap(), "run")
+	if err != nil {
+		return err
+	}
+
+	err = database.InsertEntity(dbExecutor, domain.TableName(dbSchema, taskEntity), taskEntity)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func updateTaskStatusRunningDB(taskEntity *task.Entity, dbSchema string, dbExecutor database.Executor) error {
+	taskEntity.SetStatusRunning()
+
+	err := database.UpdateEntity(dbExecutor, domain.TableName(dbSchema, taskEntity), taskEntity)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func updateTaskStatusCompleteDB(taskEntity *task.Entity, dbSchema string, dbExecutor database.Executor) error {
+	taskEntity.SetStatusCompleted()
+
+	err := database.UpdateEntity(dbExecutor, domain.TableName(dbSchema, taskEntity), taskEntity)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func updateTaskStatusErrorDB(taskEntity *task.Entity, errMsg string, dbSchema string, dbExecutor database.Executor) error {
+	taskEntity.SetStatusError(errMsg)
+
+	err := database.UpdateEntity(dbExecutor, domain.TableName(dbSchema, taskEntity), taskEntity)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func runTask(taskEntity *task.Entity, runner task.Runner, dbSchema string, dbExecutor database.Executor, executeFunc func(ctx map[string]any, runner task.Runner) error) {
+	ctx := make(map[string]any)
+	err := json.Unmarshal([]byte(taskEntity.Context), &ctx)
+	if err != nil {
+		err = updateTaskStatusErrorDB(taskEntity, err.Error(), dbSchema, dbExecutor)
+		if err != nil {
+			logger.GetInstance().Error("解析任务上下文失败 ", err)
+			return
+		}
+
+		return
+	}
+
+	err = updateTaskStatusRunningDB(taskEntity, dbSchema, dbExecutor)
+	if err != nil {
+		logger.GetInstance().Error("更新任务运行状态失败 ", err)
+		return
+	}
+
+	// 执行任务
+	err = executeFunc(ctx, runner)
+	if err != nil {
+		err = updateTaskStatusErrorDB(taskEntity, err.Error(), dbSchema, dbExecutor)
+		if err != nil {
+			logger.GetInstance().Error("更新任务错误状态失败 ", err)
+			return
+		}
+
+		return
+	}
+
+	err = updateTaskStatusCompleteDB(taskEntity, dbSchema, dbExecutor)
+	if err != nil {
+		logger.GetInstance().Error("更新任务完成状态失败 ", err)
+		return
+	}
+}
+
+// Simple Bind参数
+type Simple struct {
+	// schema
+	Schema string
+}
+
+func (simple *Simple) bind(binder *binding.Binder) {
+	// TODO 完成查询接口
+}
+
+func Bind(app *application.App, simple *Simple) {
+	binder := binding.NewBinder(app.Api().ChooseRouter(api.RouterPrefix, ""), app.Infrastructure())
+	simple.bind(binder)
+}