message_receiver.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. package ups_sdk
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "github.com/robinjoseph08/redisqueue/v2"
  6. "time"
  7. )
  8. const (
  9. topicUserRegistered = "ups:user-registered"
  10. topicUserUserNameUpdated = "ups:user-username-updated"
  11. topicUserDeleted = "ups:user-deleted"
  12. )
  13. type UserRegisteredCallback func(data *UserRegisteredMessageData, err error) error
  14. type UserUserNameUpdatedCallback func(data *UserUserNameUpdatedMessageData, err error) error
  15. type UserDeletedCallback func(data *UserDeletedMessageData, err error) error
  16. type MessageReceiver struct {
  17. consumer *redisqueue.Consumer
  18. userRegisteredCallback UserRegisteredCallback
  19. userUserNameUpdatedCallback UserUserNameUpdatedCallback
  20. userDeletedCallback UserDeletedCallback
  21. }
  22. type MessageReceiverOptions struct {
  23. // 接收者名称,必传
  24. ReceiverName string
  25. // 消息组名称,必传
  26. MessageGroupName string
  27. // 用户注册消息回调,必传
  28. UserRegisteredCallback UserRegisteredCallback
  29. // 用户名更新消息回调,必传
  30. UserUserNameUpdatedCallback UserUserNameUpdatedCallback
  31. // 用户删除消息回调,必传
  32. UserDeletedCallback UserDeletedCallback
  33. // Redis配置
  34. RedisOptions
  35. }
  36. type RedisOptions struct {
  37. Address string
  38. Password string
  39. DB int
  40. MaxRetries int
  41. DialTimeout time.Duration
  42. ReadTimeout time.Duration
  43. WriteTimeout time.Duration
  44. }
  45. // NewMessageReceiver 创建MessageReceiver
  46. func NewMessageReceiver(options *MessageReceiverOptions) (*MessageReceiver, error) {
  47. if IsStringEmpty(options.ReceiverName) {
  48. return nil, errors.New("没有传递接受者名称")
  49. }
  50. if IsStringEmpty(options.MessageGroupName) {
  51. return nil, errors.New("没有传递消息组名称")
  52. }
  53. if IsStringEmpty(options.Address) {
  54. return nil, errors.New("没有传递Redis地址")
  55. }
  56. if IsStringEmpty(options.Password) {
  57. return nil, errors.New("没有传递Redis密码")
  58. }
  59. if options.UserRegisteredCallback == nil ||
  60. options.UserUserNameUpdatedCallback == nil ||
  61. options.UserDeletedCallback == nil {
  62. return nil, errors.New("没有传递需要的回调函数")
  63. }
  64. consumer, err := redisqueue.NewConsumerWithOptions(&redisqueue.ConsumerOptions{
  65. Name: options.ReceiverName,
  66. GroupName: options.MessageGroupName,
  67. VisibilityTimeout: MessageReceiverVisibilityTimeout,
  68. BlockingTimeout: MessageReceiverBlockingTimeout,
  69. ReclaimInterval: MessageReceiverReclaimInterval,
  70. BufferSize: MessageReceiverBufferSize,
  71. Concurrency: MessageReceiverConcurrency,
  72. RedisOptions: &redisqueue.RedisOptions{
  73. Addr: options.Address,
  74. Password: options.Password,
  75. DB: options.DB,
  76. MaxRetries: options.MaxRetries,
  77. DialTimeout: options.DialTimeout,
  78. ReadTimeout: options.ReadTimeout,
  79. WriteTimeout: options.WriteTimeout,
  80. },
  81. })
  82. if err != nil {
  83. return nil, err
  84. }
  85. return &MessageReceiver{
  86. consumer: consumer,
  87. userRegisteredCallback: options.UserRegisteredCallback,
  88. userUserNameUpdatedCallback: options.UserUserNameUpdatedCallback,
  89. userDeletedCallback: options.UserDeletedCallback,
  90. }, nil
  91. }
  92. // DestroyMessageReceiver 销毁MessageReceiver
  93. func DestroyMessageReceiver(messageReceiver *MessageReceiver) {
  94. messageReceiver.userDeletedCallback = nil
  95. messageReceiver.userUserNameUpdatedCallback = nil
  96. messageReceiver.userRegisteredCallback = nil
  97. messageReceiver.consumer = nil
  98. messageReceiver = nil
  99. }
  100. // StartMessageReceiver 启动消息接收
  101. func (receiver *MessageReceiver) StartMessageReceiver() error {
  102. receiver.consumer.Register(topicUserRegistered, receiver.processMessage)
  103. receiver.consumer.Register(topicUserUserNameUpdated, receiver.processMessage)
  104. receiver.consumer.Register(topicUserDeleted, receiver.processMessage)
  105. go receiver.consumer.Run()
  106. return nil
  107. }
  108. // StopMessageReceiver 停止消息接收
  109. func (receiver *MessageReceiver) StopMessageReceiver() {
  110. receiver.consumer.Shutdown()
  111. }
  112. func (receiver *MessageReceiver) processMessage(receivedMessage *redisqueue.Message) error {
  113. msgStr := receivedMessage.Values["msg"].(string)
  114. msg := new(Message)
  115. err := json.Unmarshal([]byte(msgStr), msg)
  116. if err != nil {
  117. return err
  118. }
  119. switch receivedMessage.Stream {
  120. case topicUserRegistered:
  121. data := new(UserRegisteredMessageData)
  122. err := json.Unmarshal([]byte(msg.Data), data)
  123. if err != nil {
  124. return receiver.userRegisteredCallback(nil, err)
  125. }
  126. return receiver.userRegisteredCallback(data, nil)
  127. case topicUserUserNameUpdated:
  128. data := new(UserUserNameUpdatedMessageData)
  129. err := json.Unmarshal([]byte(msg.Data), data)
  130. if err != nil {
  131. return receiver.userUserNameUpdatedCallback(nil, err)
  132. }
  133. return receiver.userUserNameUpdatedCallback(data, nil)
  134. case topicUserDeleted:
  135. data := new(UserDeletedMessageData)
  136. err := json.Unmarshal([]byte(msg.Data), data)
  137. if err != nil {
  138. return receiver.userDeletedCallback(nil, err)
  139. }
  140. return receiver.userDeletedCallback(data, nil)
  141. default:
  142. return errors.New("不存在的主题: " + receivedMessage.Stream)
  143. }
  144. }