yjp 3 kuukautta sitten
vanhempi
commit
000d178b93

+ 9 - 10
framework/core/infrastructure/infrastructure.go

@@ -29,16 +29,16 @@ type CacheConfig struct {
 }
 
 type MessageQueueConfig struct {
-	Redis       *RedisConfig `json:"redis" yaml:"redis"`
-	MaxLen      int64        `json:"max_len" yaml:"max_len"`
-	ConsumerNum int          `json:"consumer_num" yaml:"consumer_num"`
+	Redis *RedisConfig `json:"redis" yaml:"redis"`
 }
 
 type RedisConfig struct {
-	Address  string `json:"address" yaml:"address"`
-	UserName string `json:"user_name" yaml:"user_name"`
-	Password string `json:"password" yaml:"password"`
-	DB       int    `json:"db" yaml:"db"`
+	Address     string `json:"address" yaml:"address"`
+	UserName    string `json:"user_name" yaml:"user_name"`
+	Password    string `json:"password" yaml:"password"`
+	DB          int    `json:"db" yaml:"db"`
+	MaxLen      int64  `json:"max_len" yaml:"max_len"`
+	ConsumerNum int    `json:"consumer_num" yaml:"consumer_num"`
 }
 
 // Infrastructure 基础设施结构
@@ -90,11 +90,10 @@ func NewInfrastructure(config Config) *Infrastructure {
 	// 初始化Redis消息队列
 	if config.MessageQueueConfig.Redis != nil {
 		redisConf := config.MessageQueueConfig.Redis
-		maxLen := config.MessageQueueConfig.MaxLen
 
 		newRedisMessageQueue := redisMessageQueue.New(redisConf.Address, redisConf.UserName, redisConf.Password, redisConf.DB,
-			messageQueueCommon.WithMaxLen(maxLen),
-			messageQueueCommon.WithConsumerNum(config.ConsumerNum))
+			redisMessageQueue.WithMaxLen(redisConf.MaxLen),
+			redisMessageQueue.WithConsumerNum(redisConf.ConsumerNum))
 
 		i.redisMessageQueue = newRedisMessageQueue
 	}

+ 0 - 19
framework/core/infrastructure/message_queue/common/common.go

@@ -14,22 +14,3 @@ type MessageQueue interface {
 	UnSubscribe(group string, topic string) error
 	Publish(topic string, data string) error
 }
-
-type Option func(options *Options)
-
-type Options struct {
-	MaxLen      int64
-	ConsumerNum int
-}
-
-func WithMaxLen(maxLen int64) Option {
-	return func(options *Options) {
-		options.MaxLen = maxLen
-	}
-}
-
-func WithConsumerNum(consumerNum int) Option {
-	return func(options *Options) {
-		options.ConsumerNum = consumerNum
-	}
-}

+ 22 - 3
framework/core/infrastructure/message_queue/redis/redis.go

@@ -13,6 +13,25 @@ const (
 	messageValuesDataKey = "data"
 )
 
+type Option func(options *Options)
+
+type Options struct {
+	MaxLen      int64
+	ConsumerNum int
+}
+
+func WithMaxLen(maxLen int64) Option {
+	return func(options *Options) {
+		options.MaxLen = maxLen
+	}
+}
+
+func WithConsumerNum(consumerNum int) Option {
+	return func(options *Options) {
+		options.ConsumerNum = consumerNum
+	}
+}
+
 type MessageQueue struct {
 	redisOptions *redisqueue.RedisOptions
 
@@ -22,11 +41,11 @@ type MessageQueue struct {
 	consumerMapMutex *sync.Mutex
 	consumerMap      map[string]*redisqueue.Consumer
 
-	options *common.Options
+	options *Options
 }
 
-func New(address string, userName string, password string, db int, opts ...common.Option) *MessageQueue {
-	options := new(common.Options)
+func New(address string, userName string, password string, db int, opts ...Option) *MessageQueue {
+	options := new(Options)
 
 	for _, opt := range opts {
 		opt(options)

+ 7 - 7
test/message_queue_test.go

@@ -13,8 +13,8 @@ import (
 
 func TestRedisMessageQueue(t *testing.T) {
 	redisMessageQueue := redis.New("localhost:30379", "", "mtyzxhc", 1,
-		common.WithMaxLen(1000),
-		common.WithConsumerNum(1))
+		redis.WithMaxLen(1000),
+		redis.WithConsumerNum(1))
 	defer redis.Destroy(redisMessageQueue)
 
 	testRedisMessageQueue(t, redisMessageQueue)
@@ -24,12 +24,12 @@ func TestMessageQueueInfrastructure(t *testing.T) {
 	i := infrastructure.NewInfrastructure(infrastructure.Config{
 		MessageQueueConfig: infrastructure.MessageQueueConfig{
 			Redis: &infrastructure.RedisConfig{
-				Address:  "localhost:30379",
-				Password: "mtyzxhc",
-				DB:       1,
+				Address:     "localhost:30379",
+				Password:    "mtyzxhc",
+				DB:          1,
+				MaxLen:      1000,
+				ConsumerNum: 1,
 			},
-			MaxLen:      1000,
-			ConsumerNum: 1,
 		},
 	})
 	defer infrastructure.DestroyInfrastructure(i)