123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237 |
- package task_runner
- import (
- "fmt"
- "git.sxidc.com/go-framework/baize"
- "git.sxidc.com/go-framework/baize/framework/binding"
- "git.sxidc.com/go-framework/baize/framework/core/api"
- "git.sxidc.com/go-framework/baize/framework/core/api/request"
- "git.sxidc.com/go-framework/baize/framework/core/api/response"
- "git.sxidc.com/go-framework/baize/framework/core/application"
- "git.sxidc.com/go-framework/baize/framework/core/domain"
- "git.sxidc.com/go-framework/baize/framework/core/infrastructure"
- "git.sxidc.com/go-framework/baize/framework/core/infrastructure/database/operations"
- "git.sxidc.com/go-tools/utils/strutils"
- "sync"
- "testing"
- "time"
- )
// StartTaskJsonBody is the JSON request body bound to the "/task/start"
// endpoint registered in TestTaskRunner.
type StartTaskJsonBody struct {
	// Message is decoded from the "message" JSON key and is placed into the
	// task context under the same key.
	Message string `json:"message"`
}
// StopTaskJsonBody is the JSON request body bound to the "/task/stop"
// endpoint registered in TestTaskRunner.
type StopTaskJsonBody struct {
	// ID is decoded from the "id" JSON key and identifies the task to stop.
	ID string `json:"id"`
}
// TestRunner is a demo task runner used by the test in this file. Every
// running task prints a greeting once per second until its per-task "done"
// channel is signalled.
type TestRunner struct {
	// doneChanMap maps a task ID to the channel that stops that task's loop.
	doneChanMap map[string]chan any
	// doneChanMapMutex guards doneChanMap.
	doneChanMapMutex *sync.Mutex
}

// NewTestRunner returns a TestRunner with an empty task table.
func NewTestRunner() *TestRunner {
	return &TestRunner{
		doneChanMap:      make(map[string]chan any),
		doneChanMapMutex: new(sync.Mutex),
	}
}

// DestroyTestRunner signals every task still registered on runner to stop and
// empties the task table. A nil runner is a no-op.
func DestroyTestRunner(runner *TestRunner) {
	if runner == nil {
		return
	}

	runner.doneChanMapMutex.Lock()
	defer runner.doneChanMapMutex.Unlock()

	for taskID, doneChan := range runner.doneChanMap {
		// Closing is a sufficient stop signal (a receive on a closed channel
		// returns immediately) and, unlike a send on an unbuffered channel,
		// never blocks while the mutex is held.
		close(doneChan)
		delete(runner.doneChanMap, taskID)
	}
}

// Run registers taskID and then blocks, printing "Hello <message>" once per
// second, until the task is stopped via Stop or DestroyTestRunner.
//
// ctx must contain a string under the "message" key. Run returns (false, err)
// without registering the task when the key is missing or the value is not a
// string, and (true, nil) once the task has been stopped.
func (runner *TestRunner) Run(taskID string, ctx map[string]any) (bool, error) {
	return runner.greetUntilStopped(taskID, ctx)
}

// Restart behaves exactly like Run: it registers taskID and blocks, printing
// the greeting once per second, until the task is stopped.
func (runner *TestRunner) Restart(taskID string, ctx map[string]any) (bool, error) {
	return runner.greetUntilStopped(taskID, ctx)
}

// greetUntilStopped is the shared body of Run and Restart.
func (runner *TestRunner) greetUntilStopped(taskID string, ctx map[string]any) (bool, error) {
	messageCtx, ok := ctx["message"]
	if !ok {
		return false, fmt.Errorf("message not found")
	}

	// Checked assertion: a non-string message is reported as an error
	// instead of panicking the calling goroutine.
	message, ok := messageCtx.(string)
	if !ok {
		return false, fmt.Errorf("message must be a string, got %T", messageCtx)
	}

	doneChan := make(chan any)

	runner.doneChanMapMutex.Lock()
	runner.doneChanMap[taskID] = doneChan
	runner.doneChanMapMutex.Unlock()

	ticker := time.NewTicker(time.Second * 1)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			fmt.Println("Hello " + message)
		case <-doneChan:
			return true, nil
		}
	}
}

