yjp преди 4 месеца
родител
ревизия
415f52e230
променени са 3 файла, в които са добавени 23 реда и са изтрити 8 реда
  1. 5 3
      framework/core/infrastructure/infrastructure.go
  2. 1 0
      framework/core/infrastructure/message_queue/redis/redis.go
  3. 17 5
      test/message_queue_test.go

+ 5 - 3
framework/core/infrastructure/infrastructure.go

@@ -29,8 +29,9 @@ type CacheConfig struct {
 }
 
 type MessageQueueConfig struct {
-	Redis  *RedisConfig `json:"redis" yaml:"redis"`
-	MaxLen int64        `json:"max_len" yaml:"max_len"`
+	Redis       *RedisConfig `json:"redis" yaml:"redis"`
+	MaxLen      int64        `json:"max_len" yaml:"max_len"`
+	ConsumerNum int          `json:"consumer_num" yaml:"consumer_num"`
 }
 
 type RedisConfig struct {
@@ -92,7 +93,8 @@ func NewInfrastructure(config Config) *Infrastructure {
 		maxLen := config.MessageQueueConfig.MaxLen
 
 		newRedisMessageQueue := redisMessageQueue.New(redisConf.Address, redisConf.UserName, redisConf.Password, redisConf.DB,
-			messageQueueCommon.WithMaxLen(maxLen))
+			messageQueueCommon.WithMaxLen(maxLen),
+			messageQueueCommon.WithConsumerNum(config.ConsumerNum))
 
 		i.redisMessageQueue = newRedisMessageQueue
 	}

+ 1 - 0
framework/core/infrastructure/message_queue/redis/redis.go

@@ -90,6 +90,7 @@ func (messageQueue *MessageQueue) Subscribe(group string, topic string, handler
 	newConsumer, err := redisqueue.NewConsumerWithOptions(&redisqueue.ConsumerOptions{
 		GroupName:    group,
 		Concurrency:  messageQueue.options.ConsumerNum,
+		RedisClient:  nil,
 		RedisOptions: messageQueue.redisOptions,
 	})
 	if err != nil {

+ 17 - 5
test/message_queue_test.go

@@ -1,6 +1,7 @@
 package test
 
 import (
+	"fmt"
 	"git.sxidc.com/go-framework/baize/framework/core/infrastructure"
 	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue"
 	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/common"
@@ -11,7 +12,9 @@ import (
 )
 
 func TestRedisMessageQueue(t *testing.T) {
-	redisMessageQueue := redis.New("localhost:30379", "", "mtyzxhc", 1, common.WithMaxLen(1000))
+	redisMessageQueue := redis.New("localhost:30379", "", "mtyzxhc", 1,
+		common.WithMaxLen(1000),
+		common.WithConsumerNum(1))
 	defer redis.Destroy(redisMessageQueue)
 
 	testRedisMessageQueue(t, redisMessageQueue)
@@ -25,7 +28,8 @@ func TestMessageQueueInfrastructure(t *testing.T) {
 				Password: "mtyzxhc",
 				DB:       1,
 			},
-			MaxLen: 1000,
+			MaxLen:      1000,
+			ConsumerNum: 1,
 		},
 	})
 	defer infrastructure.DestroyInfrastructure(i)
@@ -43,6 +47,8 @@ func testRedisMessageQueue(t *testing.T, redisMessageQueue *redis.MessageQueue)
 				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
 			}
 
+			fmt.Println("test1 consumed")
+
 			wg.Done()
 		})
 	if err != nil {
@@ -55,6 +61,8 @@ func testRedisMessageQueue(t *testing.T, redisMessageQueue *redis.MessageQueue)
 				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
 			}
 
+			fmt.Println("test2 consumed")
+
 			wg.Done()
 		})
 	if err != nil {
@@ -73,31 +81,35 @@ func testMessageQueue(t *testing.T, redisMessageQueue common.MessageQueue) {
 	wg := sync.WaitGroup{}
 	wg.Add(2)
 
-	err := message_queue.Subscribe(redisMessageQueue, "test1", "test-redis",
+	err := message_queue.Subscribe(redisMessageQueue, "test1", "test-message-queue",
 		func(queue common.MessageQueue, topic string, data string) {
 			if string(data) != "test-message" {
 				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
 			}
 
+			fmt.Println("test1 consumed")
+
 			wg.Done()
 		})
 	if err != nil {
 		t.Fatalf("%+v\n", err)
 	}
 
-	err = message_queue.Subscribe(redisMessageQueue, "test2", "test-redis",
+	err = message_queue.Subscribe(redisMessageQueue, "test2", "test-message-queue",
 		func(queue common.MessageQueue, topic string, data string) {
 			if string(data) != "test-message" {
 				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
 			}
 
+			fmt.Println("test2 consumed")
+
 			wg.Done()
 		})
 	if err != nil {
 		t.Fatalf("%+v\n", err)
 	}
 
-	err = message_queue.Publish(redisMessageQueue, "test-redis", "test-message")
+	err = message_queue.Publish(redisMessageQueue, "test-message-queue", "test-message")
 	if err != nil {
 		t.Fatalf("%+v\n", err)
 	}