infrastructure.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. package infrastructure
  2. import (
  3. "git.sxidc.com/go-framework/baize/framework/core/infrastructure/cache"
  4. "git.sxidc.com/go-framework/baize/framework/core/infrastructure/cache/local"
  5. redisCache "git.sxidc.com/go-framework/baize/framework/core/infrastructure/cache/redis"
  6. "git.sxidc.com/go-framework/baize/framework/core/infrastructure/database"
  7. "git.sxidc.com/go-framework/baize/framework/core/infrastructure/database/data_service"
  8. "git.sxidc.com/go-framework/baize/framework/core/infrastructure/database/operations"
  9. messageQueueCommon "git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/common"
  10. mqttMessageQueue "git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/mqtt"
  11. redisMessageQueue "git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/redis"
  12. "git.sxidc.com/go-tools/utils/strutils"
  13. )
// Config aggregates the settings for every infrastructure component by
// embedding the database, cache and message-queue configuration structs.
//
// NOTE(review): CacheConfig and MessageQueueConfig are embedded at the same
// depth and both declare a field tagged `json:"redis"`. If this struct is ever
// decoded directly with encoding/json, that package ignores both conflicting
// fields without reporting an error — confirm how Config is actually loaded.
type Config struct {
	DatabaseConfig
	CacheConfig
	MessageQueueConfig
}
// DatabaseConfig selects the database executor backend. NewInfrastructure
// uses Operations when it is non-nil and otherwise falls back to DataService;
// when both are nil no database executor is created.
type DatabaseConfig struct {
	// Operations is passed to operations.NewOperations when non-nil.
	Operations *operations.Config `json:"operations" yaml:"operations"`
	// DataService is passed to data_service.NewExecutor when Operations is nil
	// and this field is non-nil.
	DataService *data_service.Config `json:"data_service" yaml:"data_service"`
}
// CacheConfig holds the cache settings read by NewInfrastructure.
type CacheConfig struct {
	// Namespace is handed to both the local and the Redis cache constructors.
	// No cache of either kind is created unless
	// strutils.IsStringNotEmpty(Namespace) reports true.
	Namespace string `json:"namespace" yaml:"namespace"`
	// Redis enables the Redis cache when non-nil and the Namespace check above
	// passes. Only Address, UserName, Password and DB are read for the cache.
	Redis *RedisConfig `json:"redis" yaml:"redis"`
}
// MessageQueueConfig holds the message-queue settings. Each queue is optional:
// NewInfrastructure creates it only when the corresponding pointer is non-nil.
type MessageQueueConfig struct {
	// Redis configures the Redis-backed message queue.
	Redis *RedisConfig `json:"redis" yaml:"redis"`
	// Mqtt configures the MQTT-backed message queue.
	Mqtt *MqttConfig `json:"mqtt" yaml:"mqtt"`
}
// RedisConfig carries Redis connection settings together with message-queue
// options. The same type is used by CacheConfig and MessageQueueConfig; the
// cache setup reads only Address, UserName, Password and DB.
type RedisConfig struct {
	// Address is the Redis server address passed to the constructors.
	Address string `json:"address" yaml:"address"`
	// UserName is the Redis user name passed to the constructors.
	UserName string `json:"user_name" yaml:"user_name"`
	// Password is the Redis password passed to the constructors.
	Password string `json:"password" yaml:"password"`
	// DB is the Redis database index passed to the constructors.
	DB int `json:"db" yaml:"db"`
	// MaxLen is forwarded to redisMessageQueue.WithMaxLen (message queue only).
	// NOTE(review): presumably a cap on queue length — confirm in the redis
	// message-queue package.
	MaxLen int64 `json:"max_len" yaml:"max_len"`
	// ConsumerNum is forwarded to redisMessageQueue.WithConsumerNum (message
	// queue only).
	ConsumerNum int `json:"consumer_num" yaml:"consumer_num"`
	// VisibilityAgainSec is forwarded to redisMessageQueue.WithVisibilityAgainSec
	// (message queue only). NOTE(review): the name suggests seconds — confirm.
	VisibilityAgainSec int `json:"visibility_again_sec" yaml:"visibility_again_sec"`
	// InFlightCount is forwarded to redisMessageQueue.WithInFlightCount (message
	// queue only).
	InFlightCount int `json:"in_flight_count" yaml:"in_flight_count"`
}
// MqttConfig carries the three values NewInfrastructure passes, in this order,
// to mqttMessageQueue.New.
type MqttConfig struct {
	// Address is the MQTT broker address.
	Address string `json:"address" yaml:"address"`
	// UserName is the user name used when connecting.
	UserName string `json:"user_name" yaml:"user_name"`
	// Password is the password used when connecting.
	Password string `json:"password" yaml:"password"`
}
// Infrastructure bundles the infrastructure components created by
// NewInfrastructure. A field stays nil when its component was not configured.
type Infrastructure struct {
	// dbExecutor is the database executor (operations- or data-service-backed).
	dbExecutor database.Executor
	// localCache is the cache created via local.New.
	localCache cache.Cache
	// redisCache is the Redis-backed cache.
	redisCache cache.Cache
	// redisMessageQueue is the Redis-backed message queue.
	redisMessageQueue messageQueueCommon.MessageQueue
	// mqttMessageQueue is the MQTT-backed message queue.
	mqttMessageQueue messageQueueCommon.MessageQueue
}
  54. func NewInfrastructure(config Config) *Infrastructure {
  55. i := new(Infrastructure)
  56. // 数据库执行器多选一
  57. if config.DatabaseConfig.Operations != nil {
  58. op, err := operations.NewOperations(config.DatabaseConfig.Operations)
  59. if err != nil {
  60. panic("创建数据库操作失败: " + err.Error())
  61. }
  62. i.dbExecutor = op
  63. } else if config.DatabaseConfig.DataService != nil {
  64. executor, err := data_service.NewExecutor(config.DatabaseConfig.DataService)
  65. if err != nil {
  66. panic("创建数据服务执行器失败: " + err.Error())
  67. }
  68. i.dbExecutor = executor
  69. }
  70. // 初始化缓存
  71. if strutils.IsStringNotEmpty(config.CacheConfig.Namespace) {
  72. namespace := config.CacheConfig.Namespace
  73. i.localCache = local.New(namespace)
  74. if config.CacheConfig.Redis != nil {
  75. redisConf := config.CacheConfig.Redis
  76. newRedisCache, err := redisCache.New(redisConf.Address, redisConf.UserName, redisConf.Password, redisConf.DB, namespace)
  77. if err != nil {
  78. panic("初始化Redis缓存失败: " + err.Error())
  79. }
  80. i.redisCache = newRedisCache
  81. }
  82. }
  83. // 初始化Redis消息队列
  84. if config.MessageQueueConfig.Redis != nil {
  85. redisConf := config.MessageQueueConfig.Redis
  86. newRedisMessageQueue := redisMessageQueue.New(redisConf.Address, redisConf.UserName, redisConf.Password, redisConf.DB,
  87. redisMessageQueue.WithMaxLen(redisConf.MaxLen),
  88. redisMessageQueue.WithConsumerNum(redisConf.ConsumerNum),
  89. redisMessageQueue.WithInFlightCount(redisConf.InFlightCount),
  90. redisMessageQueue.WithVisibilityAgainSec(redisConf.VisibilityAgainSec))
  91. i.redisMessageQueue = newRedisMessageQueue
  92. }
  93. // 初始化Mqtt消息队列
  94. if config.MessageQueueConfig.Mqtt != nil {
  95. mqttConf := config.MessageQueueConfig.Mqtt
  96. newMqttMessageQueue, err := mqttMessageQueue.New(mqttConf.Address, mqttConf.UserName, mqttConf.Password)
  97. if err != nil {
  98. panic("初始化MQTT消息队列失败: " + err.Error())
  99. }
  100. i.mqttMessageQueue = newMqttMessageQueue
  101. }
  102. return i
  103. }
  104. func DestroyInfrastructure(i *Infrastructure) {
  105. if i == nil {
  106. return
  107. }
  108. if i.redisMessageQueue != nil {
  109. redisMessageQueue.Destroy(i.redisMessageQueue.(*redisMessageQueue.MessageQueue))
  110. }
  111. if i.redisCache != nil {
  112. err := redisCache.Destroy(i.redisCache.(*redisCache.Cache))
  113. if err != nil {
  114. panic("销毁Redis缓存失败: " + err.Error())
  115. }
  116. }
  117. if i.localCache != nil {
  118. local.Destroy(i.localCache.(*local.Cache))
  119. }
  120. if i.dbExecutor != nil {
  121. switch dbExecutor := i.dbExecutor.(type) {
  122. case *operations.Operations:
  123. err := operations.DestroyOperation(dbExecutor)
  124. if err != nil {
  125. panic("销毁数据库操作失败: " + err.Error())
  126. }
  127. case *data_service.Executor:
  128. err := data_service.DestroyExecutor(dbExecutor)
  129. if err != nil {
  130. panic("销毁数据库操作失败: " + err.Error())
  131. }
  132. default:
  133. panic("不支持的数据库执行器类型")
  134. }
  135. }
  136. return
  137. }
