Explorar el Código

完成mqtt消息队列测试

yjp hace 3 meses
padre
commit
1dfeb4aba9
Se han modificado 4 ficheros con 101 adiciones y 6 borrados
  1. 29 0
      framework/core/infrastructure/infrastructure.go
  2. 3 0
      go.mod
  3. 6 0
      go.sum
  4. 63 6
      test/message_queue_test.go

+ 29 - 0
framework/core/infrastructure/infrastructure.go

@@ -8,6 +8,7 @@ import (
 	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/database/data_service"
 	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/database/operations"
 	messageQueueCommon "git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/common"
+	mqttMessageQueue "git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/mqtt"
 	redisMessageQueue "git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/redis"
 	"git.sxidc.com/go-tools/utils/strutils"
 )
@@ -30,6 +31,7 @@ type CacheConfig struct {
 
 type MessageQueueConfig struct {
 	Redis *RedisConfig `json:"redis" yaml:"redis"`
+	Mqtt  *MqttConfig  `json:"mqtt" yaml:"mqtt"`
 }
 
 type RedisConfig struct {
@@ -41,12 +43,19 @@ type RedisConfig struct {
 	ConsumerNum int    `json:"consumer_num" yaml:"consumer_num"`
 }
 
+type MqttConfig struct {
+	Address  string `json:"address" yaml:"address"`
+	UserName string `json:"user_name" yaml:"user_name"`
+	Password string `json:"password" yaml:"password"`
+}
+
 // Infrastructure 基础设施结构
 type Infrastructure struct {
 	dbExecutor        database.Executor
 	localCache        cache.Cache
 	redisCache        cache.Cache
 	redisMessageQueue messageQueueCommon.MessageQueue
+	mqttMessageQueue  messageQueueCommon.MessageQueue
 }
 
 func NewInfrastructure(config Config) *Infrastructure {
@@ -98,6 +107,18 @@ func NewInfrastructure(config Config) *Infrastructure {
 		i.redisMessageQueue = newRedisMessageQueue
 	}
 
+	// 初始化Mqtt消息队列
+	if config.MessageQueueConfig.Mqtt != nil {
+		mqttConf := config.MessageQueueConfig.Mqtt
+
+		newMqttMessageQueue, err := mqttMessageQueue.New(mqttConf.Address, mqttConf.UserName, mqttConf.Password)
+		if err != nil {
+			panic("初始化MQTT消息队列失败: " + err.Error())
+		}
+
+		i.mqttMessageQueue = newMqttMessageQueue
+	}
+
 	return i
 }
 
@@ -173,3 +194,11 @@ func (i Infrastructure) RedisCache() cache.Cache {
 func (i Infrastructure) RedisMessageQueue() messageQueueCommon.MessageQueue {
 	return i.redisMessageQueue
 }
+
+// MqttMessageQueue 获取Mqtt消息队列基础设施
+// 参数: 无
+// 返回值:
+// - 消息队列基础设施的接口
+func (i Infrastructure) MqttMessageQueue() messageQueueCommon.MessageQueue {
+	return i.mqttMessageQueue
+}

+ 3 - 0
go.mod

@@ -6,6 +6,7 @@ require (
 	git.sxidc.com/go-tools/utils v1.5.23
 	git.sxidc.com/service-supports/ds-sdk v0.10.9
 	github.com/dgrijalva/jwt-go v3.2.0+incompatible
+	github.com/eclipse/paho.mqtt.golang v1.4.3
 	github.com/gin-gonic/gin v1.10.0
 	github.com/go-playground/locales v0.14.1
 	github.com/go-playground/universal-translator v0.18.1
@@ -42,6 +43,7 @@ require (
 	github.com/gogo/protobuf v1.3.2 // indirect
 	github.com/golang/protobuf v1.5.4 // indirect
 	github.com/google/uuid v1.6.0 // indirect
+	github.com/gorilla/websocket v1.5.0 // indirect
 	github.com/huandu/xstrings v1.3.3 // indirect
 	github.com/imdario/mergo v0.3.11 // indirect
 	github.com/jackc/pgpassfile v1.0.0 // indirect
@@ -74,6 +76,7 @@ require (
 	golang.org/x/arch v0.8.0 // indirect
 	golang.org/x/crypto v0.23.0 // indirect
 	golang.org/x/net v0.25.0 // indirect
+	golang.org/x/sync v0.6.0 // indirect
 	golang.org/x/sys v0.20.0 // indirect
 	golang.org/x/text v0.15.0 // indirect
 	golang.org/x/tools v0.6.0 // indirect

+ 6 - 0
go.sum

@@ -31,6 +31,8 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumC
 github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
 github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
 github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
+github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik=
+github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE=
 github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys=
 github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
 github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo=
@@ -73,6 +75,8 @@ github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
 github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
 github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
+github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
 github.com/hinshun/vt10x v0.0.0-20180616224451-1954e6464174 h1:WlZsjVhE8Af9IcZDGgJGQpNflI3+MJSBhsgT5PCtzBQ=
 github.com/hinshun/vt10x v0.0.0-20180616224451-1954e6464174/go.mod h1:DqJ97dSdRW1W22yXSB90986pcOyQ7r45iio1KN2ez1A=
 github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
@@ -230,6 +234,8 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ
 golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
+golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
 golang.org/x/sys v0.0.0-20180606202747-9527bec2660b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

+ 63 - 6
test/message_queue_test.go

@@ -5,6 +5,7 @@ import (
 	"git.sxidc.com/go-framework/baize/framework/core/infrastructure"
 	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue"
 	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/common"
+	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/mqtt"
 	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/redis"
 	"github.com/pkg/errors"
 	"sync"
@@ -20,6 +21,16 @@ func TestRedisMessageQueue(t *testing.T) {
 	testRedisMessageQueue(t, redisMessageQueue)
 }
 
+func TestMqttMessageQueue(t *testing.T) {
+	mqttMessageQueue, err := mqtt.New("localhost:1883", "admin", "mtyzxhc")
+	if err != nil {
+		t.Fatalf("%+v\n", err)
+	}
+	defer mqtt.Destroy(mqttMessageQueue)
+
+	testMqttMessageQueue(t, mqttMessageQueue)
+}
+
 func TestMessageQueueInfrastructure(t *testing.T) {
 	i := infrastructure.NewInfrastructure(infrastructure.Config{
 		MessageQueueConfig: infrastructure.MessageQueueConfig{
@@ -30,11 +41,17 @@ func TestMessageQueueInfrastructure(t *testing.T) {
 				MaxLen:      1000,
 				ConsumerNum: 1,
 			},
+			Mqtt: &infrastructure.MqttConfig{
+				Address:  "localhost:1883",
+				UserName: "admin",
+				Password: "mtyzxhc",
+			},
 		},
 	})
 	defer infrastructure.DestroyInfrastructure(i)
 
 	testMessageQueue(t, i.RedisMessageQueue())
+	testMessageQueue(t, i.MqttMessageQueue())
 }
 
 func testRedisMessageQueue(t *testing.T, redisMessageQueue *redis.MessageQueue) {
@@ -47,7 +64,7 @@ func testRedisMessageQueue(t *testing.T, redisMessageQueue *redis.MessageQueue)
 				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
 			}
 
-			fmt.Println("test1 consumed")
+			fmt.Println("redis test1 consumed")
 
 			wg.Done()
 		})
@@ -61,7 +78,7 @@ func testRedisMessageQueue(t *testing.T, redisMessageQueue *redis.MessageQueue)
 				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
 			}
 
-			fmt.Println("test2 consumed")
+			fmt.Println("redis test2 consumed")
 
 			wg.Done()
 		})
@@ -77,11 +94,51 @@ func testRedisMessageQueue(t *testing.T, redisMessageQueue *redis.MessageQueue)
 	wg.Wait()
 }
 
-func testMessageQueue(t *testing.T, redisMessageQueue common.MessageQueue) {
+func testMqttMessageQueue(t *testing.T, mqttMessageQueue *mqtt.MessageQueue) {
+	wg := sync.WaitGroup{}
+	wg.Add(2)
+
+	err := mqttMessageQueue.Subscribe("test1", "test-mqtt",
+		func(queue common.MessageQueue, topic string, data string) {
+			if string(data) != "test-message" {
+				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
+			}
+
+			fmt.Println("mqtt test1 consumed")
+
+			wg.Done()
+		})
+	if err != nil {
+		t.Fatalf("%+v\n", err)
+	}
+
+	err = mqttMessageQueue.Subscribe("test2", "test-mqtt",
+		func(queue common.MessageQueue, topic string, data string) {
+			if string(data) != "test-message" {
+				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
+			}
+
+			fmt.Println("mqtt test2 consumed")
+
+			wg.Done()
+		})
+	if err != nil {
+		t.Fatalf("%+v\n", err)
+	}
+
+	err = mqttMessageQueue.Publish("test-mqtt", "test-message")
+	if err != nil {
+		t.Fatalf("%+v\n", err)
+	}
+
+	wg.Wait()
+}
+
+func testMessageQueue(t *testing.T, messageQueue common.MessageQueue) {
 	wg := sync.WaitGroup{}
 	wg.Add(2)
 
-	err := message_queue.Subscribe(redisMessageQueue, "test1", "test-message-queue",
+	err := message_queue.Subscribe(messageQueue, "test1", "test-message-queue",
 		func(queue common.MessageQueue, topic string, data string) {
 			if string(data) != "test-message" {
 				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
@@ -95,7 +152,7 @@ func testMessageQueue(t *testing.T, redisMessageQueue common.MessageQueue) {
 		t.Fatalf("%+v\n", err)
 	}
 
-	err = message_queue.Subscribe(redisMessageQueue, "test2", "test-message-queue",
+	err = message_queue.Subscribe(messageQueue, "test2", "test-message-queue",
 		func(queue common.MessageQueue, topic string, data string) {
 			if string(data) != "test-message" {
 				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
@@ -109,7 +166,7 @@ func testMessageQueue(t *testing.T, redisMessageQueue common.MessageQueue) {
 		t.Fatalf("%+v\n", err)
 	}
 
-	err = message_queue.Publish(redisMessageQueue, "test-message-queue", "test-message")
+	err = message_queue.Publish(messageQueue, "test-message-queue", "test-message")
 	if err != nil {
 		t.Fatalf("%+v\n", err)
 	}