package task_runner

import (
	"fmt"
	"git.sxidc.com/go-framework/baize"
	"git.sxidc.com/go-framework/baize/framework/binding"
	"git.sxidc.com/go-framework/baize/framework/core/api"
	"git.sxidc.com/go-framework/baize/framework/core/api/request"
	"git.sxidc.com/go-framework/baize/framework/core/api/response"
	"git.sxidc.com/go-framework/baize/framework/core/application"
	"git.sxidc.com/go-framework/baize/framework/core/domain"
	"git.sxidc.com/go-framework/baize/framework/core/infrastructure"
	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/database/operations"
	"git.sxidc.com/go-tools/utils/strutils"
	"sync"
	"testing"
	"time"
)

// curl -X POST -H "Content-Type: application/json" -d '{"message": "World"}' "http://localhost:30001/test/api/v1/task/start"
// curl -X POST -H "Content-Type: application/json" -d '{"id": "0f89bc3ac6364280a535d86ce57a561b"}' "http://localhost:30001/test/api/v1/task/stop"
// curl -X GET "http://localhost:30001/test/api/task/get?group=test&pageNo=0&pageSize=0"

// StartTaskJsonBody is the JSON request body of the /task/start endpoint.
type StartTaskJsonBody struct {
	// Message is the text appended to "Hello " on every tick of the task.
	Message string `json:"message"`
}

// StopTaskJsonBody is the JSON request body of the /task/stop endpoint.
type StopTaskJsonBody struct {
	// ID identifies the task to stop.
	ID string `json:"id"`
}

// TestRunner is a sample task runner that prints a greeting once per second
// until the task is stopped. It keeps one stop channel per running task.
type TestRunner struct {
	// doneChanMap maps a task ID to the channel that is closed to stop it.
	doneChanMap map[string]chan any
	// doneChanMapMutex guards doneChanMap.
	doneChanMapMutex *sync.Mutex
}

// NewTestRunner returns a TestRunner with no registered tasks.
func NewTestRunner() *TestRunner {
	return &TestRunner{
		doneChanMap:      make(map[string]chan any),
		doneChanMapMutex: new(sync.Mutex),
	}
}

// DestroyTestRunner signals every task still registered on runner to stop and
// empties the registry. A nil runner is ignored.
func DestroyTestRunner(runner *TestRunner) {
	if runner == nil {
		return
	}

	runner.doneChanMapMutex.Lock()
	defer runner.doneChanMapMutex.Unlock()

	for taskID, doneChan := range runner.doneChanMap {
		// Closing alone wakes the receiver; an extra blocking send while the
		// mutex is held would stall every other Run/Stop call.
		close(doneChan)
		delete(runner.doneChanMap, taskID)
	}
}

// Run registers taskID and prints "Hello <message>" every second until the
// task is stopped. ctx must contain a string under the "message" key.
//
// It blocks for the lifetime of the task and returns (true, nil) once stopped,
// or (false, err) when the message is missing or is not a string.
func (runner *TestRunner) Run(taskID string, ctx map[string]any) (bool, error) {
	return runner.greetUntilStopped(taskID, ctx)
}

// Restart behaves exactly like Run: it registers taskID again and resumes
// printing the greeting taken from ctx["message"] until the task is stopped.
func (runner *TestRunner) Restart(taskID string, ctx map[string]any) (bool, error) {
	return runner.greetUntilStopped(taskID, ctx)
}

// greetUntilStopped is the shared body of Run and Restart.
func (runner *TestRunner) greetUntilStopped(taskID string, ctx map[string]any) (bool, error) {
	messageCtx, ok := ctx["message"]
	if !ok {
		return false, fmt.Errorf("message not found")
	}

	// Checked assertion: a non-string value yields an error instead of a panic.
	message, ok := messageCtx.(string)
	if !ok {
		return false, fmt.Errorf("message must be a string, got %T", messageCtx)
	}

	doneChan := make(chan any)

	runner.doneChanMapMutex.Lock()
	runner.doneChanMap[taskID] = doneChan
	runner.doneChanMapMutex.Unlock()

	ticker := time.NewTicker(time.Second * 1)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			fmt.Println("Hello " + message)
		case <-doneChan:
			// Receiving from the closed channel is the stop signal.
			return true, nil
		}
	}
}

