package redis import ( "git.sxidc.com/go-framework/baize/framework/core/infrastructure/logger" "git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/common" "github.com/pkg/errors" "github.com/robinjoseph08/redisqueue/v2" "strings" "sync" ) const ( messageValuesDataKey = "data" ) type Option func(options *Options) type Options struct { MaxLen int64 ConsumerNum int } func WithMaxLen(maxLen int64) Option { return func(options *Options) { options.MaxLen = maxLen } } func WithConsumerNum(consumerNum int) Option { return func(options *Options) { options.ConsumerNum = consumerNum } } type MessageQueue struct { redisOptions *redisqueue.RedisOptions producerMapMutex *sync.Mutex producerMap map[string]*redisqueue.Producer consumerMapMutex *sync.Mutex consumerMap map[string]*redisqueue.Consumer options *Options } func New(address string, userName string, password string, db int, opts ...Option) *MessageQueue { options := new(Options) for _, opt := range opts { opt(options) } if options.MaxLen == 0 { options.MaxLen = 1000 } if options.ConsumerNum == 0 { options.ConsumerNum = 10 } return &MessageQueue{ redisOptions: &redisqueue.RedisOptions{ Addr: address, Username: userName, Password: password, DB: db, }, producerMapMutex: &sync.Mutex{}, producerMap: make(map[string]*redisqueue.Producer), consumerMapMutex: &sync.Mutex{}, consumerMap: make(map[string]*redisqueue.Consumer), options: options, } } func Destroy(messageQueue *MessageQueue) { if messageQueue.redisOptions == nil { return } messageQueue.producerMapMutex.Lock() defer messageQueue.producerMapMutex.Unlock() messageQueue.producerMap = nil messageQueue.consumerMapMutex.Lock() defer messageQueue.consumerMapMutex.Unlock() for _, consumer := range messageQueue.consumerMap { consumer.Shutdown() consumer = nil } messageQueue.consumerMap = nil } func (messageQueue *MessageQueue) Subscribe(group string, topic string, handler common.MessageHandler) error { messageQueue.consumerMapMutex.Lock() defer messageQueue.consumerMapMutex.Unlock() groupTopic := messageQueue.formGroupTopic(group, topic) _, ok := messageQueue.consumerMap[groupTopic] if ok { return nil } newConsumer, err := redisqueue.NewConsumerWithOptions(&redisqueue.ConsumerOptions{ GroupName: group, Concurrency: messageQueue.options.ConsumerNum, RedisClient: nil, RedisOptions: messageQueue.redisOptions, }) if err != nil { return err } newConsumer.Register(topic, func(message *redisqueue.Message) error { data, ok := message.Values[messageValuesDataKey].(string) if !ok { logger.GetInstance().Error(errors.New("数据不存在")) return nil } handler(messageQueue, message.Stream, data) return nil }) go newConsumer.Run() messageQueue.consumerMap[groupTopic] = newConsumer return nil } func (messageQueue *MessageQueue) UnSubscribe(group string, topic string) error { messageQueue.consumerMapMutex.Lock() defer messageQueue.consumerMapMutex.Unlock() groupTopic := messageQueue.formGroupTopic(group, topic) consumer, ok := messageQueue.consumerMap[groupTopic] if !ok { return nil } consumer.Shutdown() delete(messageQueue.consumerMap, groupTopic) return nil } func (messageQueue *MessageQueue) Publish(topic string, data string) error { messageQueue.producerMapMutex.Lock() defer messageQueue.producerMapMutex.Unlock() producer, err := messageQueue.getProducerNoLock(topic) if err != nil { return err } err = producer.Enqueue(&redisqueue.Message{ Stream: topic, Values: map[string]any{ messageValuesDataKey: data, }, }) if err != nil { return err } return nil } func (messageQueue *MessageQueue) getProducerNoLock(topic string) (*redisqueue.Producer, error) { producer, ok := messageQueue.producerMap[topic] if ok { return producer, nil } newProducer, err := redisqueue.NewProducerWithOptions(&redisqueue.ProducerOptions{ StreamMaxLength: messageQueue.options.MaxLen, ApproximateMaxLength: true, RedisOptions: messageQueue.redisOptions, }) if err != nil { return nil, err } return newProducer, nil } func (messageQueue *MessageQueue) formGroupTopic(group string, topic string) string { return strings.Join([]string{group, topic}, "::") }