task_manager.go 5.6 KB


  1. package task_manager
  2. import (
  3. "encoding/json"
  4. "git.sxidc.com/go-framework/baize/convenient/domain/task_manager/task"
  5. "git.sxidc.com/go-framework/baize/framework/binding"
  6. "git.sxidc.com/go-framework/baize/framework/core/api"
  7. "git.sxidc.com/go-framework/baize/framework/core/application"
  8. "git.sxidc.com/go-framework/baize/framework/core/domain"
  9. "git.sxidc.com/go-framework/baize/framework/core/infrastructure/database"
  10. "git.sxidc.com/go-framework/baize/framework/core/infrastructure/database/sql"
  11. "git.sxidc.com/go-framework/baize/framework/core/infrastructure/logger"
  12. "git.sxidc.com/go-tools/utils/strutils"
  13. "github.com/pkg/errors"
  14. )
  15. type RunTaskParams struct {
  16. Group string
  17. OperatorID string
  18. Context map[string]any
  19. DBSchema string
  20. DBExecutor database.Executor
  21. }
  22. func (params *RunTaskParams) check() error {
  23. if strutils.IsStringEmpty(params.Group) {
  24. return errors.New("没有传递任务组")
  25. }
  26. if strutils.IsStringEmpty(params.OperatorID) {
  27. return errors.New("没有传递操作人ID")
  28. }
  29. if strutils.IsStringEmpty(params.DBSchema) {
  30. return errors.New("没有传递数据库schema")
  31. }
  32. if params.DBExecutor == nil {
  33. return errors.New("没有传递数据库执行器")
  34. }
  35. return nil
  36. }
  37. func RunTask(runner task.Runner, params *RunTaskParams) error {
  38. ctxJsonBytes, err := json.Marshal(params.Context)
  39. if err != nil {
  40. return err
  41. }
  42. ctxJsonStr := string(ctxJsonBytes)
  43. // 创建任务,任务状态为已创建
  44. taskEntity := &task.Entity{
  45. Group: params.Group,
  46. Context: ctxJsonStr,
  47. CreateUserID: params.OperatorID,
  48. }
  49. err = createTaskDB(taskEntity, params.DBSchema, params.DBExecutor)
  50. if err != nil {
  51. return err
  52. }
  53. go runTask(taskEntity, runner, params.DBSchema, params.DBExecutor, func(ctx map[string]any, runner task.Runner) error {
  54. return runner.Run(ctx)
  55. })
  56. return nil
  57. }
  58. type RestartTaskParams struct {
  59. Group string
  60. DBSchema string
  61. DBExecutor database.Executor
  62. }
  63. func (params *RestartTaskParams) check() error {
  64. if strutils.IsStringEmpty(params.Group) {
  65. return errors.New("没有传递任务组")
  66. }
  67. if strutils.IsStringEmpty(params.DBSchema) {
  68. return errors.New("没有传递数据库schema")
  69. }
  70. if params.DBExecutor == nil {
  71. return errors.New("没有传递数据库执行器")
  72. }
  73. return nil
  74. }
  75. func RestartTask(runner task.Runner, params *RestartTaskParams) error {
  76. runningResults, _, err := database.Query(params.DBExecutor, &sql.QueryExecuteParams{
  77. TableName: domain.TableName(params.DBSchema, &task.Entity{}),
  78. Conditions: sql.NewConditions().Equal(task.ColumnGroup, params.Group).
  79. Equal(task.ColumnStatus, task.StatusCodeRunning),
  80. PageNo: 0,
  81. PageSize: 0,
  82. })
  83. if err != nil {
  84. return err
  85. }
  86. taskEntities := make([]task.Entity, 0)
  87. err = sql.ParseSqlResult(runningResults, &taskEntities)
  88. if err != nil {
  89. return err
  90. }
  91. for _, taskEntity := range taskEntities {
  92. go runTask(&taskEntity, runner, params.DBSchema, params.DBExecutor, func(ctx map[string]any, runner task.Runner) error {
  93. return runner.Restart(ctx)
  94. })
  95. }
  96. return nil
  97. }
  98. func createTaskDB(taskEntity *task.Entity, dbSchema string, dbExecutor database.Executor) error {
  99. taskEntity.SetStatusCreated()
  100. err := domain.CheckWhen(taskEntity, taskEntity.GetFieldMap(), "run")
  101. if err != nil {
  102. return err
  103. }
  104. err = database.InsertEntity(dbExecutor, domain.TableName(dbSchema, taskEntity), taskEntity)
  105. if err != nil {
  106. return err
  107. }
  108. return nil
  109. }
  110. func updateTaskStatusRunningDB(taskEntity *task.Entity, dbSchema string, dbExecutor database.Executor) error {
  111. taskEntity.SetStatusRunning()
  112. err := database.UpdateEntity(dbExecutor, domain.TableName(dbSchema, taskEntity), taskEntity)
  113. if err != nil {
  114. return err
  115. }
  116. return nil
  117. }
  118. func updateTaskStatusCompleteDB(taskEntity *task.Entity, dbSchema string, dbExecutor database.Executor) error {
  119. taskEntity.SetStatusCompleted()
  120. err := database.UpdateEntity(dbExecutor, domain.TableName(dbSchema, taskEntity), taskEntity)
  121. if err != nil {
  122. return err
  123. }
  124. return nil
  125. }
  126. func updateTaskStatusErrorDB(taskEntity *task.Entity, errMsg string, dbSchema string, dbExecutor database.Executor) error {
  127. taskEntity.SetStatusError(errMsg)
  128. err := database.UpdateEntity(dbExecutor, domain.TableName(dbSchema, taskEntity), taskEntity)
  129. if err != nil {
  130. return err
  131. }
  132. return nil
  133. }
  134. func runTask(taskEntity *task.Entity, runner task.Runner, dbSchema string, dbExecutor database.Executor, executeFunc func(ctx map[string]any, runner task.Runner) error) {
  135. ctx := make(map[string]any)
  136. err := json.Unmarshal([]byte(taskEntity.Context), &ctx)
  137. if err != nil {
  138. err = updateTaskStatusErrorDB(taskEntity, err.Error(), dbSchema, dbExecutor)
  139. if err != nil {
  140. logger.GetInstance().Error("解析任务上下文失败 ", err)
  141. return
  142. }
  143. return
  144. }
  145. err = updateTaskStatusRunningDB(taskEntity, dbSchema, dbExecutor)
  146. if err != nil {
  147. logger.GetInstance().Error("更新任务运行状态失败 ", err)
  148. return
  149. }
  150. // 执行任务
  151. err = executeFunc(ctx, runner)
  152. if err != nil {
  153. err = updateTaskStatusErrorDB(taskEntity, err.Error(), dbSchema, dbExecutor)
  154. if err != nil {
  155. logger.GetInstance().Error("更新任务错误状态失败 ", err)
  156. return
  157. }
  158. return
  159. }
  160. err = updateTaskStatusCompleteDB(taskEntity, dbSchema, dbExecutor)
  161. if err != nil {
  162. logger.GetInstance().Error("更新任务完成状态失败 ", err)
  163. return
  164. }
  165. }
  166. // Simple Bind参数
  167. type Simple struct {
  168. // schema
  169. Schema string
  170. }
  171. func (simple *Simple) bind(binder *binding.Binder) {
  172. // TODO 完成查询接口, 编写Info,数据库YAML
  173. }
  174. func Bind(app *application.App, simple *Simple) {
  175. binder := binding.NewBinder(app.Api().ChooseRouter(api.RouterPrefix, ""), app.Infrastructure())
  176. simple.bind(binder)
  177. }