// DBExecutor returns the database executor held by this Infrastructure.
//
// Parameters: none.
//
// Returns:
//   - the stored database.Executor; nil when NewInfrastructure was given no
//     database configuration.
func (i Infrastructure) DBExecutor() database.Executor {
	return i.dbExecutor
}
// LocalCache returns the local cache held by this Infrastructure.
//
// Parameters: none.
//
// Returns:
//   - the stored cache.Cache; nil when NewInfrastructure did not create a
//     local cache (the cache namespace check failed).
func (i Infrastructure) LocalCache() cache.Cache {
	return i.localCache
}
// RedisCache returns the Redis cache held by this Infrastructure.
//
// Parameters: none.
//
// Returns:
//   - the stored cache.Cache; nil when NewInfrastructure did not create a
//     Redis cache (namespace check failed or no Redis cache configuration).
func (i Infrastructure) RedisCache() cache.Cache {
	return i.redisCache
}
// RedisMessageQueue returns the Redis message queue held by this Infrastructure.
//
// Parameters: none.
//
// Returns:
//   - the stored messageQueueCommon.MessageQueue; nil when NewInfrastructure
//     was given no Redis message-queue configuration.
func (i Infrastructure) RedisMessageQueue() messageQueueCommon.MessageQueue {
	return i.redisMessageQueue
}
// MqttMessageQueue returns the MQTT message queue held by this Infrastructure.
//
// Parameters: none.
//
// Returns:
//   - the stored messageQueueCommon.MessageQueue; nil when NewInfrastructure
//     was given no MQTT configuration.
func (i Infrastructure) MqttMessageQueue() messageQueueCommon.MessageQueue {
	return i.mqttMessageQueue
}