ソースを参照

Merge branch 'dev' into v1.1.0

yjp 3 ヶ月 前
コミット
09d850cb00

+ 35 - 7
framework/core/infrastructure/infrastructure.go

@@ -8,6 +8,7 @@ import (
 	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/database/data_service"
 	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/database/operations"
 	messageQueueCommon "git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/common"
+	mqttMessageQueue "git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/mqtt"
 	redisMessageQueue "git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/redis"
 	"git.sxidc.com/go-tools/utils/strutils"
 )
@@ -29,16 +30,23 @@ type CacheConfig struct {
 }
 
 type MessageQueueConfig struct {
-	Redis       *RedisConfig `json:"redis" yaml:"redis"`
-	MaxLen      int64        `json:"max_len" yaml:"max_len"`
-	ConsumerNum int          `json:"consumer_num" yaml:"consumer_num"`
+	Redis *RedisConfig `json:"redis" yaml:"redis"`
+	Mqtt  *MqttConfig  `json:"mqtt" yaml:"mqtt"`
 }
 
 type RedisConfig struct {
+	Address     string `json:"address" yaml:"address"`
+	UserName    string `json:"user_name" yaml:"user_name"`
+	Password    string `json:"password" yaml:"password"`
+	DB          int    `json:"db" yaml:"db"`
+	MaxLen      int64  `json:"max_len" yaml:"max_len"`
+	ConsumerNum int    `json:"consumer_num" yaml:"consumer_num"`
+}
+
+type MqttConfig struct {
 	Address  string `json:"address" yaml:"address"`
 	UserName string `json:"user_name" yaml:"user_name"`
 	Password string `json:"password" yaml:"password"`
-	DB       int    `json:"db" yaml:"db"`
 }
 
 // Infrastructure 基础设施结构
@@ -47,6 +55,7 @@ type Infrastructure struct {
 	localCache        cache.Cache
 	redisCache        cache.Cache
 	redisMessageQueue messageQueueCommon.MessageQueue
+	mqttMessageQueue  messageQueueCommon.MessageQueue
 }
 
 func NewInfrastructure(config Config) *Infrastructure {
@@ -90,15 +99,26 @@ func NewInfrastructure(config Config) *Infrastructure {
 	// 初始化Redis消息队列
 	if config.MessageQueueConfig.Redis != nil {
 		redisConf := config.MessageQueueConfig.Redis
-		maxLen := config.MessageQueueConfig.MaxLen
 
 		newRedisMessageQueue := redisMessageQueue.New(redisConf.Address, redisConf.UserName, redisConf.Password, redisConf.DB,
-			messageQueueCommon.WithMaxLen(maxLen),
-			messageQueueCommon.WithConsumerNum(config.ConsumerNum))
+			redisMessageQueue.WithMaxLen(redisConf.MaxLen),
+			redisMessageQueue.WithConsumerNum(redisConf.ConsumerNum))
 
 		i.redisMessageQueue = newRedisMessageQueue
 	}
 
+	// 初始化Mqtt消息队列
+	if config.MessageQueueConfig.Mqtt != nil {
+		mqttConf := config.MessageQueueConfig.Mqtt
+
+		newMqttMessageQueue, err := mqttMessageQueue.New(mqttConf.Address, mqttConf.UserName, mqttConf.Password)
+		if err != nil {
+			panic("初始化MQTT消息队列失败: " + err.Error())
+		}
+
+		i.mqttMessageQueue = newMqttMessageQueue
+	}
+
 	return i
 }
 
