sdk.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. package ups_sdk
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/robinjoseph08/redisqueue/v2"
  6. "time"
  7. )
  8. var (
  9. VisibilityTimeout = 30 * time.Second
  10. BlockingTimeout = 5 * time.Second
  11. ReclaimInterval = 1 * time.Second
  12. BufferSize = 1024
  13. Concurrency = 10
  14. )
  15. type SDK struct {
  16. consumer *redisqueue.Consumer
  17. }
  18. func New() *SDK {
  19. return &SDK{}
  20. }
  21. type MessageReceiverOptions struct {
  22. ReceiverName string
  23. MessageGroupName string
  24. RedisOptions
  25. }
  26. type RedisOptions struct {
  27. Address string
  28. Password string
  29. DB int
  30. MaxRetries int
  31. DialTimeout time.Duration
  32. ReadTimeout time.Duration
  33. WriteTimeout time.Duration
  34. }
  35. func (sdk *SDK) StartMessageReceiver(options *MessageReceiverOptions) error {
  36. if IsStringEmpty(options.ReceiverName) {
  37. return errors.New("没有传递接受者名称")
  38. }
  39. if IsStringEmpty(options.MessageGroupName) {
  40. return errors.New("没有传递消息组名称")
  41. }
  42. if IsStringEmpty(options.Address) {
  43. return errors.New("没有传递Redis地址")
  44. }
  45. if IsStringEmpty(options.Password) {
  46. return errors.New("没有传递Redis密码")
  47. }
  48. consumer, err := redisqueue.NewConsumerWithOptions(&redisqueue.ConsumerOptions{
  49. Name: options.ReceiverName,
  50. GroupName: options.MessageGroupName,
  51. VisibilityTimeout: VisibilityTimeout,
  52. BlockingTimeout: BlockingTimeout,
  53. ReclaimInterval: ReclaimInterval,
  54. BufferSize: BufferSize,
  55. Concurrency: Concurrency,
  56. RedisOptions: &redisqueue.RedisOptions{
  57. Addr: options.Address,
  58. Password: options.Password,
  59. DB: options.DB,
  60. MaxRetries: options.MaxRetries,
  61. DialTimeout: options.DialTimeout,
  62. ReadTimeout: options.ReadTimeout,
  63. WriteTimeout: options.WriteTimeout,
  64. },
  65. })
  66. if err != nil {
  67. return err
  68. }
  69. sdk.consumer = consumer
  70. sdk.consumer.Register("ups:user-registered", sdk.processMessage)
  71. sdk.consumer.Register("ups:user-username-updated", sdk.processMessage)
  72. sdk.consumer.Register("ups:user-deleted", sdk.processMessage)
  73. go func() {
  74. for err := range sdk.consumer.Errors {
  75. fmt.Printf("err: %+v\n", err)
  76. }
  77. }()
  78. go sdk.consumer.Run()
  79. return nil
  80. }
  81. func (sdk *SDK) StopMessageReceiver() {
  82. sdk.consumer.Shutdown()
  83. }
  84. func (sdk *SDK) processMessage(receivedMessage *redisqueue.Message) error {
  85. return nil
  86. }