message_queue_test.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. package test
  2. import (
  3. "fmt"
  4. "git.sxidc.com/go-framework/baize/framework/core/data_protocol"
  5. "git.sxidc.com/go-framework/baize/framework/core/infrastructure"
  6. "git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue"
  7. "git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/common"
  8. "git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/mqtt"
  9. "git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/redis"
  10. "github.com/pkg/errors"
  11. "sync"
  12. "testing"
  13. )
  14. func TestRedisMessageQueue(t *testing.T) {
  15. redisMessageQueue := redis.New("localhost:30379", "", "mtyzxhc", 1,
  16. redis.WithMaxLen(1000),
  17. redis.WithConsumerNum(1),
  18. redis.WithInFlightCount(100),
  19. redis.WithVisibilityAgainSec(5))
  20. defer redis.Destroy(redisMessageQueue)
  21. testRedisMessageQueue(t, redisMessageQueue)
  22. }
  23. func TestMqttMessageQueue(t *testing.T) {
  24. mqttMessageQueue, err := mqtt.New("localhost:1883", "admin", "mtyzxhc")
  25. if err != nil {
  26. t.Fatalf("%+v\n", err)
  27. }
  28. defer mqtt.Destroy(mqttMessageQueue)
  29. testMqttMessageQueue(t, mqttMessageQueue)
  30. }
  31. func TestMessageQueueInfrastructure(t *testing.T) {
  32. i := infrastructure.NewInfrastructure(infrastructure.Config{
  33. MessageQueueConfig: infrastructure.MessageQueueConfig{
  34. Redis: &infrastructure.RedisConfig{
  35. Address: "localhost:30379",
  36. Password: "mtyzxhc",
  37. DB: 1,
  38. MaxLen: 1000,
  39. ConsumerNum: 1,
  40. VisibilityAgainSec: 5,
  41. InFlightCount: 100,
  42. },
  43. Mqtt: &infrastructure.MqttConfig{
  44. Address: "localhost:1883",
  45. UserName: "admin",
  46. Password: "mtyzxhc",
  47. },
  48. },
  49. })
  50. defer infrastructure.DestroyInfrastructure(i)
  51. testMessageQueue(t, i.RedisMessageQueue())
  52. testMessageQueue(t, i.MqttMessageQueue())
  53. }
  54. func testRedisMessageQueue(t *testing.T, redisMessageQueue *redis.MessageQueue) {
  55. wg := sync.WaitGroup{}
  56. wg.Add(2)
  57. err := redisMessageQueue.Subscribe("test1", "test-redis",
  58. func(queue common.MessageQueue, topic string, event *data_protocol.CloudEvent) error {
  59. if event.ID != "1" {
  60. t.Fatalf("%+v\n", errors.New("消息ID不一致"))
  61. }
  62. if event.Type != "test" {
  63. t.Fatalf("%+v\n", errors.New("消息类型不一致"))
  64. }
  65. if string(event.Data) != "test-message" {
  66. t.Fatalf("%+v\n", errors.New("消息数据不一致"))
  67. }
  68. fmt.Println("redis test1 consumed")
  69. wg.Done()
  70. return nil
  71. })
  72. if err != nil {
  73. t.Fatalf("%+v\n", err)
  74. }
  75. err = redisMessageQueue.Subscribe("test2", "test-redis",
  76. func(queue common.MessageQueue, topic string, event *data_protocol.CloudEvent) error {
  77. if event.ID != "1" {
  78. t.Fatalf("%+v\n", errors.New("消息ID不一致"))
  79. }
  80. if event.Type != "test" {
  81. t.Fatalf("%+v\n", errors.New("消息类型不一致"))
  82. }
  83. if string(event.Data) != "test-message" {
  84. t.Fatalf("%+v\n", errors.New("消息数据不一致"))
  85. }
  86. fmt.Println("redis test2 consumed")
  87. wg.Done()
  88. return nil
  89. })
  90. if err != nil {
  91. t.Fatalf("%+v\n", err)
  92. }
  93. err = redisMessageQueue.Publish("test-redis",
  94. data_protocol.NewCloudEvent("1", "test", "baize-test.com", "application/text", []byte("test-message")))
  95. if err != nil {
  96. t.Fatalf("%+v\n", err)
  97. }
  98. wg.Wait()
  99. }
  100. func testMqttMessageQueue(t *testing.T, mqttMessageQueue *mqtt.MessageQueue) {
  101. wg := sync.WaitGroup{}
  102. wg.Add(2)
  103. err := mqttMessageQueue.Subscribe("test1", "test-mqtt",
  104. func(queue common.MessageQueue, topic string, event *data_protocol.CloudEvent) error {
  105. if event.ID != "1" {
  106. t.Fatalf("%+v\n", errors.New("消息ID不一致"))
  107. }
  108. if event.Type != "test" {
  109. t.Fatalf("%+v\n", errors.New("消息类型不一致"))
  110. }
  111. if string(event.Data) != "test-message" {
  112. t.Fatalf("%+v\n", errors.New("消息数据不一致"))
  113. }
  114. fmt.Println("mqtt test1 consumed")
  115. wg.Done()
  116. return nil
  117. })
  118. if err != nil {
  119. t.Fatalf("%+v\n", err)
  120. }
  121. err = mqttMessageQueue.Subscribe("test2", "test-mqtt",
  122. func(queue common.MessageQueue, topic string, event *data_protocol.CloudEvent) error {
  123. if event.ID != "1" {
  124. t.Fatalf("%+v\n", errors.New("消息ID不一致"))
  125. }
  126. if event.Type != "test" {
  127. t.Fatalf("%+v\n", errors.New("消息类型不一致"))
  128. }
  129. if string(event.Data) != "test-message" {
  130. t.Fatalf("%+v\n", errors.New("消息数据不一致"))
  131. }
  132. fmt.Println("mqtt test2 consumed")
  133. wg.Done()
  134. return nil
  135. })
  136. if err != nil {
  137. t.Fatalf("%+v\n", err)
  138. }
  139. err = mqttMessageQueue.Publish("test-mqtt",
  140. data_protocol.NewCloudEvent("1", "test", "baize-test.com", "application/text", []byte("test-message")))
  141. if err != nil {
  142. t.Fatalf("%+v\n", err)
  143. }
  144. wg.Wait()
  145. }
  146. func testMessageQueue(t *testing.T, messageQueue common.MessageQueue) {
  147. wg := sync.WaitGroup{}
  148. wg.Add(2)
  149. err := message_queue.Subscribe(messageQueue, "test1", "test-message-queue",
  150. func(queue common.MessageQueue, topic string, event *data_protocol.CloudEvent) error {
  151. if event.ID != "1" {
  152. t.Fatalf("%+v\n", errors.New("消息ID不一致"))
  153. }
  154. if event.Type != "test" {
  155. t.Fatalf("%+v\n", errors.New("消息类型不一致"))
  156. }
  157. if string(event.Data) != "test-message" {
  158. t.Fatalf("%+v\n", errors.New("消息数据不一致"))
  159. }
  160. fmt.Println("test1 consumed")
  161. wg.Done()
  162. return nil
  163. })
  164. if err != nil {
  165. t.Fatalf("%+v\n", err)
  166. }
  167. err = message_queue.Subscribe(messageQueue, "test2", "test-message-queue",
  168. func(queue common.MessageQueue, topic string, event *data_protocol.CloudEvent) error {
  169. if event.ID != "1" {
  170. t.Fatalf("%+v\n", errors.New("消息ID不一致"))
  171. }
  172. if event.Type != "test" {
  173. t.Fatalf("%+v\n", errors.New("消息类型不一致"))
  174. }
  175. if string(event.Data) != "test-message" {
  176. t.Fatalf("%+v\n", errors.New("消息数据不一致"))
  177. }
  178. fmt.Println("test2 consumed")
  179. wg.Done()
  180. return nil
  181. })
  182. if err != nil {
  183. t.Fatalf("%+v\n", err)
  184. }
  185. err = message_queue.Publish(messageQueue, "test-message-queue",
  186. data_protocol.NewCloudEvent("1", "test", "baize-test.com", "application/text", []byte("test-message")))
  187. if err != nil {
  188. t.Fatalf("%+v\n", err)
  189. }
  190. wg.Wait()
  191. }