Browse Source

修改消息队列

yjp 8 months ago
parent
commit
faf328aa40

+ 74 - 2
framework/core/infrastructure/message_queue/common/common.go

@@ -1,16 +1,88 @@
 package common
 
+import (
+	"encoding/json"
+	"git.sxidc.com/go-tools/utils/strutils"
+	"github.com/pkg/errors"
+	"time"
+)
+
+type CloudEvent struct {
+	SpecVersion     string `json:"specversion"`
+	ID              string `json:"id"`
+	Type            string `json:"type"`
+	Source          string `json:"source"`
+	DataContentType string `json:"datacontenttype"`
+	Time            string `json:"time"`
+	Data            string `json:"data"`
+}
+
+func NewCloudEvent(eventID string, eventType string, data string) *CloudEvent {
+	return &CloudEvent{
+		SpecVersion:     "v1.0",
+		ID:              eventID,
+		Type:            eventType,
+		Source:          "baize.sxidc.com",
+		DataContentType: "application/json",
+		Time:            time.Now().Format(time.RFC3339),
+		Data:            data,
+	}
+}
+
+func UnmarshalCloudEvent(data []byte) (*CloudEvent, error) {
+	event := new(CloudEvent)
+	err := json.Unmarshal(data, event)
+	if err != nil {
+		return nil, errors.New(err.Error())
+	}
+
+	return event, nil
+}
+
+func (event *CloudEvent) Marshal() ([]byte, error) {
+	if strutils.IsStringEmpty(event.SpecVersion) {
+		return nil, errors.New("没有传递事件规范版本")
+	}
+
+	if strutils.IsStringEmpty(event.ID) {
+		return nil, errors.New("没有传递事件ID")
+	}
+
+	if strutils.IsStringEmpty(event.Type) {
+		return nil, errors.New("没有传递事件类型")
+	}
+
+	if strutils.IsStringEmpty(event.Source) {
+		return nil, errors.New("没有传递事件源")
+	}
+
+	if strutils.IsStringEmpty(event.DataContentType) {
+		return nil, errors.New("没有传递事件数据类型")
+	}
+
+	if strutils.IsStringEmpty(event.Time) {
+		return nil, errors.New("没有传递事件时间")
+	}
+
+	eventJson, err := json.Marshal(event)
+	if err != nil {
+		return nil, errors.New(err.Error())
+	}
+
+	return eventJson, nil
+}
+
 // MessageHandler 消息处理函数
 // 参数:
 // - queue: 消息队列
 // - topic: 主题
 // - data: 消息数据
 // 返回值: 无
-type MessageHandler func(queue MessageQueue, topic string, data string)
+type MessageHandler func(queue MessageQueue, topic string, event *CloudEvent)
 
 // MessageQueue 消息队列接口
 type MessageQueue interface {
 	Subscribe(group string, topic string, handler MessageHandler) error
 	UnSubscribe(group string, topic string) error
-	Publish(topic string, data string) error
+	Publish(topic string, event *CloudEvent) error
 }

+ 16 - 3
framework/core/infrastructure/message_queue/message_queue.go

@@ -1,6 +1,19 @@
 package message_queue
 
-import "git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/common"
+import (
+	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/common"
+)
+
+// NewCloudEvent 创建CloudEventType
+// 参数:
+// - eventID: 事件ID,可以使用实体ID
+// - eventType事件类型: 一般为领域.操作,如project.create
+// - data: 事件的数据
+// 返回值:
+// - CloudEvent
+func NewCloudEvent(eventID string, eventType string, data string) *common.CloudEvent {
+	return common.NewCloudEvent(eventID, eventType, data)
+}
 
 // Subscribe 订阅
 // 参数:
@@ -32,6 +45,6 @@ func UnSubscribe(messageQueue common.MessageQueue, group string, topic string) e
 // - data: 消息数据
 // 返回值:
 // - 错误
