Explorar o código

修改CloudEvent

yjp hai 2 meses
pai
achega
3f40a35316

+ 94 - 0
framework/core/data_protocol/cloud_event.go

@@ -0,0 +1,94 @@
+package data_protocol
+
+import (
+	"encoding/json"
+	"git.sxidc.com/go-tools/utils/strutils"
+	"github.com/pkg/errors"
+	"time"
+)
+
+const (
+	DataContentTypeJson = "application/json"
+)
+
+func NewCloudEvent(eventID string, eventType string, source string, dataContentType string, data []byte) *CloudEvent {
+	return &CloudEvent{
+		SpecVersion:     "v1.0",
+		ID:              eventID,
+		Type:            eventType,
+		Source:          source,
+		DataContentType: dataContentType,
+		Time:            time.Now().Format(time.RFC3339),
+		Data:            data,
+	}
+}
+
+func NewCloudEventJson(eventID string, eventType string, source string, obj any) (*CloudEvent, error) {
+	dataJsonBytes, err := json.Marshal(obj)
+	if err != nil {
+		return nil, errors.New(err.Error())
+	}
+
+	return &CloudEvent{
+		SpecVersion:     "v1.0",
+		ID:              eventID,
+		Type:            eventType,
+		Source:          source,
+		DataContentType: DataContentTypeJson,
+		Time:            time.Now().Format(time.RFC3339),
+		Data:            dataJsonBytes,
+	}, nil
+}
+
+func UnmarshalJsonCloudEvent(cloudEventJsonBytes []byte) (*CloudEvent, error) {
+	event := new(CloudEvent)
+	err := json.Unmarshal(cloudEventJsonBytes, event)
+	if err != nil {
+		return nil, errors.New(err.Error())
+	}
+
+	return event, nil
+}
+
+type CloudEvent struct {
+	SpecVersion     string `json:"specversion"`
+	ID              string `json:"id"`
+	Type            string `json:"type"`
+	Source          string `json:"source"`
+	DataContentType string `json:"datacontenttype"`
+	Time            string `json:"time"`
+	Data            []byte `json:"data"`
+}
+
+func (event *CloudEvent) MarshalJson() ([]byte, error) {
+	if strutils.IsStringEmpty(event.SpecVersion) {
+		return nil, errors.New("没有传递事件规范版本")
+	}
+
+	if strutils.IsStringEmpty(event.ID) {
+		return nil, errors.New("没有传递事件ID")
+	}
+
+	if strutils.IsStringEmpty(event.Type) {
+		return nil, errors.New("没有传递事件类型")
+	}
+
+	if strutils.IsStringEmpty(event.Source) {
+		return nil, errors.New("没有传递事件源")
+	}
+
+	if strutils.IsStringEmpty(event.DataContentType) {
+		return nil, errors.New("没有传递事件数据类型")
+	}
+
+	if strutils.IsStringEmpty(event.Time) {
+		return nil, errors.New("没有传递事件时间")
+	}
+
+	eventJson, err := json.Marshal(event)
+	if err != nil {
+		return nil, errors.New(err.Error())
+	}
+
+	return eventJson, nil
+}

+ 3 - 73
framework/core/infrastructure/message_queue/common/common.go

@@ -1,76 +1,6 @@
 package common
 
