package ups_sdk import ( "errors" "fmt" "github.com/robinjoseph08/redisqueue/v2" "time" ) var ( VisibilityTimeout = 30 * time.Second BlockingTimeout = 5 * time.Second ReclaimInterval = 1 * time.Second BufferSize = 1024 Concurrency = 10 ) type SDK struct { consumer *redisqueue.Consumer } func New() *SDK { return &SDK{} } type MessageReceiverOptions struct { ReceiverName string MessageGroupName string RedisOptions } type RedisOptions struct { Address string Password string DB int MaxRetries int DialTimeout time.Duration ReadTimeout time.Duration WriteTimeout time.Duration } func (sdk *SDK) StartMessageReceiver(options *MessageReceiverOptions) error { if IsStringEmpty(options.ReceiverName) { return errors.New("没有传递接受者名称") } if IsStringEmpty(options.MessageGroupName) { return errors.New("没有传递消息组名称") } if IsStringEmpty(options.Address) { return errors.New("没有传递Redis地址") } if IsStringEmpty(options.Password) { return errors.New("没有传递Redis密码") } consumer, err := redisqueue.NewConsumerWithOptions(&redisqueue.ConsumerOptions{ Name: options.ReceiverName, GroupName: options.MessageGroupName, VisibilityTimeout: VisibilityTimeout, BlockingTimeout: BlockingTimeout, ReclaimInterval: ReclaimInterval, BufferSize: BufferSize, Concurrency: Concurrency, RedisOptions: &redisqueue.RedisOptions{ Addr: options.Address, Password: options.Password, DB: options.DB, MaxRetries: options.MaxRetries, DialTimeout: options.DialTimeout, ReadTimeout: options.ReadTimeout, WriteTimeout: options.WriteTimeout, }, }) if err != nil { return err } sdk.consumer = consumer sdk.consumer.Register("ups:user-registered", sdk.processMessage) sdk.consumer.Register("ups:user-username-updated", sdk.processMessage) sdk.consumer.Register("ups:user-deleted", sdk.processMessage) go func() { for err := range sdk.consumer.Errors { fmt.Printf("err: %+v\n", err) } }() go sdk.consumer.Run() return nil } func (sdk *SDK) StopMessageReceiver() { sdk.consumer.Shutdown() } func (sdk *SDK) processMessage(receivedMessage *redisqueue.Message) error { return nil }