Browse Source

修改bug

yjp 4 weeks ago
parent
commit
d2f369bd81

+ 39 - 0
convenient/data_containers/task_runner/task.yaml

@@ -0,0 +1,39 @@
+kind: DataContainer
+spec:
+  namespace: # 替换
+  data_source: # 替换
+  name: # 替换.sql_execute_logs
+  spec:
+    table_name: # 替换.sql_execute_logs
+    columns:
+      - name: id
+        type: varchar(32)
+        comment: id
+        primary_key: true
+      - name: group
+        type: varchar(256)
+        comment: 任务组
+        index: true
+      - name: context
+        type: text
+        comment: 上下文
+      - name: status_code
+        type: integer
+        comment: 状态码
+        index: true
+      - name: err_msg
+        type: text
+        comment: 错误信息
+      - name: create_user_id
+        type: varchar(32)
+        comment: 创建者ID
+        index: true
+        not_null: true
+      - name: created_time
+        type: "timestamp with time zone"
+        comment: 创建时间
+        not_null: true
+      - name: last_updated_time
+        type: "timestamp with time zone"
+        comment: 最近更新时间
+        not_null: true

+ 3 - 3
convenient/domain/task_runner/runner.go

@@ -1,7 +1,7 @@
 package task_runner
 
 type Runner interface {
-	Run(ctx map[string]any) error
-	Restart(ctx map[string]any) error
-	Stop(ctx map[string]any) error
+	Run(taskID string, ctx map[string]any) error
+	Restart(taskID string, ctx map[string]any) error
+	Stop(taskID string, ctx map[string]any) error
 }

+ 7 - 1
convenient/domain/task_runner/simple.go