-import (
-	"encoding/json"
-	"git.sxidc.com/go-tools/utils/strutils"
-	"github.com/pkg/errors"
-	"time"
-)
-
-type CloudEvent struct {
-	SpecVersion     string `json:"specversion"`
-	ID              string `json:"id"`
-	Type            string `json:"type"`
-	Source          string `json:"source"`
-	DataContentType string `json:"datacontenttype"`
-	Time            string `json:"time"`
-	Data            string `json:"data"`
-}
-
-func NewCloudEvent(eventID string, eventType string, data string) *CloudEvent {
-	return &CloudEvent{
-		SpecVersion:     "v1.0",
-		ID:              eventID,
-		Type:            eventType,
-		Source:          "baize.sxidc.com",
-		DataContentType: "application/json",
-		Time:            time.Now().Format(time.RFC3339),
-		Data:            data,
-	}
-}
-
-func UnmarshalCloudEvent(data []byte) (*CloudEvent, error) {
-	event := new(CloudEvent)
-	err := json.Unmarshal(data, event)
-	if err != nil {
-		return nil, errors.New(err.Error())
-	}
-
-	return event, nil
-}
-
-func (event *CloudEvent) Marshal() ([]byte, error) {
-	if strutils.IsStringEmpty(event.SpecVersion) {
-		return nil, errors.New("没有传递事件规范版本")
-	}
-
-	if strutils.IsStringEmpty(event.ID) {
-		return nil, errors.New("没有传递事件ID")
-	}
-
-	if strutils.IsStringEmpty(event.Type) {
-		return nil, errors.New("没有传递事件类型")
-	}
-
-	if strutils.IsStringEmpty(event.Source) {
-		return nil, errors.New("没有传递事件源")
-	}
-
-	if strutils.IsStringEmpty(event.DataContentType) {
-		return nil, errors.New("没有传递事件数据类型")
-	}
-
-	if strutils.IsStringEmpty(event.Time) {
-		return nil, errors.New("没有传递事件时间")
-	}
-
-	eventJson, err := json.Marshal(event)
-	if err != nil {
-		return nil, errors.New(err.Error())
-	}
-
-	return eventJson, nil
-}
+import "git.sxidc.com/go-framework/baize/framework/core/data_protocol"
 
 // MessageHandler 消息处理函数
 // 参数:
