|
@@ -60,7 +60,7 @@ func testRedisMessageQueue(t *testing.T, redisMessageQueue *redis.MessageQueue)
|
|
|
wg.Add(2)
|
|
|
|
|
|
err := redisMessageQueue.Subscribe("test1", "test-redis",
|
|
|
- func(queue common.MessageQueue, topic string, event *data_protocol.CloudEvent) {
|
|
|
+ func(queue common.MessageQueue, topic string, event *data_protocol.CloudEvent) error {
|
|
|
if event.ID != "1" {
|
|
|
t.Fatalf("%+v\n", errors.New("消息ID不一致"))
|
|
|
}
|
|
@@ -76,13 +76,15 @@ func testRedisMessageQueue(t *testing.T, redisMessageQueue *redis.MessageQueue)
|
|
|
fmt.Println("redis test1 consumed")
|
|
|
|
|
|
wg.Done()
|
|
|
+
|
|
|
+ return nil
|
|
|
})
|
|
|
if err != nil {
|
|
|
t.Fatalf("%+v\n", err)
|
|
|
}
|
|
|
|
|
|
err = redisMessageQueue.Subscribe("test2", "test-redis",
|
|
|
- func(queue common.MessageQueue, topic string, event *data_protocol.CloudEvent) {
|
|
|
+ func(queue common.MessageQueue, topic string, event *data_protocol.CloudEvent) error {
|
|
|
if event.ID != "1" {
|
|
|
t.Fatalf("%+v\n", errors.New("消息ID不一致"))
|
|
|
}
|
|
@@ -98,6 +100,8 @@ func testRedisMessageQueue(t *testing.T, redisMessageQueue *redis.MessageQueue)
|
|
|
fmt.Println("redis test2 consumed")
|
|
|
|
|
|
wg.Done()
|
|
|
+
|
|
|
+ return nil
|
|
|
})
|
|
|
if err != nil {
|
|
|
t.Fatalf("%+v\n", err)
|
|
@@ -117,7 +121,7 @@ func testMqttMessageQueue(t *testing.T, mqttMessageQueue *mqtt.MessageQueue) {
|
|
|
wg.Add(2)
|
|
|
|
|
|
err := mqttMessageQueue.Subscribe("test1", "test-mqtt",
|
|
|
- func(queue common.MessageQueue, topic string, event *data_protocol.CloudEvent) {
|
|
|
+ func(queue common.MessageQueue, topic string, event *data_protocol.CloudEvent) error {
|
|
|
if event.ID != "1" {
|
|
|
t.Fatalf("%+v\n", errors.New("消息ID不一致"))
|
|
|
}
|
|
@@ -133,13 +137,15 @@ func testMqttMessageQueue(t *testing.T, mqttMessageQueue *mqtt.MessageQueue) {
|
|
|
fmt.Println("mqtt test1 consumed")
|
|
|
|
|
|
wg.Done()
|
|
|
+
|
|
|
+ return nil
|
|
|
})
|
|
|
if err != nil {
|
|
|
t.Fatalf("%+v\n", err)
|
|
|
}
|
|
|
|
|
|
err = mqttMessageQueue.Subscribe("test2", "test-mqtt",
|
|
|
- func(queue common.MessageQueue, topic string, event *data_protocol.CloudEvent) {
|
|
|
+ func(queue common.MessageQueue, topic string, event *data_protocol.CloudEvent) error {
|
|
|
if event.ID != "1" {
|
|
|
t.Fatalf("%+v\n", errors.New("消息ID不一致"))
|
|
|
}
|
|
@@ -155,6 +161,8 @@ func testMqttMessageQueue(t *testing.T, mqttMessageQueue *mqtt.MessageQueue) {
|
|
|
fmt.Println("mqtt test2 consumed")
|
|
|
|
|
|
wg.Done()
|
|
|
+
|
|
|
+ return nil
|
|
|
})
|
|
|
if err != nil {
|
|
|
t.Fatalf("%+v\n", err)
|
|
@@ -174,7 +182,7 @@ func testMessageQueue(t *testing.T, messageQueue common.MessageQueue) {
|
|
|
wg.Add(2)
|
|
|
|
|
|
err := message_queue.Subscribe(messageQueue, "test1", "test-message-queue",
|
|
|
- func(queue common.MessageQueue, topic string, event *data_protocol.CloudEvent) {
|
|
|
+ func(queue common.MessageQueue, topic string, event *data_protocol.CloudEvent) error {
|
|
|
if event.ID != "1" {
|
|
|
t.Fatalf("%+v\n", errors.New("消息ID不一致"))
|
|
|
}
|
|
@@ -190,13 +198,15 @@ func testMessageQueue(t *testing.T, messageQueue common.MessageQueue) {
|
|
|
fmt.Println("test1 consumed")
|
|
|
|
|
|
wg.Done()
|
|
|
+
|
|
|
+ return nil
|
|
|
})
|
|
|
if err != nil {
|
|
|
t.Fatalf("%+v\n", err)
|
|
|
}
|
|
|
|
|
|
err = message_queue.Subscribe(messageQueue, "test2", "test-message-queue",
|
|
|
- func(queue common.MessageQueue, topic string, event *data_protocol.CloudEvent) {
|
|
|
+ func(queue common.MessageQueue, topic string, event *data_protocol.CloudEvent) error {
|
|
|
if event.ID != "1" {
|
|
|
t.Fatalf("%+v\n", errors.New("消息ID不一致"))
|
|
|
}
|
|
@@ -212,6 +222,8 @@ func testMessageQueue(t *testing.T, messageQueue common.MessageQueue) {
|
|
|
fmt.Println("test2 consumed")
|
|
|
|
|
|
wg.Done()
|
|
|
+
|
|
|
+ return nil
|
|
|
})
|
|
|
if err != nil {
|
|
|
t.Fatalf("%+v\n", err)
|