task_runner_test.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. package task_runner
  2. import (
  3. "fmt"
  4. "git.sxidc.com/go-framework/baize"
  5. "git.sxidc.com/go-framework/baize/framework/binding"
  6. "git.sxidc.com/go-framework/baize/framework/core/api"
  7. "git.sxidc.com/go-framework/baize/framework/core/api/request"
  8. "git.sxidc.com/go-framework/baize/framework/core/api/response"
  9. "git.sxidc.com/go-framework/baize/framework/core/application"
  10. "git.sxidc.com/go-framework/baize/framework/core/domain"
  11. "git.sxidc.com/go-framework/baize/framework/core/infrastructure"
  12. "git.sxidc.com/go-framework/baize/framework/core/infrastructure/database/operations"
  13. "git.sxidc.com/go-tools/utils/strutils"
  14. "sync"
  15. "testing"
  16. "time"
  17. )
  18. // curl -X POST -H "Content-Type: application/json" -d '{"message": "World"}' "http://localhost:30001/test/api/v1/task/start"
  19. // curl -X POST -H "Content-Type: application/json" -d '{"id": "27264de7f4c24345ab0cc49fe9cf57a6"}' "http://localhost:30001/test/api/v1/task/stop"
  20. type StartTaskJsonBody struct {
  21. Message string `json:"message"`
  22. }
  23. type StopTaskJsonBody struct {
  24. ID string `json:"id"`
  25. }
  26. type TestRunner struct {
  27. doneChanMap map[string]chan any
  28. doneChanMapMutex *sync.Mutex
  29. }
  30. func NewTestRunner() *TestRunner {
  31. return &TestRunner{
  32. doneChanMap: make(map[string]chan any),
  33. doneChanMapMutex: new(sync.Mutex),
  34. }
  35. }
  36. func DestroyTestRunner(runner *TestRunner) {
  37. if runner == nil {
  38. return
  39. }
  40. runner.doneChanMapMutex.Lock()
  41. defer runner.doneChanMapMutex.Unlock()
  42. for taskID, doneChan := range runner.doneChanMap {
  43. doneChan <- nil
  44. close(doneChan)
  45. delete(runner.doneChanMap, taskID)
  46. }
  47. }
  48. func (runner *TestRunner) Run(taskID string, ctx map[string]any) error {
  49. messageCtx, ok := ctx["message"]
  50. if !ok {
  51. return fmt.Errorf("message not found")
  52. }
  53. message := messageCtx.(string)
  54. doneChan := make(chan any)
  55. runner.doneChanMapMutex.Lock()
  56. runner.doneChanMap[taskID] = doneChan
  57. runner.doneChanMapMutex.Unlock()
  58. ticker := time.NewTicker(time.Second * 1)
  59. defer ticker.Stop()
  60. for {
  61. select {
  62. case <-ticker.C:
  63. fmt.Println("Hello " + message)
  64. case <-doneChan:
  65. return nil
  66. }
  67. }
  68. }
  69. func (runner *TestRunner) Restart(taskID string, ctx map[string]any) error {
  70. messageCtx, ok := ctx["message"]
  71. if !ok {
  72. return fmt.Errorf("message not found")
  73. }
  74. message := messageCtx.(string)
  75. doneChan := make(chan any)
  76. runner.doneChanMapMutex.Lock()
  77. runner.doneChanMap[taskID] = doneChan
  78. runner.doneChanMapMutex.Unlock()
  79. ticker := time.NewTicker(time.Second * 1)
  80. defer ticker.Stop()
  81. for {
  82. select {
  83. case <-ticker.C:
  84. fmt.Println("Hello " + message)
  85. case <-doneChan:
  86. return nil
  87. }
  88. }
  89. }
  90. func (runner *TestRunner) Stop(taskID string, ctx map[string]any) error {
  91. runner.doneChanMapMutex.Lock()
  92. doneChan, ok := runner.doneChanMap[taskID]
  93. if !ok {
  94. runner.doneChanMapMutex.Unlock()
  95. return fmt.Errorf("task not found")
  96. }
  97. doneChan <- nil
  98. delete(runner.doneChanMap, taskID)
  99. doneChan = nil
  100. runner.doneChanMapMutex.Unlock()
  101. return nil
  102. }
  103. const (
  104. taskGroup = "test"
  105. dbSchema = "test"
  106. )
  107. func TestTaskRunner(t *testing.T) {
  108. appInstance := baize.NewApplication(application.Config{
  109. ApiConfig: application.ApiConfig{
  110. UrlPrefix: "/test/api",
  111. Port: "30001",
  112. },
  113. InfrastructureConfig: application.InfrastructureConfig{
  114. Database: infrastructure.DatabaseConfig{
  115. Operations: &operations.Config{
  116. UserName: "test",
  117. Password: "123456",
  118. Address: "10.0.0.84",
  119. Port: "30432",
  120. Database: "test",
  121. MaxConnections: 20,
  122. MaxIdleConnections: 5,
  123. },
  124. },
  125. },
  126. })
  127. appInstance.Api().PrefixRouter().RegisterVersionedRouter("v1")
  128. runner := NewTestRunner()
  129. err := RestartTask(runner, &RestartTaskParams{
  130. Group: taskGroup,
  131. DBSchema: dbSchema,
  132. DBExecutor: appInstance.Infrastructure().DBExecutor(),
  133. })
  134. if err != nil {
  135. t.Fatal(err)
  136. }
  137. Bind(appInstance, &Simple{
  138. Schema: dbSchema,
  139. })
  140. v1Binder := binding.NewBinder(appInstance.ChooseRouter(api.RouterPrefix, "v1"), appInstance.Infrastructure())
  141. binding.PostBind(v1Binder, &binding.SimpleBindItem[string]{
  142. Path: "/task/start",
  143. SendResponseFunc: response.SendIDResponse,
  144. RequestParams: &StartTaskJsonBody{},
  145. ServiceFunc: func(c *api.Context, params request.Params, objects []domain.Object, i *infrastructure.Infrastructure) (string, error) {
  146. jsonBody, err := request.ToConcrete[*StartTaskJsonBody](params)
  147. if err != nil {
  148. return "", err
  149. }
  150. taskID, err := RunTask(runner, &RunTaskParams{
  151. Group: taskGroup,
  152. OperatorID: strutils.SimpleUUID(),
  153. Context: map[string]any{
  154. "message": jsonBody.Message,
  155. },
  156. DBSchema: dbSchema,
  157. DBExecutor: i.DBExecutor(),
  158. })
  159. if err != nil {
  160. return "", err
  161. }
  162. return taskID, nil
  163. },
  164. })
  165. binding.PostBind(v1Binder, &binding.SimpleBindItem[any]{
  166. Path: "/task/stop",
  167. SendResponseFunc: response.SendMsgResponse,
  168. RequestParams: &StopTaskJsonBody{},
  169. ServiceFunc: func(c *api.Context, params request.Params, objects []domain.Object, i *infrastructure.Infrastructure) (any, error) {
  170. jsonBody, err := request.ToConcrete[*StopTaskJsonBody](params)
  171. if err != nil {
  172. return nil, err
  173. }
  174. err = StopTask(&StopTaskParams{
  175. ID: jsonBody.ID,
  176. DBSchema: dbSchema,
  177. DBExecutor: i.DBExecutor(),
  178. })
  179. if err != nil {
  180. return nil, err
  181. }
  182. return nil, nil
  183. },
  184. })
  185. err = appInstance.Start()
  186. if err != nil {
  187. t.Fatal(err)
  188. }
  189. defer func(appInstance *application.App) {
  190. err := appInstance.Finish()
  191. if err != nil {
  192. t.Fatal(err)
  193. }
  194. }(appInstance)
  195. }