|
@@ -59,7 +59,7 @@ func testRedisMessageQueue(t *testing.T, redisMessageQueue *redis.MessageQueue)
|
|
|
wg.Add(2)
|
|
|
|
|
|
err := redisMessageQueue.Subscribe("test1", "test-redis",
|
|
|
- func(queue common.MessageQueue, topic string, data string) {
|
|
|
+ func(queue common.MessageQueue, topic string, data string) error {
|
|
|
if string(data) != "test-message" {
|
|
|
t.Fatalf("%+v\n", errors.New("消息数据不一致"))
|
|
|
}
|
|
@@ -67,13 +67,15 @@ func testRedisMessageQueue(t *testing.T, redisMessageQueue *redis.MessageQueue)
|
|
|
fmt.Println("redis test1 consumed")
|
|
|
|
|
|
wg.Done()
|
|
|
+
|
|
|
+ return nil
|
|
|
})
|
|
|
if err != nil {
|
|
|
t.Fatalf("%+v\n", err)
|
|
|
}
|
|
|
|
|
|
err = redisMessageQueue.Subscribe("test2", "test-redis",
|
|
|
- func(queue common.MessageQueue, topic string, data string) {
|
|
|
+ func(queue common.MessageQueue, topic string, data string) error {
|
|
|
if string(data) != "test-message" {
|
|
|
t.Fatalf("%+v\n", errors.New("消息数据不一致"))
|
|
|
}
|
|
@@ -81,6 +83,8 @@ func testRedisMessageQueue(t *testing.T, redisMessageQueue *redis.MessageQueue)
|
|
|
fmt.Println("redis test2 consumed")
|
|
|
|
|
|
wg.Done()
|
|
|
+
|
|
|
+ return nil
|
|
|
})
|
|
|
if err != nil {
|
|
|
t.Fatalf("%+v\n", err)
|
|
@@ -99,7 +103,7 @@ func testMqttMessageQueue(t *testing.T, mqttMessageQueue *mqtt.MessageQueue) {
|
|
|
wg.Add(2)
|
|
|
|
|
|
err := mqttMessageQueue.Subscribe("test1", "test-mqtt",
|
|
|
- func(queue common.MessageQueue, topic string, data string) {
|
|
|
+ func(queue common.MessageQueue, topic string, data string) error {
|
|
|
if string(data) != "test-message" {
|
|
|
t.Fatalf("%+v\n", errors.New("消息数据不一致"))
|
|
|
}
|
|
@@ -107,13 +111,15 @@ func testMqttMessageQueue(t *testing.T, mqttMessageQueue *mqtt.MessageQueue) {
|
|
|
fmt.Println("mqtt test1 consumed")
|
|
|
|
|
|
wg.Done()
|
|
|
+
|
|
|
+ return nil
|
|
|
})
|
|
|
if err != nil {
|
|
|
t.Fatalf("%+v\n", err)
|
|
|
}
|
|
|
|
|
|
err = mqttMessageQueue.Subscribe("test2", "test-mqtt",
|
|
|
- func(queue common.MessageQueue, topic string, data string) {
|
|
|
+ func(queue common.MessageQueue, topic string, data string) error {
|
|
|
if string(data) != "test-message" {
|
|
|
t.Fatalf("%+v\n", errors.New("消息数据不一致"))
|
|
|
}
|
|
@@ -121,6 +127,8 @@ func testMqttMessageQueue(t *testing.T, mqttMessageQueue *mqtt.MessageQueue) {
|
|
|
fmt.Println("mqtt test2 consumed")
|
|
|
|
|
|
wg.Done()
|
|
|
+
|
|
|
+ return nil
|
|
|
})
|
|
|
if err != nil {
|
|
|
t.Fatalf("%+v\n", err)
|
|
@@ -139,7 +147,7 @@ func testMessageQueue(t *testing.T, messageQueue common.MessageQueue) {
|
|
|
wg.Add(2)
|
|
|
|
|
|
err := message_queue.Subscribe(messageQueue, "test1", "test-message-queue",
|
|
|
- func(queue common.MessageQueue, topic string, data string) {
|
|
|
+ func(queue common.MessageQueue, topic string, data string) error {
|
|
|
if string(data) != "test-message" {
|
|
|
t.Fatalf("%+v\n", errors.New("消息数据不一致"))
|
|
|
}
|
|
@@ -147,13 +155,15 @@ func testMessageQueue(t *testing.T, messageQueue common.MessageQueue) {
|
|
|
fmt.Println("test1 consumed")
|
|
|
|
|
|
wg.Done()
|
|
|
+
|
|
|
+ return nil
|
|
|
})
|
|
|
if err != nil {
|
|
|
t.Fatalf("%+v\n", err)
|
|
|
}
|
|
|
|
|
|
err = message_queue.Subscribe(messageQueue, "test2", "test-message-queue",
|
|
|
- func(queue common.MessageQueue, topic string, data string) {
|
|
|
+ func(queue common.MessageQueue, topic string, data string) error {
|
|
|
if string(data) != "test-message" {
|
|
|
t.Fatalf("%+v\n", errors.New("消息数据不一致"))
|
|
|
}
|
|
@@ -161,6 +171,8 @@ func testMessageQueue(t *testing.T, messageQueue common.MessageQueue) {
|
|
|
fmt.Println("test2 consumed")
|
|
|
|
|
|
wg.Done()
|
|
|
+
|
|
|
+ return nil
|
|
|
})
|
|
|
if err != nil {
|
|
|
t.Fatalf("%+v\n", err)
|