message_queue_test.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. package test
  2. import (
  3. "git.sxidc.com/go-framework/baize/framework/core/infrastructure"
  4. "git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue"
  5. "git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/common"
  6. "git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/redis"
  7. "github.com/pkg/errors"
  8. "sync"
  9. "testing"
  10. )
  11. func TestRedisMessageQueue(t *testing.T) {
  12. redisMessageQueue := redis.New("localhost:30379", "", "mtyzxhc", 1, common.WithMaxLen(1000))
  13. defer redis.Destroy(redisMessageQueue)
  14. testRedisMessageQueue(t, redisMessageQueue)
  15. }
  16. func TestMessageQueueInfrastructure(t *testing.T) {
  17. i := infrastructure.NewInfrastructure(infrastructure.Config{
  18. MessageQueueConfig: infrastructure.MessageQueueConfig{
  19. Redis: &infrastructure.RedisConfig{
  20. Address: "localhost:30379",
  21. Password: "mtyzxhc",
  22. DB: 1,
  23. },
  24. MaxLen: 1000,
  25. },
  26. })
  27. defer infrastructure.DestroyInfrastructure(i)
  28. testMessageQueue(t, i.RedisMessageQueue())
  29. }
  30. func testRedisMessageQueue(t *testing.T, redisMessageQueue *redis.MessageQueue) {
  31. wg := sync.WaitGroup{}
  32. wg.Add(2)
  33. err := redisMessageQueue.Subscribe("test1", "test-redis",
  34. func(queue common.MessageQueue, topic string, data string) {
  35. if string(data) != "test-message" {
  36. t.Fatalf("%+v\n", errors.New("消息数据不一致"))
  37. }
  38. wg.Done()
  39. })
  40. if err != nil {
  41. t.Fatalf("%+v\n", err)
  42. }
  43. err = redisMessageQueue.Subscribe("test2", "test-redis",
  44. func(queue common.MessageQueue, topic string, data string) {
  45. if string(data) != "test-message" {
  46. t.Fatalf("%+v\n", errors.New("消息数据不一致"))
  47. }
  48. wg.Done()
  49. })
  50. if err != nil {
  51. t.Fatalf("%+v\n", err)
  52. }
  53. err = redisMessageQueue.Publish("test-redis", "test-message")
  54. if err != nil {
  55. t.Fatalf("%+v\n", err)
  56. }
  57. wg.Wait()
  58. }
  59. func testMessageQueue(t *testing.T, redisMessageQueue common.MessageQueue) {
  60. wg := sync.WaitGroup{}
  61. wg.Add(2)
  62. err := message_queue.Subscribe(redisMessageQueue, "test1", "test-redis",
  63. func(queue common.MessageQueue, topic string, data string) {
  64. if string(data) != "test-message" {
  65. t.Fatalf("%+v\n", errors.New("消息数据不一致"))
  66. }
  67. wg.Done()
  68. })
  69. if err != nil {
  70. t.Fatalf("%+v\n", err)
  71. }
  72. err = message_queue.Subscribe(redisMessageQueue, "test2", "test-redis",
  73. func(queue common.MessageQueue, topic string, data string) {
  74. if string(data) != "test-message" {
  75. t.Fatalf("%+v\n", errors.New("消息数据不一致"))
  76. }
  77. wg.Done()
  78. })
  79. if err != nil {
  80. t.Fatalf("%+v\n", err)
  81. }
  82. err = message_queue.Publish(redisMessageQueue, "test-redis", "test-message")
  83. if err != nil {
  84. t.Fatalf("%+v\n", err)
  85. }
  86. wg.Wait()
  87. }