Переглянути джерело

初步完成redis消息队列

yjp 4 місяців тому
батько
коміт
dc796668dc

+ 9 - 1
framework/core/infrastructure/infrastructure.go

@@ -30,7 +30,7 @@ type CacheConfig struct {
 
 type MessageQueueConfig struct {
 	Redis  *RedisConfig `json:"redis" yaml:"redis"`
-	MaxLen int64
+	MaxLen int64        `json:"max_len" yaml:"max_len"`
 }
 
 type RedisConfig struct {
@@ -164,3 +164,11 @@ func (i Infrastructure) LocalCache() cache.Cache {
 func (i Infrastructure) RedisCache() cache.Cache {
 	return i.redisCache
 }
+
+// RedisMessageQueue 获取Redis消息队列基础设施
+// 参数: 无
+// 返回值:
+// - 消息队列基础设施的接口
+func (i Infrastructure) RedisMessageQueue() messageQueueCommon.MessageQueue {
+	return i.redisMessageQueue
+}

+ 10 - 3
framework/core/infrastructure/message_queue/common/common.go

@@ -6,19 +6,20 @@ package common
 // - topic: 主题
 // - data: 消息数据
 // 返回值: 无
-type MessageHandler func(queue MessageQueue, topic string, data []byte)
+type MessageHandler func(queue MessageQueue, topic string, data string)
 
 // MessageQueue 消息队列接口
 type MessageQueue interface {
 	Subscribe(group string, topic string, handler MessageHandler) error
 	UnSubscribe(group string, topic string) error
-	Publish(topic string, data []byte) error
+	Publish(topic string, data string) error
 }
 
 type Option func(options *Options)
 
 type Options struct {
-	MaxLen int64
+	MaxLen      int64
+	ConsumerNum int
 }
 
 func WithMaxLen(maxLen int64) Option {
@@ -26,3 +27,9 @@ func WithMaxLen(maxLen int64) Option {
 		options.MaxLen = maxLen
 	}
 }
+
+func WithConsumerNum(consumerNum int) Option {
+	return func(options *Options) {
+		options.ConsumerNum = consumerNum
+	}
+}

+ 1 - 1
framework/core/infrastructure/message_queue/message_queue.go

@@ -30,6 +30,6 @@ func UnSubscribe(messageQueue common.MessageQueue, group string, topic string) e
 // - data: 消息数据
 // 返回值:
 // - 错误
-func Publish(messageQueue common.MessageQueue, topic string, data []byte) error {
+func Publish(messageQueue common.MessageQueue, topic string, data string) error {
 	return messageQueue.Publish(topic, data)
 }

+ 14 - 4
framework/core/infrastructure/message_queue/redis/redis.go

@@ -32,6 +32,14 @@ func New(address string, userName string, password string, db int, opts ...commo
 		opt(options)
 	}
 
+	if options.MaxLen == 0 {
+		options.MaxLen = 1000
+	}
+
+	if options.ConsumerNum == 0 {
+		options.ConsumerNum = 10
+	}
+
 	return &MessageQueue{
 		redisOptions: &redisqueue.RedisOptions{
 			Addr:     address,
@@ -62,6 +70,7 @@ func Destroy(messageQueue *MessageQueue) {
 
 	for _, consumer := range messageQueue.consumerMap {
 		consumer.Shutdown()
+		consumer = nil
 	}
 
 	messageQueue.consumerMap = nil
@@ -80,16 +89,17 @@ func (messageQueue *MessageQueue) Subscribe(group string, topic string, handler
 
 	newConsumer, err := redisqueue.NewConsumerWithOptions(&redisqueue.ConsumerOptions{
 		GroupName:    group,
+		Concurrency:  messageQueue.options.ConsumerNum,
 		RedisOptions: messageQueue.redisOptions,
 	})
 	if err != nil {
 		return err
 	}
 
-	newConsumer.Register("topic", func(message *redisqueue.Message) error {
-		data, ok := message.Values[messageValuesDataKey].([]byte)
+	newConsumer.Register(topic, func(message *redisqueue.Message) error {
+		data, ok := message.Values[messageValuesDataKey].(string)
 		if !ok {
-			logger.GetInstance().Error(errors.New("消息数据格式错误"))
+			logger.GetInstance().Error(errors.New("数据不存在"))
 			return nil
 		}
 
@@ -120,7 +130,7 @@ func (messageQueue *MessageQueue) UnSubscribe(group string, topic string) error
 	return nil
 }
 
-func (messageQueue *MessageQueue) Publish(topic string, data []byte) error {
+func (messageQueue *MessageQueue) Publish(topic string, data string) error {
 	messageQueue.producerMapMutex.Lock()
 	defer messageQueue.producerMapMutex.Unlock()
 

+ 7 - 0
test/cache_test.go

@@ -20,6 +20,12 @@ func TestRedisCache(t *testing.T) {
 	if err != nil {
 		t.Fatalf("%+v\n", err)
 	}
+	defer func() {
+		err := redis.Destroy(redisCache)
+		if err != nil {
+			t.Fatalf("%+v\n", err)
+		}
+	}()
 
 	testRedisCache(t, redisCache)
 }
@@ -36,6 +42,7 @@ func TestCacheInfrastructure(t *testing.T) {
 			},
 		},
 	})
+	defer infrastructure.DestroyInfrastructure(i)
 
 	testCache(t, i.LocalCache())
 	testCache(t, i.RedisCache())

+ 106 - 0
test/message_queue_test.go

@@ -0,0 +1,106 @@
+package test
+
+import (
+	"git.sxidc.com/go-framework/baize/framework/core/infrastructure"
+	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue"
+	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/common"
+	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/redis"
+	"github.com/pkg/errors"
+	"sync"
+	"testing"
+)
+
+func TestRedisMessageQueue(t *testing.T) {
+	redisMessageQueue := redis.New("localhost:30379", "", "mtyzxhc", 1, common.WithMaxLen(1000))
+	defer redis.Destroy(redisMessageQueue)
+
+	testRedisMessageQueue(t, redisMessageQueue)
+}
+
+func TestMessageQueueInfrastructure(t *testing.T) {
+	i := infrastructure.NewInfrastructure(infrastructure.Config{
+		MessageQueueConfig: infrastructure.MessageQueueConfig{
+			Redis: &infrastructure.RedisConfig{
+				Address:  "localhost:30379",
+				Password: "mtyzxhc",
+				DB:       1,
+			},
+			MaxLen: 1000,
+		},
+	})
+	defer infrastructure.DestroyInfrastructure(i)
+
+	testMessageQueue(t, i.RedisMessageQueue())
+}
+
+func testRedisMessageQueue(t *testing.T, redisMessageQueue *redis.MessageQueue) {
+	wg := sync.WaitGroup{}
+	wg.Add(2)
+
+	err := redisMessageQueue.Subscribe("test1", "test-redis",
+		func(queue common.MessageQueue, topic string, data string) {
+			if string(data) != "test-message" {
+				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
+			}
+
+			wg.Done()
+		})
+	if err != nil {
+		t.Fatalf("%+v\n", err)
+	}
+
+	err = redisMessageQueue.Subscribe("test2", "test-redis",
+		func(queue common.MessageQueue, topic string, data string) {
+			if string(data) != "test-message" {
+				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
+			}
+
+			wg.Done()
+		})
+	if err != nil {
+		t.Fatalf("%+v\n", err)
+	}
+
+	err = redisMessageQueue.Publish("test-redis", "test-message")
+	if err != nil {
+		t.Fatalf("%+v\n", err)
+	}
+
+	wg.Wait()
+}
+
+func testMessageQueue(t *testing.T, redisMessageQueue common.MessageQueue) {
+	wg := sync.WaitGroup{}
+	wg.Add(2)
+
+	err := message_queue.Subscribe(redisMessageQueue, "test1", "test-redis",
+		func(queue common.MessageQueue, topic string, data string) {
+			if string(data) != "test-message" {
+				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
+			}
+
+			wg.Done()
+		})
+	if err != nil {
+		t.Fatalf("%+v\n", err)
+	}
+
+	err = message_queue.Subscribe(redisMessageQueue, "test2", "test-redis",
+		func(queue common.MessageQueue, topic string, data string) {
+			if string(data) != "test-message" {
+				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
+			}
+
+			wg.Done()
+		})
+	if err != nil {
+		t.Fatalf("%+v\n", err)
+	}
+
+	err = message_queue.Publish(redisMessageQueue, "test-redis", "test-message")
+	if err != nil {
+		t.Fatalf("%+v\n", err)
+	}
+
+	wg.Wait()
+}