@@ -46,7 +46,13 @@ func (simple *Simple) bind(binder *binding.Binder) {
 			}
 
 			if strutils.IsStringNotEmpty(queryParams.Status) {
-				conditions.Equal(task.ColumnStatus, queryParams.Status)
+				e := task.Entity{Status: queryParams.Status}
+				err := e.TransferStatus2StatusCode()
+				if err != nil {
+					return errResponse, err
+				}
+
+				conditions.Equal(task.ColumnStatusCode, e.StatusCode)
 			}
 
 			if strutils.IsStringNotEmpty(queryParams.StartCreatedTime) {

+ 10 - 9
convenient/domain/task_runner/task/entity.go

@@ -42,17 +42,18 @@ var (
 )
 
 const (
-	FieldGroup   = "Group"
-	FieldContext = "Context"
-	FieldStatus  = "Status"
-	FieldErrMsg  = "ErrMsg"
+	FieldGroup      = "Group"
+	FieldContext    = "Context"
+	FieldStatus     = "Status"
+	FieldStatusCode = "StatusCode"
+	FieldErrMsg     = "ErrMsg"
 )
 
 var (
-	ColumnGroup   = domain.ColumnName(FieldGroup)
-	ColumnContext = domain.ColumnName(FieldContext)
-	ColumnStatus  = domain.ColumnName(FieldStatus)
-	ColumnErrMsg  = domain.ColumnName(FieldErrMsg)
+	ColumnGroup      = domain.ColumnName(FieldGroup)
+	ColumnContext    = domain.ColumnName(FieldContext)
+	ColumnStatusCode = domain.ColumnName(FieldStatusCode)
+	ColumnErrMsg     = domain.ColumnName(FieldErrMsg)
 )
 
 var fieldMap = map[string]string{
@@ -64,7 +65,7 @@ var fieldMap = map[string]string{
 
 type Entity struct {
 	entity.Base
-	Group        string `sqlmapping:"column:group;notUpdate;" sqlresult:"column:group;" check:"required,len=256,when=run/restart"`
+	Group        string `sqlmapping:"column:group;notUpdate;" sqlresult:"column:group;" check:"required,lte=256,when=run/restart"`
 	Context      string `sqlmapping:"column:context;notUpdate;" sqlresult:"column:context;"`
 	StatusCode   int    `sqlmapping:"column:status_code;" sqlresult:"column:status_code;" check:"required,when=run/restart"`
 	Status       string `sqlmapping:"-" sqlresult:"-"`

+ 1 - 1
convenient/domain/task_runner/task/info.go

@@ -22,7 +22,7 @@ func FormInfo(result *sql.Result) (*Info, error) {
 		return new(Info), nil
 	}
 
-	statusCode := result.ColumnValueInt(ColumnStatus)
+	statusCode := result.ColumnValueInt(ColumnStatusCode)
 	statusStr, ok := statusMap[statusCode]
 	if !ok {
 		return nil, errors.Errorf("无效的状态码: %v\n", statusCode)

+ 11 - 6
convenient/domain/task_runner/task_runner.go

@@ -72,7 +72,7 @@ func RunTask(runner Runner, params *RunTaskParams) (string, error) {
 	loadedRunner := loaded.(Runner)
 
 	go runTask(taskEntity, loadedRunner, params.DBSchema, params.DBExecutor, func(ctx map[string]any, runner Runner) error {
-		return runner.Run(ctx)
+		return runner.Run(taskEntity.ID, ctx)
 	})
 
 	return taskEntity.ID, nil
@@ -109,7 +109,7 @@ func RestartTask(runner Runner, params *RestartTaskParams) error {
 	runningResults, _, err := database.Query(params.DBExecutor, &sql.QueryExecuteParams{
 		TableName: domain.TableName(params.DBSchema, &task.Entity{}),
 		Conditions: sql.NewConditions().Equal(task.ColumnGroup, params.Group).
-			Equal(task.ColumnStatus, task.StatusCodeRunning),
+			Equal(task.ColumnStatusCode, task.StatusCodeRunning),
 		PageNo:   0,
 		PageSize: 0,
 	})
@@ -128,7 +128,7 @@ func RestartTask(runner Runner, params *RestartTaskParams) error {
 		loadedRunner := loaded.(Runner)
 
 		go runTask(&taskEntity, loadedRunner, params.DBSchema, params.DBExecutor, func(ctx map[string]any, runner Runner) error {
-			return runner.Restart(ctx)
+			return runner.Restart(taskEntity.ID, ctx)
 		})
 	}
 
@@ -165,7 +165,7 @@ func StopTask(params *StopTaskParams) error {
 	result, err := database.QueryOne(params.DBExecutor, &sql.QueryOneExecuteParams{
 		TableName: domain.TableName(params.DBSchema, &task.Entity{}),
 		Conditions: sql.NewConditions().Equal(entity.ColumnID, params.ID).
-			Equal(task.ColumnStatus, task.StatusCodeRunning),
+			Equal(task.ColumnStatusCode, task.StatusCodeRunning),
 	})
 	if err != nil {
 		if database.IsErrorDBRecordNotExist(err) {
@@ -204,7 +204,7 @@ func StopTask(params *StopTaskParams) error {
 			return err
 		}
 
-		err = loadedRunner.Stop(ctx)
+		err = loadedRunner.Stop(taskEntity.ID, ctx)
 		if err != nil {
 			return err
 		}
@@ -219,9 +219,14 @@ func StopTask(params *StopTaskParams) error {
 }
 
 func createTaskDB(taskEntity *task.Entity, dbSchema string, dbExecutor database.Executor) error {
+	err := taskEntity.GenerateID()
+	if err != nil {
+		return err
+	}
+
 	taskEntity.SetStatusCreated()
 
-	err := domain.CheckWhen(taskEntity, taskEntity.GetFieldMap(), "run")
+	err = domain.CheckWhen(taskEntity, taskEntity.GetFieldMap(), "run")
 	if err != nil {
 		return err
 	}

+ 234 - 0
convenient/domain/task_runner/task_runner_test.go

@@ -0,0 +1,234 @@
+package task_runner
+
+import (
+	"fmt"
+	"git.sxidc.com/go-framework/baize"
+	"git.sxidc.com/go-framework/baize/framework/binding"
+	"git.sxidc.com/go-framework/baize/framework/core/api"
+	"git.sxidc.com/go-framework/baize/framework/core/api/request"
+	"git.sxidc.com/go-framework/baize/framework/core/api/response"
+	"git.sxidc.com/go-framework/baize/framework/core/application"
+	"git.sxidc.com/go-framework/baize/framework/core/domain"
+	"git.sxidc.com/go-framework/baize/framework/core/infrastructure"
+	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/database/operations"
+	"git.sxidc.com/go-tools/utils/strutils"
+	"sync"
+	"testing"
+	"time"
+)
+
+// curl -X POST -H "Content-Type: application/json" -d '{"message": "World"}' "http://localhost:30001/test/api/v1/task/start"
+// curl -X POST -H "Content-Type: application/json" -d '{"id": "27264de7f4c24345ab0cc49fe9cf57a6"}' "http://localhost:30001/test/api/v1/task/stop"
+
+type StartTaskJsonBody struct {
+	Message string `json:"message"`
+}
+
+type StopTaskJsonBody struct {
+	ID string `json:"id"`
+}
+
+type TestRunner struct {
+	doneChanMap      map[string]chan any
+	doneChanMapMutex *sync.Mutex
+}
+
+func NewTestRunner() *TestRunner {
+	return &TestRunner{
+		doneChanMap:      make(map[string]chan any),
+		doneChanMapMutex: new(sync.Mutex),
+	}
+}
+
+func DestroyTestRunner(runner *TestRunner) {
+	if runner == nil {
+		return
+	}
+
+	runner.doneChanMapMutex.Lock()
+	defer runner.doneChanMapMutex.Unlock()
+
+	for taskID, doneChan := range runner.doneChanMap {
+		doneChan <- nil
+		close(doneChan)
+		delete(runner.doneChanMap, taskID)
+	}
+}
+
+func (runner *TestRunner) Run(taskID string, ctx map[string]any) error {
+	messageCtx, ok := ctx["message"]
+	if !ok {
+		return fmt.Errorf("message not found")
+	}
+
+	message := messageCtx.(string)
+
+	doneChan := make(chan any)
+	runner.doneChanMapMutex.Lock()
+	runner.doneChanMap[taskID] = doneChan
+	runner.doneChanMapMutex.Unlock()
+
+	ticker := time.NewTicker(time.Second * 1)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			fmt.Println("Hello " + message)
+		case <-doneChan:
+			return nil
+		}
+	}
+}
+
+func (runner *TestRunner) Restart(taskID string, ctx map[string]any) error {
+	messageCtx, ok := ctx["message"]
+	if !ok {
+		return fmt.Errorf("message not found")
+	}
+
+	message := messageCtx.(string)
+
+	doneChan := make(chan any)
+	runner.doneChanMapMutex.Lock()
+	runner.doneChanMap[taskID] = doneChan
+	runner.doneChanMapMutex.Unlock()
+
+	ticker := time.NewTicker(time.Second * 1)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			fmt.Println("Hello " + message)
+		case <-doneChan:
+			return nil
+		}
+	}
+}
+
+func (runner *TestRunner) Stop(taskID string, ctx map[string]any) error {
+	runner.doneChanMapMutex.Lock()
+
+	doneChan, ok := runner.doneChanMap[taskID]
+	if !ok {
+		runner.doneChanMapMutex.Unlock()
+		return fmt.Errorf("task not found")
+	}
+
+	doneChan <- nil
+	delete(runner.doneChanMap, taskID)
+	doneChan = nil
+
+	runner.doneChanMapMutex.Unlock()
+
+	return nil
+}
+
+const (
+	taskGroup = "test"
+	dbSchema  = "test"
+)
+
+func TestTaskRunner(t *testing.T) {
+	appInstance := baize.NewApplication(application.Config{
+		ApiConfig: application.ApiConfig{
+			UrlPrefix: "/test/api",
+			Port:      "30001",
+		},
+		InfrastructureConfig: application.InfrastructureConfig{
+			Database: infrastructure.DatabaseConfig{
+				Operations: &operations.Config{
+					UserName:           "test",
+					Password:           "123456",
+					Address:            "10.0.0.84",
+					Port:               "30432",
+					Database:           "test",
+					MaxConnections:     20,
+					MaxIdleConnections: 5,
+				},
+			},
+		},
+	})
+
+	appInstance.Api().PrefixRouter().RegisterVersionedRouter("v1")
+
+	runner := NewTestRunner()
+
+	err := RestartTask(runner, &RestartTaskParams{
+		Group:      taskGroup,
+		DBSchema:   dbSchema,
+		DBExecutor: appInstance.Infrastructure().DBExecutor(),
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	Bind(appInstance, &Simple{
+		Schema: dbSchema,
+	})
+
+	v1Binder := binding.NewBinder(appInstance.ChooseRouter(api.RouterPrefix, "v1"), appInstance.Infrastructure())
+
+	binding.PostBind(v1Binder, &binding.SimpleBindItem[string]{
+		Path:             "/task/start",
+		SendResponseFunc: response.SendIDResponse,
+		RequestParams:    &StartTaskJsonBody{},
+		ServiceFunc: func(c *api.Context, params request.Params, objects []domain.Object, i *infrastructure.Infrastructure) (string, error) {
+			jsonBody, err := request.ToConcrete[*StartTaskJsonBody](params)
+			if err != nil {
+				return "", err
+			}
+
+			taskID, err := RunTask(runner, &RunTaskParams{
+				Group:      taskGroup,
+				OperatorID: strutils.SimpleUUID(),
+				Context: map[string]any{
+					"message": jsonBody.Message,
+				},
+				DBSchema:   dbSchema,
+				DBExecutor: i.DBExecutor(),
+			})
+			if err != nil {
+				return "", err
+			}
+
+			return taskID, nil
+		},
+	})
+
+	binding.PostBind(v1Binder, &binding.SimpleBindItem[any]{
+		Path:             "/task/stop",
+		SendResponseFunc: response.SendMsgResponse,
+		RequestParams:    &StopTaskJsonBody{},
+		ServiceFunc: func(c *api.Context, params request.Params, objects []domain.Object, i *infrastructure.Infrastructure) (any, error) {
+			jsonBody, err := request.ToConcrete[*StopTaskJsonBody](params)
+			if err != nil {
+				return nil, err
+			}
+
+			err = StopTask(&StopTaskParams{
+				ID:         jsonBody.ID,
+				DBSchema:   dbSchema,
+				DBExecutor: i.DBExecutor(),
+			})
+			if err != nil {
+				return nil, err
+			}
+
+			return nil, nil
+		},
+	})
+
+	err = appInstance.Start()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	defer func(appInstance *application.App) {
+		err := appInstance.Finish()
+		if err != nil {
+			t.Fatal(err)
+		}
+	}(appInstance)
+}