// Stop signals the task registered under taskID to stop and removes it from
// the registry. It returns an error when no such task is registered.
func (runner *TestRunner) Stop(taskID string, _ map[string]any) error {
	runner.doneChanMapMutex.Lock()
	doneChan, ok := runner.doneChanMap[taskID]
	if ok {
		// Removing the entry under the lock guarantees that exactly one caller
		// obtains the channel, so it is closed exactly once.
		delete(runner.doneChanMap, taskID)
	}
	runner.doneChanMapMutex.Unlock()

	if !ok {
		return fmt.Errorf("task not found")
	}

	// Close outside the lock; close never blocks and wakes the task loop.
	close(doneChan)

	return nil
}

const (
	taskGroup = "test"
	dbSchema  = "test"
)

// TestTaskRunner is a manual integration test: it builds an application with
// the database settings below, registers the /task/start and /task/stop
// endpoints on the v1 router, and starts the application. The curl commands
// near the top of this file show how to exercise the endpoints.
//
// NOTE(review): requires the configured database to be reachable; Start
// presumably blocks while the server is running — confirm against baize.
func TestTaskRunner(t *testing.T) {
	appInstance := baize.NewApplication(application.Config{
		ApiConfig: application.ApiConfig{
			UrlPrefix: "/test/api",
			Port:      "30001",
		},
		InfrastructureConfig: application.InfrastructureConfig{
			Database: infrastructure.DatabaseConfig{
				Operations: &operations.Config{
					UserName:           "test",
					Password:           "123456",
					Address:            "10.0.0.84",
					Port:               "30432",
					Database:           "test",
					MaxConnections:     20,
					MaxIdleConnections: 5,
				},
			},
		},
	})

	appInstance.Api().PrefixRouter().RegisterVersionedRouter("v1")

	runner := NewTestRunner()
	defer DestroyTestRunner(runner)

	// Hand the runner to RestartTask for the test group before serving requests.
	err := RestartTask(runner, &RestartTaskParams{
		Group:      taskGroup,
		DBSchema:   dbSchema,
		DBExecutor: appInstance.Infrastructure().DBExecutor(),
	})
	if err != nil {
		t.Fatal(err)
	}

	Bind(appInstance, &Simple{
		Schema: dbSchema,
	})

	v1Binder := binding.NewBinder(appInstance.ChooseRouter(api.RouterPrefix, "v1"), appInstance.Infrastructure())

	// POST /task/start: run a new task with the posted message and respond
	// with the ID returned by RunTask.
	binding.PostBind(v1Binder, &binding.SimpleBindItem[string]{
		Path:             "/task/start",
		SendResponseFunc: response.SendIDResponse,
		RequestParams:    &StartTaskJsonBody{},
		ServiceFunc: func(c *api.Context, params request.Params, objects []domain.Object, i *infrastructure.Infrastructure) (string, error) {
			jsonBody, err := request.ToConcrete[*StartTaskJsonBody](params)
			if err != nil {
				return "", err
			}

			taskID, err := RunTask(runner, &RunTaskParams{
				Group:      taskGroup,
				OperatorID: strutils.SimpleUUID(),
				Context: map[string]any{
					"message": jsonBody.Message,
				},
				DBSchema:   dbSchema,
				DBExecutor: i.DBExecutor(),
			})
			if err != nil {
				return "", err
			}

			return taskID, nil
		},
	})

	// POST /task/stop: stop the task whose ID is given in the request body.
	binding.PostBind(v1Binder, &binding.SimpleBindItem[any]{
		Path:             "/task/stop",
		SendResponseFunc: response.SendMsgResponse,
		RequestParams:    &StopTaskJsonBody{},
		ServiceFunc: func(c *api.Context, params request.Params, objects []domain.Object, i *infrastructure.Infrastructure) (any, error) {
			jsonBody, err := request.ToConcrete[*StopTaskJsonBody](params)
			if err != nil {
				return nil, err
			}

			err = StopTask(&StopTaskParams{
				ID:         jsonBody.ID,
				DBSchema:   dbSchema,
				DBExecutor: i.DBExecutor(),
			})
			if err != nil {
				return nil, err
			}

			return nil, nil
		},
	})

	err = appInstance.Start()
	if err != nil {
		t.Fatal(err)
	}

	defer func(appInstance *application.App) {
		err := appInstance.Finish()
		if err != nil {
			t.Fatal(err)
		}
	}(appInstance)
}