Browse Source

修改消息队列配置bug

yjp 1 week ago
parent
commit
28bb578403

+ 1 - 0
baize.go

@@ -17,6 +17,7 @@ func NewApplication(conf application.Config) *application.App {
 	infrastructureConfig := new(infrastructure.Config)
 	infrastructureConfig.DatabaseConfig = conf.InfrastructureConfig.Database
 	infrastructureConfig.CacheConfig = conf.InfrastructureConfig.Cache
+	infrastructureConfig.MessageQueueConfig = conf.InfrastructureConfig.MessageQueue
 
 	infrastructureInstance := infrastructure.NewInfrastructure(*infrastructureConfig)
 

+ 3 - 2
framework/core/infrastructure/message_queue/common/common.go

@@ -5,8 +5,9 @@ package common
 // - queue: 消息队列
 // - topic: 主题
 // - data: 消息数据
-// 返回值: 无
-type MessageHandler func(queue MessageQueue, topic string, data string)
+// 返回值:
+// - error: 错误信息
+type MessageHandler func(queue MessageQueue, topic string, data string) error
 
 // MessageQueue 消息队列接口
 type MessageQueue interface {

+ 7 - 1
framework/core/infrastructure/message_queue/mqtt/mqtt.go

@@ -71,7 +71,13 @@ func (messageQueue *MessageQueue) Subscribe(group string, topic string, handler
 		defer messageQueue.topicGroupMapHandlerMutex.RUnlock()
 
 		for _, groupHandler := range messageQueue.topicGroupHandlerMap[topic] {
-			go groupHandler(messageQueue, message.Topic(), string(message.Payload()))
+			go func() {
+				err := groupHandler(messageQueue, message.Topic(), string(message.Payload()))
+				if err != nil {
+					logger.GetInstance().Error(err)
+					return
+				}
+			}()
 		}
 	})
 

+ 8 - 2
framework/core/infrastructure/message_queue/redis/redis.go

@@ -119,11 +119,17 @@ func (messageQueue *MessageQueue) Subscribe(group string, topic string, handler
 	newConsumer.Register(topic, func(message *redisqueue.Message) error {
 		data, ok := message.Values[messageValuesDataKey].(string)
 		if !ok {
-			logger.GetInstance().Error(errors.New("数据不存在"))
+			err := errors.New("数据不存在")
+			logger.GetInstance().Error(err)
+			return nil
+		}
+
+		err := handler(messageQueue, message.Stream, data)
+		if err != nil {
+			logger.GetInstance().Error(err)
 			return nil
 		}
 
-		handler(messageQueue, message.Stream, data)
 		return nil
 	})
 

+ 18 - 6
test/message_queue_test.go

@@ -59,7 +59,7 @@ func testRedisMessageQueue(t *testing.T, redisMessageQueue *redis.MessageQueue)
 	wg.Add(2)
 
 	err := redisMessageQueue.Subscribe("test1", "test-redis",
-		func(queue common.MessageQueue, topic string, data string) {
+		func(queue common.MessageQueue, topic string, data string) error {
 			if string(data) != "test-message" {
 				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
 			}
@@ -67,13 +67,15 @@ func testRedisMessageQueue(t *testing.T, redisMessageQueue *redis.MessageQueue)
 			fmt.Println("redis test1 consumed")
 
 			wg.Done()
+
+			return nil
 		})
 	if err != nil {
 		t.Fatalf("%+v\n", err)
 	}
 
 	err = redisMessageQueue.Subscribe("test2", "test-redis",
-		func(queue common.MessageQueue, topic string, data string) {
+		func(queue common.MessageQueue, topic string, data string) error {
 			if string(data) != "test-message" {
 				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
 			}
@@ -81,6 +83,8 @@ func testRedisMessageQueue(t *testing.T, redisMessageQueue *redis.MessageQueue)
 			fmt.Println("redis test2 consumed")
 
 			wg.Done()
+
+			return nil
 		})
 	if err != nil {
 		t.Fatalf("%+v\n", err)
@@ -99,7 +103,7 @@ func testMqttMessageQueue(t *testing.T, mqttMessageQueue *mqtt.MessageQueue) {
 	wg.Add(2)
 
 	err := mqttMessageQueue.Subscribe("test1", "test-mqtt",
-		func(queue common.MessageQueue, topic string, data string) {
+		func(queue common.MessageQueue, topic string, data string) error {
 			if string(data) != "test-message" {
 				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
 			}
@@ -107,13 +111,15 @@ func testMqttMessageQueue(t *testing.T, mqttMessageQueue *mqtt.MessageQueue) {
 			fmt.Println("mqtt test1 consumed")
 
 			wg.Done()
+
+			return nil
 		})
 	if err != nil {
 		t.Fatalf("%+v\n", err)
 	}
 
 	err = mqttMessageQueue.Subscribe("test2", "test-mqtt",
-		func(queue common.MessageQueue, topic string, data string) {
+		func(queue common.MessageQueue, topic string, data string) error {
 			if string(data) != "test-message" {
 				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
 			}
@@ -121,6 +127,8 @@ func testMqttMessageQueue(t *testing.T, mqttMessageQueue *mqtt.MessageQueue) {
 			fmt.Println("mqtt test2 consumed")
 
 			wg.Done()
+
+			return nil
 		})
 	if err != nil {
 		t.Fatalf("%+v\n", err)
@@ -139,7 +147,7 @@ func testMessageQueue(t *testing.T, messageQueue common.MessageQueue) {
 	wg.Add(2)
 
 	err := message_queue.Subscribe(messageQueue, "test1", "test-message-queue",
-		func(queue common.MessageQueue, topic string, data string) {
+		func(queue common.MessageQueue, topic string, data string) error {
 			if string(data) != "test-message" {
 				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
 			}
@@ -147,13 +155,15 @@ func testMessageQueue(t *testing.T, messageQueue common.MessageQueue) {
 			fmt.Println("test1 consumed")
 
 			wg.Done()
+
+			return nil
 		})
 	if err != nil {
 		t.Fatalf("%+v\n", err)
 	}
 
 	err = message_queue.Subscribe(messageQueue, "test2", "test-message-queue",
-		func(queue common.MessageQueue, topic string, data string) {
+		func(queue common.MessageQueue, topic string, data string) error {
 			if string(data) != "test-message" {
 				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
 			}
@@ -161,6 +171,8 @@ func testMessageQueue(t *testing.T, messageQueue common.MessageQueue) {
 			fmt.Println("test2 consumed")
 
 			wg.Done()
+
+			return nil
 		})
 	if err != nil {
 		t.Fatalf("%+v\n", err)