-func Publish(messageQueue common.MessageQueue, topic string, data string) error {
-	return messageQueue.Publish(topic, data)
+func Publish(messageQueue common.MessageQueue, topic string, event *common.CloudEvent) error {
+	return messageQueue.Publish(topic, event)
 }

+ 14 - 3
framework/core/infrastructure/message_queue/mqtt/mqtt.go

@@ -70,8 +70,14 @@ func (messageQueue *MessageQueue) Subscribe(group string, topic string, handler
 		messageQueue.topicGroupMapHandlerMutex.RLock()
 		defer messageQueue.topicGroupMapHandlerMutex.RUnlock()
 
+		event, err := common.UnmarshalCloudEvent([]byte(message.Payload()))
+		if err != nil {
+			logger.GetInstance().Error(err)
+			return
+		}
+
 		for _, groupHandler := range messageQueue.topicGroupHandlerMap[topic] {
-			go groupHandler(messageQueue, message.Topic(), string(message.Payload()))
+			go groupHandler(messageQueue, message.Topic(), event)
 		}
 	})
 
@@ -134,8 +140,13 @@ func (messageQueue *MessageQueue) UnSubscribe(group string, topic string) error
 	return nil
 }
 
-func (messageQueue *MessageQueue) Publish(topic string, data string) error {
-	token := messageQueue.mqttClient.Publish(topic, 1, false, data)
+func (messageQueue *MessageQueue) Publish(topic string, event *common.CloudEvent) error {
+	eventBytes, err := event.Marshal()
+	if err != nil {
+		return err
+	}
+
+	token := messageQueue.mqttClient.Publish(topic, 1, false, eventBytes)
 
 	if !token.WaitTimeout(20 * time.Second) {
 		return errors.New("发布订阅超时")

+ 14 - 3
framework/core/infrastructure/message_queue/redis/redis.go

@@ -123,7 +123,13 @@ func (messageQueue *MessageQueue) Subscribe(group string, topic string, handler
 			return nil
 		}
 
-		handler(messageQueue, message.Stream, data)
+		event, err := common.UnmarshalCloudEvent([]byte(data))
+		if err != nil {
+			logger.GetInstance().Error(err)
+			return nil
+		}
+
+		handler(messageQueue, message.Stream, event)
 		return nil
 	})
 
@@ -150,7 +156,7 @@ func (messageQueue *MessageQueue) UnSubscribe(group string, topic string) error
 	return nil
 }
 
