123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
- package redis
- import (
- "git.sxidc.com/go-framework/baize/framework/core/infrastructure/logger"
- "git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/common"
- "github.com/pkg/errors"
- "github.com/robinjoseph08/redisqueue/v2"
- "strings"
- "sync"
- )
- const (
- messageValuesDataKey = "data"
- )
- type Option func(options *Options)
- type Options struct {
- MaxLen int64
- ConsumerNum int
- }
- func WithMaxLen(maxLen int64) Option {
- return func(options *Options) {
- options.MaxLen = maxLen
- }
- }
- func WithConsumerNum(consumerNum int) Option {
- return func(options *Options) {
- options.ConsumerNum = consumerNum
- }
- }
- type MessageQueue struct {
- redisOptions *redisqueue.RedisOptions
- producerMapMutex *sync.Mutex
- producerMap map[string]*redisqueue.Producer
- consumerMapMutex *sync.Mutex
- consumerMap map[string]*redisqueue.Consumer
- options *Options
- }
- func New(address string, userName string, password string, db int, opts ...Option) *MessageQueue {
- options := new(Options)
- for _, opt := range opts {
- opt(options)
- }
- if options.MaxLen == 0 {
- options.MaxLen = 1000
- }
- if options.ConsumerNum == 0 {
- options.ConsumerNum = 10
- }
- return &MessageQueue{
- redisOptions: &redisqueue.RedisOptions{
- Addr: address,
- Username: userName,
- Password: password,
- DB: db,
- },
- producerMapMutex: &sync.Mutex{},
- producerMap: make(map[string]*redisqueue.Producer),
- consumerMapMutex: &sync.Mutex{},
- consumerMap: make(map[string]*redisqueue.Consumer),
- options: options,
- }
- }
- func Destroy(messageQueue *MessageQueue) {
- if messageQueue.redisOptions == nil {
- return
- }
- messageQueue.producerMapMutex.Lock()
- defer messageQueue.producerMapMutex.Unlock()
- messageQueue.producerMap = nil
- messageQueue.consumerMapMutex.Lock()
- defer messageQueue.consumerMapMutex.Unlock()
- for _, consumer := range messageQueue.consumerMap {
- consumer.Shutdown()
- consumer = nil
- }
- messageQueue.consumerMap = nil
- }
- func (messageQueue *MessageQueue) Subscribe(group string, topic string, handler common.MessageHandler) error {
- messageQueue.consumerMapMutex.Lock()
- defer messageQueue.consumerMapMutex.Unlock()
- groupTopic := messageQueue.formGroupTopic(group, topic)
- _, ok := messageQueue.consumerMap[groupTopic]
- if ok {
- return nil
- }
- newConsumer, err := redisqueue.NewConsumerWithOptions(&redisqueue.ConsumerOptions{
- GroupName: group,
- Concurrency: messageQueue.options.ConsumerNum,
- RedisClient: nil,
- RedisOptions: messageQueue.redisOptions,
- })
- if err != nil {
- return err
- }
- newConsumer.Register(topic, func(message *redisqueue.Message) error {
- data, ok := message.Values[messageValuesDataKey].(string)
- if !ok {
- logger.GetInstance().Error(errors.New("数据不存在"))
- return nil
- }
- handler(messageQueue, message.Stream, data)
- return nil
- })
- go newConsumer.Run()
- messageQueue.consumerMap[groupTopic] = newConsumer
- return nil
- }
- func (messageQueue *MessageQueue) UnSubscribe(group string, topic string) error {
- messageQueue.consumerMapMutex.Lock()
- defer messageQueue.consumerMapMutex.Unlock()
- groupTopic := messageQueue.formGroupTopic(group, topic)
- consumer, ok := messageQueue.consumerMap[groupTopic]
- if !ok {
- return nil
- }
- consumer.Shutdown()
- delete(messageQueue.consumerMap, groupTopic)
- return nil
- }
- func (messageQueue *MessageQueue) Publish(topic string, data string) error {
- messageQueue.producerMapMutex.Lock()
- defer messageQueue.producerMapMutex.Unlock()
- producer, err := messageQueue.getProducerNoLock(topic)
- if err != nil {
- return err
- }
- err = producer.Enqueue(&redisqueue.Message{
- Stream: topic,
- Values: map[string]any{
- messageValuesDataKey: data,
- },
- })
- if err != nil {
- return err
- }
- return nil
- }
- func (messageQueue *MessageQueue) getProducerNoLock(topic string) (*redisqueue.Producer, error) {
- producer, ok := messageQueue.producerMap[topic]
- if ok {
- return producer, nil
- }
- newProducer, err := redisqueue.NewProducerWithOptions(&redisqueue.ProducerOptions{
- StreamMaxLength: messageQueue.options.MaxLen,
- ApproximateMaxLength: true,
- RedisOptions: messageQueue.redisOptions,
- })
- if err != nil {
- return nil, err
- }
- return newProducer, nil
- }
- func (messageQueue *MessageQueue) formGroupTopic(group string, topic string) string {
- return strings.Join([]string{group, topic}, "::")
- }
|