Browse Source

修改任务的bug

yjp 3 weeks ago
parent
commit
0e65a0444f

+ 2 - 2
convenient/domain/task_runner/runner.go

@@ -1,7 +1,7 @@
 package task_runner
 
 type Runner interface {
-	Run(taskID string, ctx map[string]any) error
-	Restart(taskID string, ctx map[string]any) error
+	Run(taskID string, ctx map[string]any) (bool, error)
+	Restart(taskID string, ctx map[string]any) (bool, error)
 	Stop(taskID string, ctx map[string]any) error
 }

+ 2 - 2
convenient/domain/task_runner/simple.go

@@ -34,7 +34,7 @@ func (simple *Simple) bind(binder *binding.Binder) {
 				PageNo:     0,
 			}
 
-			queryParams, err := request.ToConcrete[task.GetTaskQueryParams](params)
+			queryParams, err := request.ToConcrete[*task.GetTaskQueryParams](params)
 			if err != nil {
 				return errResponse, err
 			}
@@ -64,7 +64,7 @@ func (simple *Simple) bind(binder *binding.Binder) {
 			}
 
 			results, totalCount, err := database.Query(i.DBExecutor(), &sql.QueryExecuteParams{
-				TableName:  simple.Schema,
+				TableName:  domain.TableName(simple.Schema, &task.Entity{}),
 				Conditions: conditions,
 				PageNo:     queryParams.PageNo,
 				PageSize:   queryParams.PageSize,

+ 8 - 4
convenient/domain/task_runner/task_runner.go

@@ -71,7 +71,7 @@ func RunTask(runner Runner, params *RunTaskParams) (string, error) {
 	loaded, _ := runnerRegister.LoadOrStore(params.Group, runner)
 	loadedRunner := loaded.(Runner)
 
-	go runTask(taskEntity, loadedRunner, params.DBSchema, params.DBExecutor, func(ctx map[string]any, runner Runner) error {
+	go runTask(taskEntity, loadedRunner, params.DBSchema, params.DBExecutor, func(ctx map[string]any, runner Runner) (bool, error) {
 		return runner.Run(taskEntity.ID, ctx)
 	})
 
@@ -127,7 +127,7 @@ func RestartTask(runner Runner, params *RestartTaskParams) error {
 		loaded, _ := runnerRegister.LoadOrStore(params.Group, runner)
 		loadedRunner := loaded.(Runner)
 
-		go runTask(&taskEntity, loadedRunner, params.DBSchema, params.DBExecutor, func(ctx map[string]any, runner Runner) error {
+		go runTask(&taskEntity, loadedRunner, params.DBSchema, params.DBExecutor, func(ctx map[string]any, runner Runner) (bool, error) {
 			return runner.Restart(taskEntity.ID, ctx)
 		})
 	}
@@ -283,7 +283,7 @@ func updateTaskStatusErrorDB(taskEntity *task.Entity, errMsg string, dbSchema st
 	return nil
 }
 
-func runTask(taskEntity *task.Entity, runner Runner, dbSchema string, dbExecutor database.Executor, executeFunc func(ctx map[string]any, runner Runner) error) {
+func runTask(taskEntity *task.Entity, runner Runner, dbSchema string, dbExecutor database.Executor, executeFunc func(ctx map[string]any, runner Runner) (bool, error)) {
 	ctx, err := taskEntity.GetMapContext()
 	if err != nil {
 		err = updateTaskStatusErrorDB(taskEntity, err.Error(), dbSchema, dbExecutor)
@@ -302,7 +302,7 @@ func runTask(taskEntity *task.Entity, runner Runner, dbSchema string, dbExecutor
 	}
 
 	// 执行任务
-	err = executeFunc(ctx, runner)
+	stopped, err := executeFunc(ctx, runner)
 	if err != nil {
 		err = updateTaskStatusErrorDB(taskEntity, err.Error(), dbSchema, dbExecutor)
 		if err != nil {
@@ -313,6 +313,10 @@ func runTask(taskEntity *task.Entity, runner Runner, dbSchema string, dbExecutor
 		return
 	}
 
+	if stopped {
+		return
+	}
+
 	err = updateTaskStatusCompleteDB(taskEntity, dbSchema, dbExecutor)
 	if err != nil {
 		logger.GetInstance().Error("更新任务完成状态失败 ", err)

+ 11 - 8
convenient/domain/task_runner/task_runner_test.go

@@ -18,7 +18,8 @@ import (
 )
 
 // curl -X POST -H "Content-Type: application/json" -d '{"message": "World"}' "http://localhost:30001/test/api/v1/task/start"
-// curl -X POST -H "Content-Type: application/json" -d '{"id": "27264de7f4c24345ab0cc49fe9cf57a6"}' "http://localhost:30001/test/api/v1/task/stop"
+// curl -X POST -H "Content-Type: application/json" -d '{"id": "0f89bc3ac6364280a535d86ce57a561b"}' "http://localhost:30001/test/api/v1/task/stop"
+// curl -X GET "http://localhost:30001/test/api/task/get?group=test&pageNo=0&pageSize=0"
 
 type StartTaskJsonBody struct {
 	Message string `json:"message"`
@@ -55,10 +56,10 @@ func DestroyTestRunner(runner *TestRunner) {
 	}
 }
 
-func (runner *TestRunner) Run(taskID string, ctx map[string]any) error {
+func (runner *TestRunner) Run(taskID string, ctx map[string]any) (bool, error) {
 	messageCtx, ok := ctx["message"]
 	if !ok {
-		return fmt.Errorf("message not found")
+		return false, fmt.Errorf("message not found")
 	}
 
 	message := messageCtx.(string)
@@ -76,15 +77,15 @@ func (runner *TestRunner) Run(taskID string, ctx map[string]any) error {
 		case <-ticker.C:
 			fmt.Println("Hello " + message)
 		case <-doneChan:
-			return nil
+			return true, nil
 		}
 	}
 }
 
-func (runner *TestRunner) Restart(taskID string, ctx map[string]any) error {
+func (runner *TestRunner) Restart(taskID string, ctx map[string]any) (bool, error) {
 	messageCtx, ok := ctx["message"]
 	if !ok {
-		return fmt.Errorf("message not found")
+		return false, fmt.Errorf("message not found")
 	}
 
 	message := messageCtx.(string)
@@ -102,12 +103,12 @@ func (runner *TestRunner) Restart(taskID string, ctx map[string]any) error {
 		case <-ticker.C:
 			fmt.Println("Hello " + message)
 		case <-doneChan:
-			return nil
+			return true, nil
 		}
 	}
 }
 
-func (runner *TestRunner) Stop(taskID string, ctx map[string]any) error {
+func (runner *TestRunner) Stop(taskID string, _ map[string]any) error {
 	runner.doneChanMapMutex.Lock()
 
 	doneChan, ok := runner.doneChanMap[taskID]
@@ -118,6 +119,7 @@ func (runner *TestRunner) Stop(taskID string, ctx map[string]any) error {
 
 	doneChan <- nil
 	delete(runner.doneChanMap, taskID)
+	close(doneChan)
 	doneChan = nil
 
 	runner.doneChanMapMutex.Unlock()
@@ -154,6 +156,7 @@ func TestTaskRunner(t *testing.T) {
 	appInstance.Api().PrefixRouter().RegisterVersionedRouter("v1")
 
 	runner := NewTestRunner()
+	defer DestroyTestRunner(runner)
 
 	err := RestartTask(runner, &RestartTaskParams{
 		Group:      taskGroup,