task_runner_test.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. package task_runner
  2. import (
  3. "fmt"
  4. "git.sxidc.com/go-framework/baize"
  5. "git.sxidc.com/go-framework/baize/framework/binding"
  6. "git.sxidc.com/go-framework/baize/framework/core/api"
  7. "git.sxidc.com/go-framework/baize/framework/core/api/request"
  8. "git.sxidc.com/go-framework/baize/framework/core/api/response"
  9. "git.sxidc.com/go-framework/baize/framework/core/application"
  10. "git.sxidc.com/go-framework/baize/framework/core/domain"
  11. "git.sxidc.com/go-framework/baize/framework/core/infrastructure"
  12. "git.sxidc.com/go-framework/baize/framework/core/infrastructure/database/operations"
  13. "git.sxidc.com/go-tools/utils/strutils"
  14. "sync"
  15. "testing"
  16. "time"
  17. )
  18. // curl -X POST -H "Content-Type: application/json" -d '{"message": "World"}' "http://localhost:30001/test/api/v1/task/start"
  19. // curl -X POST -H "Content-Type: application/json" -d '{"id": "0f89bc3ac6364280a535d86ce57a561b"}' "http://localhost:30001/test/api/v1/task/stop"
  20. // curl -X GET "http://localhost:30001/test/api/task/get?group=test&pageNo=0&pageSize=0"
  21. type StartTaskJsonBody struct {
  22. Message string `json:"message"`
  23. }
  24. type StopTaskJsonBody struct {
  25. ID string `json:"id"`
  26. }
  27. type TestRunner struct {
  28. doneChanMap map[string]chan any
  29. doneChanMapMutex *sync.Mutex
  30. }
  31. func NewTestRunner() *TestRunner {
  32. return &TestRunner{
  33. doneChanMap: make(map[string]chan any),
  34. doneChanMapMutex: new(sync.Mutex),
  35. }
  36. }
  37. func DestroyTestRunner(runner *TestRunner) {
  38. if runner == nil {
  39. return
  40. }
  41. runner.doneChanMapMutex.Lock()
  42. defer runner.doneChanMapMutex.Unlock()
  43. for taskID, doneChan := range runner.doneChanMap {
  44. doneChan <- nil
  45. close(doneChan)
  46. delete(runner.doneChanMap, taskID)
  47. }
  48. }
  49. func (runner *TestRunner) Run(taskID string, ctx map[string]any) (bool, error) {
  50. messageCtx, ok := ctx["message"]
  51. if !ok {
  52. return false, fmt.Errorf("message not found")
  53. }
  54. message := messageCtx.(string)
  55. doneChan := make(chan any)
  56. runner.doneChanMapMutex.Lock()
  57. runner.doneChanMap[taskID] = doneChan
  58. runner.doneChanMapMutex.Unlock()
  59. ticker := time.NewTicker(time.Second * 1)
  60. defer ticker.Stop()
  61. for {
  62. select {
  63. case <-ticker.C:
  64. fmt.Println("Hello " + message)
  65. case <-doneChan:
  66. return true, nil
  67. }
  68. }
  69. }
  70. func (runner *TestRunner) Restart(taskID string, ctx map[string]any) (bool, error) {
  71. messageCtx, ok := ctx["message"]
  72. if !ok {
  73. return false, fmt.Errorf("message not found")
  74. }
  75. message := messageCtx.(string)
  76. doneChan := make(chan any)
  77. runner.doneChanMapMutex.Lock()
  78. runner.doneChanMap[taskID] = doneChan
  79. runner.doneChanMapMutex.Unlock()
  80. ticker := time.NewTicker(time.Second * 1)
  81. defer ticker.Stop()
  82. for {
  83. select {
  84. case <-ticker.C:
  85. fmt.Println("Hello " + message)
  86. case <-doneChan:
  87. return true, nil
  88. }
  89. }
  90. }
  91. func (runner *TestRunner) Stop(taskID string, _ map[string]any) error {
  92. runner.doneChanMapMutex.Lock()
  93. doneChan, ok := runner.doneChanMap[taskID]
  94. if !ok {
  95. runner.doneChanMapMutex.Unlock()
  96. return fmt.Errorf("task not found")
  97. }
  98. doneChan <- nil
  99. delete(runner.doneChanMap, taskID)
  100. close(doneChan)
  101. doneChan = nil
  102. runner.doneChanMapMutex.Unlock()
  103. return nil
  104. }
  105. const (
  106. taskGroup = "test"
  107. dbSchema = "test"
  108. )
  109. func TestTaskRunner(t *testing.T) {
  110. appInstance := baize.NewApplication(application.Config{
  111. ApiConfig: application.ApiConfig{
  112. UrlPrefix: "/test/api",
  113. Port: "30001",
  114. },
  115. InfrastructureConfig: application.InfrastructureConfig{
  116. Database: infrastructure.DatabaseConfig{
  117. Operations: &operations.Config{
  118. UserName: "test",
  119. Password: "123456",
  120. Address: "10.0.0.84",
  121. Port: "30432",
  122. Database: "test",
  123. MaxConnections: 20,
  124. MaxIdleConnections: 5,
  125. },
  126. },
  127. },
  128. })
  129. appInstance.Api().PrefixRouter().RegisterVersionedRouter("v1")
  130. runner := NewTestRunner()
  131. defer DestroyTestRunner(runner)
  132. err := RestartTask(runner, &RestartTaskParams{
  133. Group: taskGroup,
  134. DBSchema: dbSchema,
  135. DBExecutor: appInstance.Infrastructure().DBExecutor(),
  136. })
  137. if err != nil {
  138. t.Fatal(err)
  139. }
  140. Bind(appInstance, &Simple{
  141. Schema: dbSchema,
  142. })
  143. v1Binder := binding.NewBinder(appInstance.ChooseRouter(api.RouterPrefix, "v1"), appInstance.Infrastructure())
  144. binding.PostBind(v1Binder, &binding.SimpleBindItem[string]{
  145. Path: "/task/start",
  146. SendResponseFunc: response.SendIDResponse,
  147. RequestParams: &StartTaskJsonBody{},
  148. ServiceFunc: func(c *api.Context, params request.Params, objects []domain.Object, i *infrastructure.Infrastructure) (string, error) {
  149. jsonBody, err := request.ToConcrete[*StartTaskJsonBody](params)
  150. if err != nil {
  151. return "", err
  152. }
  153. taskID, err := RunTask(runner, &RunTaskParams{
  154. Group: taskGroup,
  155. OperatorID: strutils.SimpleUUID(),
  156. Context: map[string]any{
  157. "message": jsonBody.Message,
  158. },
  159. DBSchema: dbSchema,
  160. DBExecutor: i.DBExecutor(),
  161. })
  162. if err != nil {
  163. return "", err
  164. }
  165. return taskID, nil
  166. },
  167. })
  168. binding.PostBind(v1Binder, &binding.SimpleBindItem[any]{
  169. Path: "/task/stop",
  170. SendResponseFunc: response.SendMsgResponse,
  171. RequestParams: &StopTaskJsonBody{},
  172. ServiceFunc: func(c *api.Context, params request.Params, objects []domain.Object, i *infrastructure.Infrastructure) (any, error) {
  173. jsonBody, err := request.ToConcrete[*StopTaskJsonBody](params)
  174. if err != nil {
  175. return nil, err
  176. }
  177. err = StopTask(&StopTaskParams{
  178. ID: jsonBody.ID,
  179. DBSchema: dbSchema,
  180. DBExecutor: i.DBExecutor(),
  181. })
  182. if err != nil {
  183. return nil, err
  184. }
  185. return nil, nil
  186. },
  187. })
  188. err = appInstance.Start()
  189. if err != nil {
  190. t.Fatal(err)
  191. }
  192. defer func(appInstance *application.App) {
  193. err := appInstance.Finish()
  194. if err != nil {
  195. t.Fatal(err)
  196. }
  197. }(appInstance)
  198. }