-func (messageQueue *MessageQueue) Publish(topic string, data string) error {
+func (messageQueue *MessageQueue) Publish(topic string, event *common.CloudEvent) error {
 	messageQueue.producerMapMutex.Lock()
 	defer messageQueue.producerMapMutex.Unlock()
 
@@ -159,10 +165,15 @@ func (messageQueue *MessageQueue) Publish(topic string, data string) error {
 		return err
 	}
 
+	eventBytes, err := event.Marshal()
+	if err != nil {
+		return err
+	}
+
 	err = producer.Enqueue(&redisqueue.Message{
 		Stream: topic,
 		Values: map[string]any{
-			messageValuesDataKey: data,
+			messageValuesDataKey: string(eventBytes),
 		},
 	})
 	if err != nil {

+ 66 - 15
test/message_queue_test.go

@@ -59,8 +59,16 @@ func testRedisMessageQueue(t *testing.T, redisMessageQueue *redis.MessageQueue)
 	wg.Add(2)
 
 	err := redisMessageQueue.Subscribe("test1", "test-redis",
-		func(queue common.MessageQueue, topic string, data string) {
-			if string(data) != "test-message" {
+		func(queue common.MessageQueue, topic string, event *common.CloudEvent) {
+			if event.ID != "1" {
+				t.Fatalf("%+v\n", errors.New("消息ID不一致"))
+			}
+
+			if event.Type != "test" {
+				t.Fatalf("%+v\n", errors.New("消息类型不一致"))
+			}
+
+			if event.Data != "test-message" {
 				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
 			}
 
@@ -73,8 +81,16 @@ func testRedisMessageQueue(t *testing.T, redisMessageQueue *redis.MessageQueue)
 	}
 
 	err = redisMessageQueue.Subscribe("test2", "test-redis",
-		func(queue common.MessageQueue, topic string, data string) {
-			if string(data) != "test-message" {
+		func(queue common.MessageQueue, topic string, event *common.CloudEvent) {
+			if event.ID != "1" {
+				t.Fatalf("%+v\n", errors.New("消息ID不一致"))
+			}
+
+			if event.Type != "test" {
+				t.Fatalf("%+v\n", errors.New("消息类型不一致"))
+			}
+
+			if event.Data != "test-message" {
 				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
 			}
 
@@ -86,7 +102,8 @@ func testRedisMessageQueue(t *testing.T, redisMessageQueue *redis.MessageQueue)
 		t.Fatalf("%+v\n", err)
 	}
 
-	err = redisMessageQueue.Publish("test-redis", "test-message")
+	err = redisMessageQueue.Publish("test-redis",
+		message_queue.NewCloudEvent("1", "test", "test-message"))
 	if err != nil {
 		t.Fatalf("%+v\n", err)
 	}
@@ -99,8 +116,16 @@ func testMqttMessageQueue(t *testing.T, mqttMessageQueue *mqtt.MessageQueue) {
 	wg.Add(2)
 
 	err := mqttMessageQueue.Subscribe("test1", "test-mqtt",
-		func(queue common.MessageQueue, topic string, data string) {
-			if string(data) != "test-message" {
+		func(queue common.MessageQueue, topic string, event *common.CloudEvent) {
+			if event.ID != "1" {
+				t.Fatalf("%+v\n", errors.New("消息ID不一致"))
+			}
+
+			if event.Type != "test" {
+				t.Fatalf("%+v\n", errors.New("消息类型不一致"))
+			}
+
+			if event.Data != "test-message" {
 				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
 			}
 
@@ -113,8 +138,16 @@ func testMqttMessageQueue(t *testing.T, mqttMessageQueue *mqtt.MessageQueue) {
 	}
 
 	err = mqttMessageQueue.Subscribe("test2", "test-mqtt",
-		func(queue common.MessageQueue, topic string, data string) {
-			if string(data) != "test-message" {
+		func(queue common.MessageQueue, topic string, event *common.CloudEvent) {
+			if event.ID != "1" {
+				t.Fatalf("%+v\n", errors.New("消息ID不一致"))
+			}
+
+			if event.Type != "test" {
+				t.Fatalf("%+v\n", errors.New("消息类型不一致"))
+			}
+
+			if event.Data != "test-message" {
 				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
 			}
 
@@ -126,7 +159,8 @@ func testMqttMessageQueue(t *testing.T, mqttMessageQueue *mqtt.MessageQueue) {
 		t.Fatalf("%+v\n", err)
 	}
 
-	err = mqttMessageQueue.Publish("test-mqtt", "test-message")
+	err = mqttMessageQueue.Publish("test-mqtt",
+		message_queue.NewCloudEvent("1", "test", "test-message"))
 	if err != nil {
 		t.Fatalf("%+v\n", err)
 	}
@@ -139,8 +173,16 @@ func testMessageQueue(t *testing.T, messageQueue common.MessageQueue) {
 	wg.Add(2)
 
 	err := message_queue.Subscribe(messageQueue, "test1", "test-message-queue",
-		func(queue common.MessageQueue, topic string, data string) {
-			if string(data) != "test-message" {
+		func(queue common.MessageQueue, topic string, event *common.CloudEvent) {
+			if event.ID != "1" {
+				t.Fatalf("%+v\n", errors.New("消息ID不一致"))
+			}
+
+			if event.Type != "test" {
+				t.Fatalf("%+v\n", errors.New("消息类型不一致"))
+			}
+
+			if event.Data != "test-message" {
 				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
 			}
 
@@ -153,8 +195,16 @@ func testMessageQueue(t *testing.T, messageQueue common.MessageQueue) {
 	}
 
 	err = message_queue.Subscribe(messageQueue, "test2", "test-message-queue",
-		func(queue common.MessageQueue, topic string, data string) {
-			if string(data) != "test-message" {
+		func(queue common.MessageQueue, topic string, event *common.CloudEvent) {
+			if event.ID != "1" {
+				t.Fatalf("%+v\n", errors.New("消息ID不一致"))
+			}
+
+			if event.Type != "test" {
+				t.Fatalf("%+v\n", errors.New("消息类型不一致"))
+			}
+
+			if event.Data != "test-message" {
 				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
 			}
 
@@ -166,7 +216,8 @@ func testMessageQueue(t *testing.T, messageQueue common.MessageQueue) {
 		t.Fatalf("%+v\n", err)
 	}
 
-	err = message_queue.Publish(messageQueue, "test-message-queue", "test-message")
+	err = message_queue.Publish(messageQueue, "test-message-queue",
+		message_queue.NewCloudEvent("1", "test", "test-message"))
 	if err != nil {
 		t.Fatalf("%+v\n", err)
 	}