123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173 |
- package ups_sdk
- import (
- "encoding/json"
- "errors"
- "github.com/robinjoseph08/redisqueue/v2"
- "time"
- )
// Stream names handled by MessageReceiver. Each one is registered with the
// consumer in StartMessageReceiver and matched against the incoming message's
// Stream in processMessage to pick the payload type and callback.
const (
	// topicUserRegistered carries UserRegisteredMessageData payloads.
	topicUserRegistered = "ups:user-registered"
	// topicUserUserNameUpdated carries UserUserNameUpdatedMessageData payloads.
	topicUserUserNameUpdated = "ups:user-username-updated"
	// topicUserDeleted carries UserDeletedMessageData payloads.
	topicUserDeleted = "ups:user-deleted"
)
- type UserRegisteredCallback func(data *UserRegisteredMessageData, err error) error
- type UserUserNameUpdatedCallback func(data *UserUserNameUpdatedMessageData, err error) error
- type UserDeletedCallback func(data *UserDeletedMessageData, err error) error
// MessageReceiver consumes the ups user topics through a redisqueue consumer
// and forwards each decoded message to the callback configured for its topic.
// Create one with NewMessageReceiver.
type MessageReceiver struct {
	// consumer is the underlying redisqueue consumer that is registered,
	// run and shut down by the Start/Stop methods.
	consumer *redisqueue.Consumer
	// userRegisteredCallback is invoked for the user-registered topic.
	userRegisteredCallback UserRegisteredCallback
	// userUserNameUpdatedCallback is invoked for the user-username-updated topic.
	userUserNameUpdatedCallback UserUserNameUpdatedCallback
	// userDeletedCallback is invoked for the user-deleted topic.
	userDeletedCallback UserDeletedCallback
}
// MessageReceiverOptions is the configuration accepted by NewMessageReceiver.
type MessageReceiverOptions struct {
	// ReceiverName is the receiver name; required. It is used as the
	// consumer's Name.
	ReceiverName string
	// MessageGroupName is the message group name; required. It is used as the
	// consumer's GroupName.
	MessageGroupName string
	// UserRegisteredCallback is the callback for user-registered messages; required.
	UserRegisteredCallback UserRegisteredCallback
	// UserUserNameUpdatedCallback is the callback for username-updated messages; required.
	UserUserNameUpdatedCallback UserUserNameUpdatedCallback
	// UserDeletedCallback is the callback for user-deleted messages; required.
	UserDeletedCallback UserDeletedCallback
	// RedisOptions is the embedded Redis connection configuration.
	RedisOptions
}
// RedisOptions holds the Redis connection settings that NewMessageReceiver
// copies field-for-field into the redisqueue consumer's Redis options.
type RedisOptions struct {
	// Address and Password are both checked by NewMessageReceiver and must
	// not be empty.
	Address, Password string
	// DB and MaxRetries are passed through unchanged.
	DB, MaxRetries int
	// Timeouts are passed through unchanged.
	DialTimeout, ReadTimeout, WriteTimeout time.Duration
}
- // NewMessageReceiver 创建MessageReceiver
- func NewMessageReceiver(options *MessageReceiverOptions) (*MessageReceiver, error) {
- if IsStringEmpty(options.ReceiverName) {
- return nil, errors.New("没有传递接受者名称")
- }
- if IsStringEmpty(options.MessageGroupName) {
- return nil, errors.New("没有传递消息组名称")
- }
- if IsStringEmpty(options.Address) {
- return nil, errors.New("没有传递Redis地址")
- }
- if IsStringEmpty(options.Password) {
- return nil, errors.New("没有传递Redis密码")
- }
- if options.UserRegisteredCallback == nil ||
- options.UserUserNameUpdatedCallback == nil ||
- options.UserDeletedCallback == nil {
- return nil, errors.New("没有传递需要的回调函数")
- }
- consumer, err := redisqueue.NewConsumerWithOptions(&redisqueue.ConsumerOptions{
- Name: options.ReceiverName,
- GroupName: options.MessageGroupName,
- VisibilityTimeout: MessageReceiverVisibilityTimeout,
- BlockingTimeout: MessageReceiverBlockingTimeout,
- ReclaimInterval: MessageReceiverReclaimInterval,
- BufferSize: MessageReceiverBufferSize,
- Concurrency: MessageReceiverConcurrency,
- RedisOptions: &redisqueue.RedisOptions{
- Addr: options.Address,
- Password: options.Password,
- DB: options.DB,
- MaxRetries: options.MaxRetries,
- DialTimeout: options.DialTimeout,
- ReadTimeout: options.ReadTimeout,
- WriteTimeout: options.WriteTimeout,
- },
- })
- if err != nil {
- return nil, err
- }
- return &MessageReceiver{
- consumer: consumer,
- userRegisteredCallback: options.UserRegisteredCallback,
- userUserNameUpdatedCallback: options.UserUserNameUpdatedCallback,
- userDeletedCallback: options.UserDeletedCallback,
- }, nil
- }
- // DestroyMessageReceiver 销毁MessageReceiver
- func DestroyMessageReceiver(messageReceiver *MessageReceiver) {
- messageReceiver.userDeletedCallback = nil
- messageReceiver.userUserNameUpdatedCallback = nil
- messageReceiver.userRegisteredCallback = nil
- messageReceiver.consumer = nil
- messageReceiver = nil
- }
- // StartMessageReceiver 启动消息接收
- func (receiver *MessageReceiver) StartMessageReceiver() error {
- receiver.consumer.Register(topicUserRegistered, receiver.processMessage)
- receiver.consumer.Register(topicUserUserNameUpdated, receiver.processMessage)
- receiver.consumer.Register(topicUserDeleted, receiver.processMessage)
- go receiver.consumer.Run()
- return nil
- }
- // StopMessageReceiver 停止消息接收
- func (receiver *MessageReceiver) StopMessageReceiver() {
- receiver.consumer.Shutdown()
- }
- func (receiver *MessageReceiver) processMessage(receivedMessage *redisqueue.Message) error {
- msgStr := receivedMessage.Values["msg"].(string)
- msg := new(Message)
- err := json.Unmarshal([]byte(msgStr), msg)
- if err != nil {
- return err
- }
- switch receivedMessage.Stream {
- case topicUserRegistered:
- data := new(UserRegisteredMessageData)
- err := json.Unmarshal([]byte(msg.Data), data)
- if err != nil {
- return receiver.userRegisteredCallback(nil, err)
- }
- return receiver.userRegisteredCallback(data, nil)
- case topicUserUserNameUpdated:
- data := new(UserUserNameUpdatedMessageData)
- err := json.Unmarshal([]byte(msg.Data), data)
- if err != nil {
- return receiver.userUserNameUpdatedCallback(nil, err)
- }
- return receiver.userUserNameUpdatedCallback(data, nil)
- case topicUserDeleted:
- data := new(UserDeletedMessageData)
- err := json.Unmarshal([]byte(msg.Data), data)
- if err != nil {
- return receiver.userDeletedCallback(nil, err)
- }
- return receiver.userDeletedCallback(data, nil)
- default:
- return errors.New("不存在的主题: " + receivedMessage.Stream)
- }
- }
|