123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104 |
- package ups_sdk
- import (
- "errors"
- "fmt"
- "github.com/robinjoseph08/redisqueue/v2"
- "time"
- )
- var (
- VisibilityTimeout = 30 * time.Second
- BlockingTimeout = 5 * time.Second
- ReclaimInterval = 1 * time.Second
- BufferSize = 1024
- Concurrency = 10
- )
- type SDK struct {
- consumer *redisqueue.Consumer
- }
- func New() *SDK {
- return &SDK{}
- }
- type MessageReceiverOptions struct {
- ReceiverName string
- MessageGroupName string
- RedisOptions
- }
- type RedisOptions struct {
- Address string
- Password string
- DB int
- MaxRetries int
- DialTimeout time.Duration
- ReadTimeout time.Duration
- WriteTimeout time.Duration
- }
- func (sdk *SDK) StartMessageReceiver(options *MessageReceiverOptions) error {
- if IsStringEmpty(options.ReceiverName) {
- return errors.New("没有传递接受者名称")
- }
- if IsStringEmpty(options.MessageGroupName) {
- return errors.New("没有传递消息组名称")
- }
- if IsStringEmpty(options.Address) {
- return errors.New("没有传递Redis地址")
- }
- if IsStringEmpty(options.Password) {
- return errors.New("没有传递Redis密码")
- }
- consumer, err := redisqueue.NewConsumerWithOptions(&redisqueue.ConsumerOptions{
- Name: options.ReceiverName,
- GroupName: options.MessageGroupName,
- VisibilityTimeout: VisibilityTimeout,
- BlockingTimeout: BlockingTimeout,
- ReclaimInterval: ReclaimInterval,
- BufferSize: BufferSize,
- Concurrency: Concurrency,
- RedisOptions: &redisqueue.RedisOptions{
- Addr: options.Address,
- Password: options.Password,
- DB: options.DB,
- MaxRetries: options.MaxRetries,
- DialTimeout: options.DialTimeout,
- ReadTimeout: options.ReadTimeout,
- WriteTimeout: options.WriteTimeout,
- },
- })
- if err != nil {
- return err
- }
- sdk.consumer = consumer
- sdk.consumer.Register("ups:user-registered", sdk.processMessage)
- sdk.consumer.Register("ups:user-username-updated", sdk.processMessage)
- sdk.consumer.Register("ups:user-deleted", sdk.processMessage)
- go func() {
- for err := range sdk.consumer.Errors {
- fmt.Printf("err: %+v\n", err)
- }
- }()
- go sdk.consumer.Run()
- return nil
- }
- func (sdk *SDK) StopMessageReceiver() {
- sdk.consumer.Shutdown()
- }
- func (sdk *SDK) processMessage(receivedMessage *redisqueue.Message) error {
- return nil
- }
|