@@ -78,11 +8,11 @@ func (event *CloudEvent) Marshal() ([]byte, error) {
 // - topic: 主题
 // - data: 消息数据
 // 返回值: 无
-type MessageHandler func(queue MessageQueue, topic string, event *CloudEvent)
+type MessageHandler func(queue MessageQueue, topic string, event *data_protocol.CloudEvent)
 
 // MessageQueue 消息队列接口
 type MessageQueue interface {
 	Subscribe(group string, topic string, handler MessageHandler) error
 	UnSubscribe(group string, topic string) error
-	Publish(topic string, event *CloudEvent) error
+	Publish(topic string, event *data_protocol.CloudEvent) error
 }

+ 2 - 12
framework/core/infrastructure/message_queue/message_queue.go

@@ -1,20 +1,10 @@
 package message_queue
 
 import (
+	"git.sxidc.com/go-framework/baize/framework/core/data_protocol"
 	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/common"
 )
 
-// NewCloudEvent 创建CloudEventType
-// 参数:
-// - eventID: 事件ID,可以使用实体ID
-// - eventType事件类型: 一般为领域.操作,如project.create
-// - data: 事件的数据
-// 返回值:
-// - CloudEvent
-func NewCloudEvent(eventID string, eventType string, data string) *common.CloudEvent {
-	return common.NewCloudEvent(eventID, eventType, data)
-}
-
 // Subscribe 订阅
 // 参数:
 // - queue: 消息队列
@@ -45,6 +35,6 @@ func UnSubscribe(messageQueue common.MessageQueue, group string, topic string) e
 // - event: CloudEvent
 // 返回值:
 // - 错误
-func Publish(messageQueue common.MessageQueue, topic string, event *common.CloudEvent) error {
+func Publish(messageQueue common.MessageQueue, topic string, event *data_protocol.CloudEvent) error {
 	return messageQueue.Publish(topic, event)
 }

+ 4 - 3
framework/core/infrastructure/message_queue/mqtt/mqtt.go

@@ -1,6 +1,7 @@
 package mqtt
 
 import (
+	"git.sxidc.com/go-framework/baize/framework/core/data_protocol"
 	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/logger"
 	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/common"
 	"git.sxidc.com/go-tools/utils/strutils"
@@ -70,7 +71,7 @@ func (messageQueue *MessageQueue) Subscribe(group string, topic string, handler
 		messageQueue.topicGroupMapHandlerMutex.RLock()
 		defer messageQueue.topicGroupMapHandlerMutex.RUnlock()
 
-		event, err := common.UnmarshalCloudEvent([]byte(message.Payload()))
+		event, err := data_protocol.UnmarshalJsonCloudEvent(message.Payload())
 		if err != nil {
 			logger.GetInstance().Error(err)
 			return
@@ -140,8 +141,8 @@ func (messageQueue *MessageQueue) UnSubscribe(group string, topic string) error
 	return nil
 }
 
-func (messageQueue *MessageQueue) Publish(topic string, event *common.CloudEvent) error {
-	eventBytes, err := event.Marshal()
+func (messageQueue *MessageQueue) Publish(topic string, event *data_protocol.CloudEvent) error {
+	eventBytes, err := event.MarshalJson()
 	if err != nil {
 		return err
 	}

+ 4 - 3
framework/core/infrastructure/message_queue/redis/redis.go

@@ -1,6 +1,7 @@
 package redis
 
 import (
+	"git.sxidc.com/go-framework/baize/framework/core/data_protocol"
 	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/logger"
 	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/common"
 	"github.com/pkg/errors"
@@ -123,7 +124,7 @@ func (messageQueue *MessageQueue) Subscribe(group string, topic string, handler
 			return nil
 		}
 
-		event, err := common.UnmarshalCloudEvent([]byte(data))
+		event, err := data_protocol.UnmarshalJsonCloudEvent([]byte(data))
 		if err != nil {
 			logger.GetInstance().Error(err)
 			return nil
@@ -156,7 +157,7 @@ func (messageQueue *MessageQueue) UnSubscribe(group string, topic string) error
 	return nil
 }
 
-func (messageQueue *MessageQueue) Publish(topic string, event *common.CloudEvent) error {
+func (messageQueue *MessageQueue) Publish(topic string, event *data_protocol.CloudEvent) error {
 	messageQueue.producerMapMutex.Lock()
 	defer messageQueue.producerMapMutex.Unlock()
 
@@ -165,7 +166,7 @@ func (messageQueue *MessageQueue) Publish(topic string, event *common.CloudEvent
 		return err
 	}
 
-	eventBytes, err := event.Marshal()
+	eventBytes, err := event.MarshalJson()
 	if err != nil {
 		return err
 	}

+ 16 - 15
test/message_queue_test.go

@@ -2,6 +2,7 @@ package test
 
 import (
 	"fmt"
+	"git.sxidc.com/go-framework/baize/framework/core/data_protocol"
 	"git.sxidc.com/go-framework/baize/framework/core/infrastructure"
 	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue"
 	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/common"
@@ -59,7 +60,7 @@ func testRedisMessageQueue(t *testing.T, redisMessageQueue *redis.MessageQueue)
 	wg.Add(2)
 
 	err := redisMessageQueue.Subscribe("test1", "test-redis",
-		func(queue common.MessageQueue, topic string, event *common.CloudEvent) {
+		func(queue common.MessageQueue, topic string, event *data_protocol.CloudEvent) {
 			if event.ID != "1" {
 				t.Fatalf("%+v\n", errors.New("消息ID不一致"))
 			}
@@ -68,7 +69,7 @@ func testRedisMessageQueue(t *testing.T, redisMessageQueue *redis.MessageQueue)
 				t.Fatalf("%+v\n", errors.New("消息类型不一致"))
 			}
 
-			if event.Data != "test-message" {
+			if string(event.Data) != "test-message" {
 				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
 			}
 
@@ -81,7 +82,7 @@ func testRedisMessageQueue(t *testing.T, redisMessageQueue *redis.MessageQueue)
 	}
 
 	err = redisMessageQueue.Subscribe("test2", "test-redis",
-		func(queue common.MessageQueue, topic string, event *common.CloudEvent) {
+		func(queue common.MessageQueue, topic string, event *data_protocol.CloudEvent) {
 			if event.ID != "1" {
 				t.Fatalf("%+v\n", errors.New("消息ID不一致"))
 			}
@@ -90,7 +91,7 @@ func testRedisMessageQueue(t *testing.T, redisMessageQueue *redis.MessageQueue)
 				t.Fatalf("%+v\n", errors.New("消息类型不一致"))
 			}
 
-			if event.Data != "test-message" {
+			if string(event.Data) != "test-message" {
 				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
 			}
 
@@ -103,7 +104,7 @@ func testRedisMessageQueue(t *testing.T, redisMessageQueue *redis.MessageQueue)
 	}
 
 	err = redisMessageQueue.Publish("test-redis",
-		message_queue.NewCloudEvent("1", "test", "test-message"))
+		data_protocol.NewCloudEvent("1", "test", "baize-test.com", "application/text", []byte("test-message")))
 	if err != nil {
 		t.Fatalf("%+v\n", err)
 	}
@@ -116,7 +117,7 @@ func testMqttMessageQueue(t *testing.T, mqttMessageQueue *mqtt.MessageQueue) {
 	wg.Add(2)
 
 	err := mqttMessageQueue.Subscribe("test1", "test-mqtt",
-		func(queue common.MessageQueue, topic string, event *common.CloudEvent) {
+		func(queue common.MessageQueue, topic string, event *data_protocol.CloudEvent) {
 			if event.ID != "1" {
 				t.Fatalf("%+v\n", errors.New("消息ID不一致"))
 			}
@@ -125,7 +126,7 @@ func testMqttMessageQueue(t *testing.T, mqttMessageQueue *mqtt.MessageQueue) {
 				t.Fatalf("%+v\n", errors.New("消息类型不一致"))
 			}
 
-			if event.Data != "test-message" {
+			if string(event.Data) != "test-message" {
 				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
 			}
 
@@ -138,7 +139,7 @@ func testMqttMessageQueue(t *testing.T, mqttMessageQueue *mqtt.MessageQueue) {
 	}
 
 	err = mqttMessageQueue.Subscribe("test2", "test-mqtt",
-		func(queue common.MessageQueue, topic string, event *common.CloudEvent) {
+		func(queue common.MessageQueue, topic string, event *data_protocol.CloudEvent) {
 			if event.ID != "1" {
 				t.Fatalf("%+v\n", errors.New("消息ID不一致"))
 			}
@@ -147,7 +148,7 @@ func testMqttMessageQueue(t *testing.T, mqttMessageQueue *mqtt.MessageQueue) {
 				t.Fatalf("%+v\n", errors.New("消息类型不一致"))
 			}
 
-			if event.Data != "test-message" {
+			if string(event.Data) != "test-message" {
 				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
 			}
 
@@ -160,7 +161,7 @@ func testMqttMessageQueue(t *testing.T, mqttMessageQueue *mqtt.MessageQueue) {
 	}
 
 	err = mqttMessageQueue.Publish("test-mqtt",
-		message_queue.NewCloudEvent("1", "test", "test-message"))
+		data_protocol.NewCloudEvent("1", "test", "baize-test.com", "application/text", []byte("test-message")))
 	if err != nil {
 		t.Fatalf("%+v\n", err)
 	}
@@ -173,7 +174,7 @@ func testMessageQueue(t *testing.T, messageQueue common.MessageQueue) {
 	wg.Add(2)
 
 	err := message_queue.Subscribe(messageQueue, "test1", "test-message-queue",
-		func(queue common.MessageQueue, topic string, event *common.CloudEvent) {
+		func(queue common.MessageQueue, topic string, event *data_protocol.CloudEvent) {
 			if event.ID != "1" {
 				t.Fatalf("%+v\n", errors.New("消息ID不一致"))
 			}
@@ -182,7 +183,7 @@ func testMessageQueue(t *testing.T, messageQueue common.MessageQueue) {
 				t.Fatalf("%+v\n", errors.New("消息类型不一致"))
 			}
 
-			if event.Data != "test-message" {
+			if string(event.Data) != "test-message" {
 				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
 			}
 
@@ -195,7 +196,7 @@ func testMessageQueue(t *testing.T, messageQueue common.MessageQueue) {
 	}
 
 	err = message_queue.Subscribe(messageQueue, "test2", "test-message-queue",
-		func(queue common.MessageQueue, topic string, event *common.CloudEvent) {
+		func(queue common.MessageQueue, topic string, event *data_protocol.CloudEvent) {
 			if event.ID != "1" {
 				t.Fatalf("%+v\n", errors.New("消息ID不一致"))
 			}
@@ -204,7 +205,7 @@ func testMessageQueue(t *testing.T, messageQueue common.MessageQueue) {
 				t.Fatalf("%+v\n", errors.New("消息类型不一致"))
 			}
 
-			if event.Data != "test-message" {
+			if string(event.Data) != "test-message" {
 				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
 			}
 
@@ -217,7 +218,7 @@ func testMessageQueue(t *testing.T, messageQueue common.MessageQueue) {
 	}
 
 	err = message_queue.Publish(messageQueue, "test-message-queue",
-		message_queue.NewCloudEvent("1", "test", "test-message"))
+		data_protocol.NewCloudEvent("1", "test", "baize-test.com", "application/text", []byte("test-message")))
 	if err != nil {
 		t.Fatalf("%+v\n", err)
 	}