瀏覽代碼

添加基础设施接口

yjp 4 月之前
父節點
當前提交
28c538786a

+ 3 - 2
framework/core/application/config.go

@@ -21,8 +21,9 @@ type ApiConfig struct {
 }
 
 type InfrastructureConfig struct {
-	Database infrastructure.DatabaseConfig `json:"database" yaml:"database"`
-	Cache    infrastructure.CacheConfig    `json:"cache" yaml:"cache"`
+	Database     infrastructure.DatabaseConfig     `json:"database" yaml:"database"`
+	Cache        infrastructure.CacheConfig        `json:"cache" yaml:"cache"`
+	MessageQueue infrastructure.MessageQueueConfig `json:"message_queue" yaml:"message_queue"`
 }
 
 func LoadFromJsonFile(jsonFilePath string) (Config, error) {

+ 31 - 7
framework/core/infrastructure/infrastructure.go

@@ -3,16 +3,19 @@ package infrastructure
 import (
 	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/cache"
 	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/cache/local"
-	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/cache/redis"
+	redisCache "git.sxidc.com/go-framework/baize/framework/core/infrastructure/cache/redis"
 	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/database"
 	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/database/data_service"
 	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/database/operations"
+	messageQueueCommon "git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/common"
+	redisMessageQueue "git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/redis"
 	"git.sxidc.com/go-tools/utils/strutils"
 )
 
 type Config struct {
 	DatabaseConfig
 	CacheConfig
+	MessageQueueConfig
 }
 
 type DatabaseConfig struct {
@@ -25,6 +28,11 @@ type CacheConfig struct {
 	Redis     *RedisConfig `json:"redis" yaml:"redis"`
 }
 
+type MessageQueueConfig struct {
+	Redis  *RedisConfig `json:"redis" yaml:"redis"`
+	MaxLen int64
+}
+
 type RedisConfig struct {
 	Address  string `json:"address" yaml:"address"`
 	UserName string `json:"user_name" yaml:"user_name"`
@@ -34,9 +42,10 @@ type RedisConfig struct {
 
 // Infrastructure 基础设施结构
 type Infrastructure struct {
-	dbExecutor database.Executor
-	localCache cache.Cache
-	redisCache cache.Cache
+	dbExecutor        database.Executor
+	localCache        cache.Cache
+	redisCache        cache.Cache
+	redisMessageQueue messageQueueCommon.MessageQueue
 }
 
 func NewInfrastructure(config Config) *Infrastructure {
@@ -68,15 +77,26 @@ func NewInfrastructure(config Config) *Infrastructure {
 		if config.CacheConfig.Redis != nil {
 			redisConf := config.CacheConfig.Redis
 
-			redisCache, err := redis.New(redisConf.Address, redisConf.UserName, redisConf.Password, redisConf.DB, namespace)
+			newRedisCache, err := redisCache.New(redisConf.Address, redisConf.UserName, redisConf.Password, redisConf.DB, namespace)
 			if err != nil {
 				panic("初始化Redis缓存失败: " + err.Error())
 			}
 
-			i.redisCache = redisCache
+			i.redisCache = newRedisCache
 		}
 	}
 
+	// 初始化Redis消息队列
+	if config.MessageQueueConfig.Redis != nil {
+		redisConf := config.MessageQueueConfig.Redis
+		maxLen := config.MessageQueueConfig.MaxLen
+
+		newRedisMessageQueue := redisMessageQueue.New(redisConf.Address, redisConf.UserName, redisConf.Password, redisConf.DB,
+			messageQueueCommon.WithMaxLen(maxLen))
+
+		i.redisMessageQueue = newRedisMessageQueue
+	}
+
 	return i
 }
 
@@ -85,8 +105,12 @@ func DestroyInfrastructure(i *Infrastructure) {
 		return
 	}
 
+	if i.redisMessageQueue != nil {
+		redisMessageQueue.Destroy(i.redisMessageQueue.(*redisMessageQueue.MessageQueue))
+	}
+
 	if i.redisCache != nil {
-		err := redis.Destroy(i.redisCache.(*redis.Cache))
+		err := redisCache.Destroy(i.redisCache.(*redisCache.Cache))
 		if err != nil {
 			panic("销毁Redis缓存失败: " + err.Error())
 		}

+ 12 - 0
framework/core/infrastructure/message_queue/common/common.go

@@ -14,3 +14,15 @@ type MessageQueue interface {
 	UnSubscribe(group string, topic string) error
 	Publish(topic string, data []byte) error
 }
+
+type Option func(options *Options)
+
+type Options struct {
+	MaxLen int64
+}
+
+func WithMaxLen(maxLen int64) Option {
+	return func(options *Options) {
+		options.MaxLen = maxLen
+	}
+}

+ 0 - 13
framework/core/infrastructure/message_queue/redis/options.go

@@ -1,13 +0,0 @@
-package redis
-
-type Option func(options *Options)
-
-type Options struct {
-	MaxLen int64
-}
-
-func WithMaxLen(maxLen int64) Option {
-	return func(options *Options) {
-		options.MaxLen = maxLen
-	}
-}

+ 4 - 4
framework/core/infrastructure/message_queue/redis/redis.go

@@ -22,11 +22,11 @@ type MessageQueue struct {
 	consumerMapMutex *sync.Mutex
 	consumerMap      map[string]*redisqueue.Consumer
 
-	options *Options
+	options *common.Options
 }
 
-func NewMessageQueue(address string, userName string, password string, db int, opts ...Option) *MessageQueue {
-	options := new(Options)
+func New(address string, userName string, password string, db int, opts ...common.Option) *MessageQueue {
+	options := new(common.Options)
 
 	for _, opt := range opts {
 		opt(options)
@@ -47,7 +47,7 @@ func NewMessageQueue(address string, userName string, password string, db int, o
 	}
 }
 
-func DestroyMessageQueue(messageQueue *MessageQueue) {
+func Destroy(messageQueue *MessageQueue) {
 	if messageQueue.redisOptions == nil {
 		return
 	}