@@ -174,3 +194,11 @@ func (i Infrastructure) RedisCache() cache.Cache {
 func (i Infrastructure) RedisMessageQueue() messageQueueCommon.MessageQueue {
 	return i.redisMessageQueue
 }
+
+// MqttMessageQueue 获取Mqtt消息队列基础设施
+// 参数: 无
+// 返回值:
+// - 消息队列基础设施的接口
+func (i Infrastructure) MqttMessageQueue() messageQueueCommon.MessageQueue {
+	return i.mqttMessageQueue
+}

+ 0 - 19
framework/core/infrastructure/message_queue/common/common.go

@@ -14,22 +14,3 @@ type MessageQueue interface {
 	UnSubscribe(group string, topic string) error
 	Publish(topic string, data string) error
 }
-
-type Option func(options *Options)
-
-type Options struct {
-	MaxLen      int64
-	ConsumerNum int
-}
-
-func WithMaxLen(maxLen int64) Option {
-	return func(options *Options) {
-		options.MaxLen = maxLen
-	}
-}
-
-func WithConsumerNum(consumerNum int) Option {
-	return func(options *Options) {
-		options.ConsumerNum = consumerNum
-	}
-}

+ 149 - 0
framework/core/infrastructure/message_queue/mqtt/mqtt.go

@@ -0,0 +1,149 @@
+package mqtt
+
+import (
+	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/logger"
+	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/common"
+	"git.sxidc.com/go-tools/utils/strutils"
+	mqtt "github.com/eclipse/paho.mqtt.golang"
+	"github.com/pkg/errors"
+	"sync"
+	"time"
+)
+
+type MessageQueue struct {
+	mqttClient mqtt.Client
+
+	topicGroupMapHandlerMutex *sync.RWMutex
+	topicGroupHandlerMap      map[string]map[string]common.MessageHandler
+}
+
+func New(address string, userName string, password string) (*MessageQueue, error) {
+	clientID := "baize-" + strutils.SimpleUUID()[0:8]
+
+	mqttClient := mqtt.NewClient(mqtt.NewClientOptions().
+		SetAutoReconnect(true).
+		SetUsername(password).
+		SetPassword(userName).
+		AddBroker(address).
+		SetClientID(clientID).
+		SetOrderMatters(false).
+		SetWill(clientID+"/will", "dead", 2, true))
+
+	token := mqttClient.Connect()
+
+	if !token.WaitTimeout(20 * time.Second) {
+		return nil, errors.New("连接超时")
+	}
+
+	if token.Error() != nil {
+		return nil, token.Error()
+	}
+
+	for !mqttClient.IsConnected() {
+		logger.GetInstance().Info("等待连接MQTT代理...")
+		time.Sleep(1 * time.Second)
+	}
+
+	return &MessageQueue{
+		mqttClient:                mqttClient,
+		topicGroupMapHandlerMutex: &sync.RWMutex{},
+		topicGroupHandlerMap:      make(map[string]map[string]common.MessageHandler),
+	}, nil
+}
+
+func Destroy(messageQueue *MessageQueue) {
+	if messageQueue == nil {
+		return
+	}
+
+	messageQueue.mqttClient.Disconnect(250)
+	messageQueue.mqttClient = nil
+
+	messageQueue = nil
+}
+
+func (messageQueue *MessageQueue) Subscribe(group string, topic string, handler common.MessageHandler) error {
+	messageQueue.topicGroupMapHandlerMutex.Lock()
+	defer messageQueue.topicGroupMapHandlerMutex.Unlock()
+
+	token := messageQueue.mqttClient.Subscribe(topic, 1, func(client mqtt.Client, message mqtt.Message) {
+		messageQueue.topicGroupMapHandlerMutex.RLock()
+		defer messageQueue.topicGroupMapHandlerMutex.RUnlock()
+
+		for _, groupHandler := range messageQueue.topicGroupHandlerMap[topic] {
+			go groupHandler(messageQueue, message.Topic(), string(message.Payload()))
+		}
+	})
+
+	if !token.WaitTimeout(20 * time.Second) {
+		return errors.New("订阅超时")
+	}
+
+	if token.Error() != nil {
+		return token.Error()
+	}
+
+	groupHandlerMap, ok := messageQueue.topicGroupHandlerMap[topic]
+	if !ok {
+		groupHandlerMap = make(map[string]common.MessageHandler)
+	}
+
+	h, ok := groupHandlerMap[group]
+	if !ok {
+		h = handler
+	}
+
+	groupHandlerMap[group] = h
+	messageQueue.topicGroupHandlerMap[topic] = groupHandlerMap
+
+	return nil
+}
+
+func (messageQueue *MessageQueue) UnSubscribe(group string, topic string) error {
+	messageQueue.topicGroupMapHandlerMutex.Lock()
+	defer messageQueue.topicGroupMapHandlerMutex.Unlock()
+
+	groupHandlerMap, ok := messageQueue.topicGroupHandlerMap[topic]
+	if !ok {
+		return nil
+	}
+
+	_, ok = groupHandlerMap[group]
+	if !ok {
+		return nil
+	}
+
+	delete(groupHandlerMap, group)
+
+	if len(groupHandlerMap) == 0 {
+		delete(messageQueue.topicGroupHandlerMap, topic)
+	} else {
+		messageQueue.topicGroupHandlerMap[topic] = groupHandlerMap
+	}
+
+	token := messageQueue.mqttClient.Unsubscribe(topic)
+
+	if !token.WaitTimeout(20 * time.Second) {
+		return errors.New("取消订阅超时")
+	}
+
+	if token.Error() != nil {
+		return token.Error()
+	}
+
+	return nil
+}
+
+func (messageQueue *MessageQueue) Publish(topic string, data string) error {
+	token := messageQueue.mqttClient.Publish(topic, 1, false, data)
+
+	if !token.WaitTimeout(20 * time.Second) {
+		return errors.New("发布订阅超时")
+	}
+
+	if token.Error() != nil {
+		return token.Error()
+	}
+
+	return nil
+}

+ 23 - 4
framework/core/infrastructure/message_queue/redis/redis.go

@@ -13,6 +13,25 @@ const (
 	messageValuesDataKey = "data"
 )
 
+type Option func(options *Options)
+
+type Options struct {
+	MaxLen      int64
+	ConsumerNum int
+}
+
+func WithMaxLen(maxLen int64) Option {
+	return func(options *Options) {
+		options.MaxLen = maxLen
+	}
+}
+
+func WithConsumerNum(consumerNum int) Option {
+	return func(options *Options) {
+		options.ConsumerNum = consumerNum
+	}
+}
+
 type MessageQueue struct {
 	redisOptions *redisqueue.RedisOptions
 
@@ -22,11 +41,11 @@ type MessageQueue struct {
 	consumerMapMutex *sync.Mutex
 	consumerMap      map[string]*redisqueue.Consumer
 
-	options *common.Options
+	options *Options
 }
 
-func New(address string, userName string, password string, db int, opts ...common.Option) *MessageQueue {
-	options := new(common.Options)
+func New(address string, userName string, password string, db int, opts ...Option) *MessageQueue {
+	options := new(Options)
 
 	for _, opt := range opts {
 		opt(options)
@@ -104,7 +123,7 @@ func (messageQueue *MessageQueue) Subscribe(group string, topic string, handler
 			return nil
 		}
 
-		handler(messageQueue, topic, data)
+		handler(messageQueue, message.Stream, data)
 		return nil
 	})
 

+ 3 - 0
go.mod

@@ -6,6 +6,7 @@ require (
 	git.sxidc.com/go-tools/utils v1.5.23
 	git.sxidc.com/service-supports/ds-sdk v0.10.9
 	github.com/dgrijalva/jwt-go v3.2.0+incompatible
+	github.com/eclipse/paho.mqtt.golang v1.4.3
 	github.com/gin-gonic/gin v1.10.0
 	github.com/go-playground/locales v0.14.1
 	github.com/go-playground/universal-translator v0.18.1
@@ -42,6 +43,7 @@ require (
 	github.com/gogo/protobuf v1.3.2 // indirect
 	github.com/golang/protobuf v1.5.4 // indirect
 	github.com/google/uuid v1.6.0 // indirect
+	github.com/gorilla/websocket v1.5.0 // indirect
 	github.com/huandu/xstrings v1.3.3 // indirect
 	github.com/imdario/mergo v0.3.11 // indirect
 	github.com/jackc/pgpassfile v1.0.0 // indirect
@@ -74,6 +76,7 @@ require (
 	golang.org/x/arch v0.8.0 // indirect
 	golang.org/x/crypto v0.23.0 // indirect
 	golang.org/x/net v0.25.0 // indirect
+	golang.org/x/sync v0.6.0 // indirect
 	golang.org/x/sys v0.20.0 // indirect
 	golang.org/x/text v0.15.0 // indirect
 	golang.org/x/tools v0.6.0 // indirect

+ 6 - 0
go.sum

@@ -31,6 +31,8 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumC
 github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
 github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
 github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
+github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik=
+github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE=
 github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys=
 github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
 github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo=
@@ -73,6 +75,8 @@ github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
 github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
 github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
+github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
 github.com/hinshun/vt10x v0.0.0-20180616224451-1954e6464174 h1:WlZsjVhE8Af9IcZDGgJGQpNflI3+MJSBhsgT5PCtzBQ=
 github.com/hinshun/vt10x v0.0.0-20180616224451-1954e6464174/go.mod h1:DqJ97dSdRW1W22yXSB90986pcOyQ7r45iio1KN2ez1A=
 github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
@@ -230,6 +234,8 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ
 golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
+golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
 golang.org/x/sys v0.0.0-20180606202747-9527bec2660b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

+ 69 - 12
test/message_queue_test.go

@@ -5,6 +5,7 @@ import (
 	"git.sxidc.com/go-framework/baize/framework/core/infrastructure"
 	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue"
 	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/common"
+	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/mqtt"
 	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/redis"
 	"github.com/pkg/errors"
 	"sync"
@@ -13,28 +14,44 @@ import (
 
 func TestRedisMessageQueue(t *testing.T) {
 	redisMessageQueue := redis.New("localhost:30379", "", "mtyzxhc", 1,
-		common.WithMaxLen(1000),
-		common.WithConsumerNum(1))
+		redis.WithMaxLen(1000),
+		redis.WithConsumerNum(1))
 	defer redis.Destroy(redisMessageQueue)
 
 	testRedisMessageQueue(t, redisMessageQueue)
 }
 
+func TestMqttMessageQueue(t *testing.T) {
+	mqttMessageQueue, err := mqtt.New("localhost:1883", "admin", "mtyzxhc")
+	if err != nil {
+		t.Fatalf("%+v\n", err)
+	}
+	defer mqtt.Destroy(mqttMessageQueue)
+
+	testMqttMessageQueue(t, mqttMessageQueue)
+}
+
 func TestMessageQueueInfrastructure(t *testing.T) {
 	i := infrastructure.NewInfrastructure(infrastructure.Config{
 		MessageQueueConfig: infrastructure.MessageQueueConfig{
 			Redis: &infrastructure.RedisConfig{
-				Address:  "localhost:30379",
+				Address:     "localhost:30379",
+				Password:    "mtyzxhc",
+				DB:          1,
+				MaxLen:      1000,
+				ConsumerNum: 1,
+			},
+			Mqtt: &infrastructure.MqttConfig{
+				Address:  "localhost:1883",
+				UserName: "admin",
 				Password: "mtyzxhc",
-				DB:       1,
 			},
-			MaxLen:      1000,
-			ConsumerNum: 1,
 		},
 	})
 	defer infrastructure.DestroyInfrastructure(i)
 
 	testMessageQueue(t, i.RedisMessageQueue())
+	testMessageQueue(t, i.MqttMessageQueue())
 }
 
 func testRedisMessageQueue(t *testing.T, redisMessageQueue *redis.MessageQueue) {
@@ -47,7 +64,7 @@ func testRedisMessageQueue(t *testing.T, redisMessageQueue *redis.MessageQueue)
 				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
 			}
 
-			fmt.Println("test1 consumed")
+			fmt.Println("redis test1 consumed")
 
 			wg.Done()
 		})
@@ -61,7 +78,7 @@ func testRedisMessageQueue(t *testing.T, redisMessageQueue *redis.MessageQueue)
 				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
 			}
 
-			fmt.Println("test2 consumed")
+			fmt.Println("redis test2 consumed")
 
 			wg.Done()
 		})
@@ -77,11 +94,51 @@ func testRedisMessageQueue(t *testing.T, redisMessageQueue *redis.MessageQueue)
 	wg.Wait()
 }
 
-func testMessageQueue(t *testing.T, redisMessageQueue common.MessageQueue) {
+func testMqttMessageQueue(t *testing.T, mqttMessageQueue *mqtt.MessageQueue) {
+	wg := sync.WaitGroup{}
+	wg.Add(2)
+
+	err := mqttMessageQueue.Subscribe("test1", "test-mqtt",
+		func(queue common.MessageQueue, topic string, data string) {
+			if string(data) != "test-message" {
+				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
+			}
+
+			fmt.Println("mqtt test1 consumed")
+
+			wg.Done()
+		})
+	if err != nil {
+		t.Fatalf("%+v\n", err)
+	}
+
+	err = mqttMessageQueue.Subscribe("test2", "test-mqtt",
+		func(queue common.MessageQueue, topic string, data string) {
+			if string(data) != "test-message" {
+				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
+			}
+
+			fmt.Println("mqtt test2 consumed")
+
+			wg.Done()
+		})
+	if err != nil {
+		t.Fatalf("%+v\n", err)
+	}
+
+	err = mqttMessageQueue.Publish("test-mqtt", "test-message")
+	if err != nil {
+		t.Fatalf("%+v\n", err)
+	}
+
+	wg.Wait()
+}
+
+func testMessageQueue(t *testing.T, messageQueue common.MessageQueue) {
 	wg := sync.WaitGroup{}
 	wg.Add(2)
 
-	err := message_queue.Subscribe(redisMessageQueue, "test1", "test-message-queue",
+	err := message_queue.Subscribe(messageQueue, "test1", "test-message-queue",
 		func(queue common.MessageQueue, topic string, data string) {
 			if string(data) != "test-message" {
 				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
@@ -95,7 +152,7 @@ func testMessageQueue(t *testing.T, redisMessageQueue common.MessageQueue) {
 		t.Fatalf("%+v\n", err)
 	}
 
-	err = message_queue.Subscribe(redisMessageQueue, "test2", "test-message-queue",
+	err = message_queue.Subscribe(messageQueue, "test2", "test-message-queue",
 		func(queue common.MessageQueue, topic string, data string) {
 			if string(data) != "test-message" {
 				t.Fatalf("%+v\n", errors.New("消息数据不一致"))
@@ -109,7 +166,7 @@ func testMessageQueue(t *testing.T, redisMessageQueue common.MessageQueue) {
 		t.Fatalf("%+v\n", err)
 	}
 
-	err = message_queue.Publish(redisMessageQueue, "test-message-queue", "test-message")
+	err = message_queue.Publish(messageQueue, "test-message-queue", "test-message")
 	if err != nil {
 		t.Fatalf("%+v\n", err)
 	}