فهرست منبع

redis消息队列添加参数,可以配置重试机制

yjp 5 ماه پیش
والد
کامیت
b7bad399af

+ 11 - 7
framework/core/infrastructure/infrastructure.go

@@ -35,12 +35,14 @@ type MessageQueueConfig struct {
 }
 
 type RedisConfig struct {
-	Address     string `json:"address" yaml:"address"`
-	UserName    string `json:"user_name" yaml:"user_name"`
-	Password    string `json:"password" yaml:"password"`
-	DB          int    `json:"db" yaml:"db"`
-	MaxLen      int64  `json:"max_len" yaml:"max_len"`
-	ConsumerNum int    `json:"consumer_num" yaml:"consumer_num"`
+	Address            string `json:"address" yaml:"address"`
+	UserName           string `json:"user_name" yaml:"user_name"`
+	Password           string `json:"password" yaml:"password"`
+	DB                 int    `json:"db" yaml:"db"`
+	MaxLen             int64  `json:"max_len" yaml:"max_len"`
+	ConsumerNum        int    `json:"consumer_num" yaml:"consumer_num"`
+	VisibilityAgainSec int    `json:"visibility_again_sec" yaml:"visibility_again_sec"`
+	InFlightCount      int    `yaml:"in_flight_count" yaml:"in_flight_count"`
 }
 
 type MqttConfig struct {
@@ -102,7 +104,9 @@ func NewInfrastructure(config Config) *Infrastructure {
 
 		newRedisMessageQueue := redisMessageQueue.New(redisConf.Address, redisConf.UserName, redisConf.Password, redisConf.DB,
 			redisMessageQueue.WithMaxLen(redisConf.MaxLen),
-			redisMessageQueue.WithConsumerNum(redisConf.ConsumerNum))
+			redisMessageQueue.WithConsumerNum(redisConf.ConsumerNum),
+			redisMessageQueue.WithInFlightCount(redisConf.InFlightCount),
+			redisMessageQueue.WithVisibilityAgainSec(redisConf.VisibilityAgainSec))
 
 		i.redisMessageQueue = newRedisMessageQueue
 	}

+ 30 - 6
framework/core/infrastructure/message_queue/redis/redis.go

@@ -1,6 +1,7 @@
 package redis
 
 import (
+	"fmt"
 	"git.sxidc.com/go-framework/baize/framework/core/data_protocol"
 	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/logger"
 	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/common"
@@ -8,6 +9,7 @@ import (
 	"github.com/robinjoseph08/redisqueue/v2"
 	"strings"
 	"sync"
+	"time"
 )
 
 const (
@@ -17,8 +19,10 @@ const (
 type Option func(options *Options)
 
 type Options struct {
-	MaxLen      int64
-	ConsumerNum int
+	MaxLen             int64
+	ConsumerNum        int
+	VisibilityAgainSec int
+	InFlightCount      int
 }
 
 func WithMaxLen(maxLen int64) Option {
@@ -33,6 +37,18 @@ func WithConsumerNum(consumerNum int) Option {
 	}
 }
 
+func WithVisibilityAgainSec(visibilityAgainSec int) Option {
+	return func(options *Options) {
+		options.VisibilityAgainSec = visibilityAgainSec
+	}
+}
+
+func WithInFlightCount(inFlightCount int) Option {
+	return func(options *Options) {
+		options.InFlightCount = inFlightCount
+	}
+}
+
 type MessageQueue struct {
 	redisOptions *redisqueue.RedisOptions
 
@@ -103,10 +119,12 @@ func (messageQueue *MessageQueue) Subscribe(group string, topic string, handler
 	}
 
 	newConsumer, err := redisqueue.NewConsumerWithOptions(&redisqueue.ConsumerOptions{
-		GroupName:    group,
-		Concurrency:  messageQueue.options.ConsumerNum,
-		RedisClient:  nil,
-		RedisOptions: messageQueue.redisOptions,
+		VisibilityTimeout: time.Duration(messageQueue.options.VisibilityAgainSec) * time.Second,
+		BufferSize:        messageQueue.options.InFlightCount,
+		GroupName:         group,
+		Concurrency:       messageQueue.options.ConsumerNum,
+		RedisClient:       nil,
+		RedisOptions:      messageQueue.redisOptions,
 	})
 	if err != nil {
 		return err
@@ -135,6 +153,12 @@ func (messageQueue *MessageQueue) Subscribe(group string, topic string, handler
 		return nil
 	})
 
+	go func() {
+		for err := range newConsumer.Errors {
+			fmt.Printf("err: %+v\n", err)
+		}
+	}()
+
 	go newConsumer.Run()
 
 	messageQueue.consumerMap[groupTopic] = newConsumer

+ 10 - 6
test/message_queue_test.go

@@ -16,7 +16,9 @@ import (
 func TestRedisMessageQueue(t *testing.T) {
 	redisMessageQueue := redis.New("localhost:30379", "", "mtyzxhc", 1,
 		redis.WithMaxLen(1000),
-		redis.WithConsumerNum(1))
+		redis.WithConsumerNum(1),
+		redis.WithInFlightCount(100),
+		redis.WithVisibilityAgainSec(5))
 	defer redis.Destroy(redisMessageQueue)
 
 	testRedisMessageQueue(t, redisMessageQueue)
@@ -36,11 +38,13 @@ func TestMessageQueueInfrastructure(t *testing.T) {
 	i := infrastructure.NewInfrastructure(infrastructure.Config{
 		MessageQueueConfig: infrastructure.MessageQueueConfig{
 			Redis: &infrastructure.RedisConfig{
-				Address:     "localhost:30379",
-				Password:    "mtyzxhc",
-				DB:          1,
-				MaxLen:      1000,
-				ConsumerNum: 1,
+				Address:            "localhost:30379",
+				Password:           "mtyzxhc",
+				DB:                 1,
+				MaxLen:             1000,
+				ConsumerNum:        1,
+				VisibilityAgainSec: 5,
+				InFlightCount:      100,
 			},
 			Mqtt: &infrastructure.MqttConfig{
 				Address:  "localhost:1883",