// Stop signals the task registered under taskID to stop and removes it from
// the task table. It returns an error when no such task is registered. The
// second parameter is ignored.
func (runner *TestRunner) Stop(taskID string, _ map[string]any) error {
	runner.doneChanMapMutex.Lock()
	defer runner.doneChanMapMutex.Unlock()

	doneChan, ok := runner.doneChanMap[taskID]
	if !ok {
		return fmt.Errorf("task not found")
	}

	delete(runner.doneChanMap, taskID)

	// Closing wakes the task loop without blocking while the mutex is held.
	close(doneChan)

	return nil
}
// Fixed identifiers shared by every task operation in TestTaskRunner.
const (
	// dbSchema is passed as the DBSchema of every task call and of the
	// Simple binding.
	dbSchema = "test"
	// taskGroup is passed as the Group of the restart and run calls.
	taskGroup = "test"
)
- func TestTaskRunner(t *testing.T) {
- appInstance := baize.NewApplication(application.Config{
- ApiConfig: application.ApiConfig{
- UrlPrefix: "/test/api",
- Port: "30001",
- },
- InfrastructureConfig: application.InfrastructureConfig{
- Database: infrastructure.DatabaseConfig{
- Operations: &operations.Config{
- UserName: "test",
- Password: "123456",
- Address: "10.0.0.84",
- Port: "30432",
- Database: "test",
- MaxConnections: 20,
- MaxIdleConnections: 5,
- },
- },
- },
- })
- appInstance.Api().PrefixRouter().RegisterVersionedRouter("v1")
- runner := NewTestRunner()
- defer DestroyTestRunner(runner)
- err := RestartTask(runner, &RestartTaskParams{
- Group: taskGroup,
- DBSchema: dbSchema,
- DBExecutor: appInstance.Infrastructure().DBExecutor(),
- })
- if err != nil {
- t.Fatal(err)
- }
- Bind(appInstance, &Simple{
- Schema: dbSchema,
- })
- v1Binder := binding.NewBinder(appInstance.ChooseRouter(api.RouterPrefix, "v1"), appInstance.Infrastructure())
- binding.PostBind(v1Binder, &binding.SimpleBindItem[string]{
- Path: "/task/start",
- SendResponseFunc: response.SendIDResponse,
- RequestParams: &StartTaskJsonBody{},
- ServiceFunc: func(c *api.Context, params request.Params, objects []domain.Object, i *infrastructure.Infrastructure) (string, error) {
- jsonBody, err := request.ToConcrete[*StartTaskJsonBody](params)
- if err != nil {
- return "", err
- }
- taskID, err := RunTask(runner, &RunTaskParams{
- Group: taskGroup,
- OperatorID: strutils.SimpleUUID(),
- Context: map[string]any{
- "message": jsonBody.Message,
- },
- DBSchema: dbSchema,
- DBExecutor: i.DBExecutor(),
- })
- if err != nil {
- return "", err
- }
- return taskID, nil
- },
- })
- binding.PostBind(v1Binder, &binding.SimpleBindItem[any]{
- Path: "/task/stop",
- SendResponseFunc: response.SendMsgResponse,
- RequestParams: &StopTaskJsonBody{},
- ServiceFunc: func(c *api.Context, params request.Params, objects []domain.Object, i *infrastructure.Infrastructure) (any, error) {
- jsonBody, err := request.ToConcrete[*StopTaskJsonBody](params)
- if err != nil {
- return nil, err
- }
- err = StopTask(&StopTaskParams{
- ID: jsonBody.ID,
- DBSchema: dbSchema,
- DBExecutor: i.DBExecutor(),
- })
- if err != nil {
- return nil, err
- }
- return nil, nil
- },
- })
- err = appInstance.Start()
- if err != nil {
- t.Fatal(err)
- }
- defer func(appInstance *application.App) {
- err := appInstance.Finish()
- if err != nil {
- t.Fatal(err)
- }
- }(appInstance)
- }
|