package ups_sdk import ( "encoding/json" "errors" "github.com/robinjoseph08/redisqueue/v2" "time" ) const ( topicUserRegistered = "ups:user-registered" topicUserUserNameUpdated = "ups:user-username-updated" topicUserDeleted = "ups:user-deleted" ) type UserRegisteredCallback func(data *UserRegisteredMessageData, err error) error type UserUserNameUpdatedCallback func(data *UserUserNameUpdatedMessageData, err error) error type UserDeletedCallback func(data *UserDeletedMessageData, err error) error type MessageReceiver struct { consumer *redisqueue.Consumer userRegisteredCallback UserRegisteredCallback userUserNameUpdatedCallback UserUserNameUpdatedCallback userDeletedCallback UserDeletedCallback } type MessageReceiverOptions struct { // 接收者名称,必传 ReceiverName string // 消息组名称,必传 MessageGroupName string // 用户注册消息回调,必传 UserRegisteredCallback UserRegisteredCallback // 用户名更新消息回调,必传 UserUserNameUpdatedCallback UserUserNameUpdatedCallback // 用户删除消息回调,必传 UserDeletedCallback UserDeletedCallback // Redis配置 RedisOptions } type RedisOptions struct { Address string Password string DB int MaxRetries int DialTimeout time.Duration ReadTimeout time.Duration WriteTimeout time.Duration } // NewMessageReceiver 创建MessageReceiver func NewMessageReceiver(options *MessageReceiverOptions) (*MessageReceiver, error) { if IsStringEmpty(options.ReceiverName) { return nil, errors.New("没有传递接受者名称") } if IsStringEmpty(options.MessageGroupName) { return nil, errors.New("没有传递消息组名称") } if IsStringEmpty(options.Address) { return nil, errors.New("没有传递Redis地址") } if IsStringEmpty(options.Password) { return nil, errors.New("没有传递Redis密码") } if options.UserRegisteredCallback == nil || options.UserUserNameUpdatedCallback == nil || options.UserDeletedCallback == nil { return nil, errors.New("没有传递需要的回调函数") } consumer, err := redisqueue.NewConsumerWithOptions(&redisqueue.ConsumerOptions{ Name: options.ReceiverName, GroupName: options.MessageGroupName, VisibilityTimeout: MessageReceiverVisibilityTimeout, BlockingTimeout: MessageReceiverBlockingTimeout, ReclaimInterval: MessageReceiverReclaimInterval, BufferSize: MessageReceiverBufferSize, Concurrency: MessageReceiverConcurrency, RedisOptions: &redisqueue.RedisOptions{ Addr: options.Address, Password: options.Password, DB: options.DB, MaxRetries: options.MaxRetries, DialTimeout: options.DialTimeout, ReadTimeout: options.ReadTimeout, WriteTimeout: options.WriteTimeout, }, }) if err != nil { return nil, err } return &MessageReceiver{ consumer: consumer, userRegisteredCallback: options.UserRegisteredCallback, userUserNameUpdatedCallback: options.UserUserNameUpdatedCallback, userDeletedCallback: options.UserDeletedCallback, }, nil } // DestroyMessageReceiver 销毁MessageReceiver func DestroyMessageReceiver(messageReceiver *MessageReceiver) { messageReceiver.userDeletedCallback = nil messageReceiver.userUserNameUpdatedCallback = nil messageReceiver.userRegisteredCallback = nil messageReceiver.consumer = nil messageReceiver = nil } // StartMessageReceiver 启动消息接收 func (receiver *MessageReceiver) StartMessageReceiver() error { receiver.consumer.Register(topicUserRegistered, receiver.processMessage) receiver.consumer.Register(topicUserUserNameUpdated, receiver.processMessage) receiver.consumer.Register(topicUserDeleted, receiver.processMessage) go receiver.consumer.Run() return nil } // StopMessageReceiver 停止消息接收 func (receiver *MessageReceiver) StopMessageReceiver() { receiver.consumer.Shutdown() } func (receiver *MessageReceiver) processMessage(receivedMessage *redisqueue.Message) error { msgStr := receivedMessage.Values["msg"].(string) msg := new(Message) err := json.Unmarshal([]byte(msgStr), msg) if err != nil { return err } switch receivedMessage.Stream { case topicUserRegistered: data := new(UserRegisteredMessageData) err := json.Unmarshal([]byte(msg.Data), data) if err != nil { return receiver.userRegisteredCallback(nil, err) } return receiver.userRegisteredCallback(data, nil) case topicUserUserNameUpdated: data := new(UserUserNameUpdatedMessageData) err := json.Unmarshal([]byte(msg.Data), data) if err != nil { return receiver.userUserNameUpdatedCallback(nil, err) } return receiver.userUserNameUpdatedCallback(data, nil) case topicUserDeleted: data := new(UserDeletedMessageData) err := json.Unmarshal([]byte(msg.Data), data) if err != nil { return receiver.userDeletedCallback(nil, err) } return receiver.userDeletedCallback(data, nil) default: return errors.New("不存在的主题: " + receivedMessage.Stream) } }