package test import ( "fmt" "git.sxidc.com/go-framework/baize/framework/core/data_protocol" "git.sxidc.com/go-framework/baize/framework/core/infrastructure" "git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue" "git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/common" "git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/mqtt" "git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/redis" "github.com/pkg/errors" "sync" "testing" ) func TestRedisMessageQueue(t *testing.T) { redisMessageQueue := redis.New("localhost:30379", "", "mtyzxhc", 1, redis.WithMaxLen(1000), redis.WithConsumerNum(1)) defer redis.Destroy(redisMessageQueue) testRedisMessageQueue(t, redisMessageQueue) } func TestMqttMessageQueue(t *testing.T) { mqttMessageQueue, err := mqtt.New("localhost:1883", "admin", "mtyzxhc") if err != nil { t.Fatalf("%+v\n", err) } defer mqtt.Destroy(mqttMessageQueue) testMqttMessageQueue(t, mqttMessageQueue) } func TestMessageQueueInfrastructure(t *testing.T) { i := infrastructure.NewInfrastructure(infrastructure.Config{ MessageQueueConfig: infrastructure.MessageQueueConfig{ Redis: &infrastructure.RedisConfig{ Address: "localhost:30379", Password: "mtyzxhc", DB: 1, MaxLen: 1000, ConsumerNum: 1, }, Mqtt: &infrastructure.MqttConfig{ Address: "localhost:1883", UserName: "admin", Password: "mtyzxhc", }, }, }) defer infrastructure.DestroyInfrastructure(i) testMessageQueue(t, i.RedisMessageQueue()) testMessageQueue(t, i.MqttMessageQueue()) } func testRedisMessageQueue(t *testing.T, redisMessageQueue *redis.MessageQueue) { wg := sync.WaitGroup{} wg.Add(2) err := redisMessageQueue.Subscribe("test1", "test-redis", func(queue common.MessageQueue, topic string, event *data_protocol.CloudEvent) { if event.ID != "1" { t.Fatalf("%+v\n", errors.New("消息ID不一致")) } if event.Type != "test" { t.Fatalf("%+v\n", errors.New("消息类型不一致")) } if string(event.Data) != "test-message" { t.Fatalf("%+v\n", errors.New("消息数据不一致")) } fmt.Println("redis test1 consumed") wg.Done() }) if err != nil { t.Fatalf("%+v\n", err) } err = redisMessageQueue.Subscribe("test2", "test-redis", func(queue common.MessageQueue, topic string, event *data_protocol.CloudEvent) { if event.ID != "1" { t.Fatalf("%+v\n", errors.New("消息ID不一致")) } if event.Type != "test" { t.Fatalf("%+v\n", errors.New("消息类型不一致")) } if string(event.Data) != "test-message" { t.Fatalf("%+v\n", errors.New("消息数据不一致")) } fmt.Println("redis test2 consumed") wg.Done() }) if err != nil { t.Fatalf("%+v\n", err) } err = redisMessageQueue.Publish("test-redis", data_protocol.NewCloudEvent("1", "test", "baize-test.com", "application/text", []byte("test-message"))) if err != nil { t.Fatalf("%+v\n", err) } wg.Wait() } func testMqttMessageQueue(t *testing.T, mqttMessageQueue *mqtt.MessageQueue) { wg := sync.WaitGroup{} wg.Add(2) err := mqttMessageQueue.Subscribe("test1", "test-mqtt", func(queue common.MessageQueue, topic string, event *data_protocol.CloudEvent) { if event.ID != "1" { t.Fatalf("%+v\n", errors.New("消息ID不一致")) } if event.Type != "test" { t.Fatalf("%+v\n", errors.New("消息类型不一致")) } if string(event.Data) != "test-message" { t.Fatalf("%+v\n", errors.New("消息数据不一致")) } fmt.Println("mqtt test1 consumed") wg.Done() }) if err != nil { t.Fatalf("%+v\n", err) } err = mqttMessageQueue.Subscribe("test2", "test-mqtt", func(queue common.MessageQueue, topic string, event *data_protocol.CloudEvent) { if event.ID != "1" { t.Fatalf("%+v\n", errors.New("消息ID不一致")) } if event.Type != "test" { t.Fatalf("%+v\n", errors.New("消息类型不一致")) } if string(event.Data) != "test-message" { t.Fatalf("%+v\n", errors.New("消息数据不一致")) } fmt.Println("mqtt test2 consumed") wg.Done() }) if err != nil { t.Fatalf("%+v\n", err) } err = mqttMessageQueue.Publish("test-mqtt", data_protocol.NewCloudEvent("1", "test", "baize-test.com", "application/text", []byte("test-message"))) if err != nil { t.Fatalf("%+v\n", err) } wg.Wait() } func testMessageQueue(t *testing.T, messageQueue common.MessageQueue) { wg := sync.WaitGroup{} wg.Add(2) err := message_queue.Subscribe(messageQueue, "test1", "test-message-queue", func(queue common.MessageQueue, topic string, event *data_protocol.CloudEvent) { if event.ID != "1" { t.Fatalf("%+v\n", errors.New("消息ID不一致")) } if event.Type != "test" { t.Fatalf("%+v\n", errors.New("消息类型不一致")) } if string(event.Data) != "test-message" { t.Fatalf("%+v\n", errors.New("消息数据不一致")) } fmt.Println("test1 consumed") wg.Done() }) if err != nil { t.Fatalf("%+v\n", err) } err = message_queue.Subscribe(messageQueue, "test2", "test-message-queue", func(queue common.MessageQueue, topic string, event *data_protocol.CloudEvent) { if event.ID != "1" { t.Fatalf("%+v\n", errors.New("消息ID不一致")) } if event.Type != "test" { t.Fatalf("%+v\n", errors.New("消息类型不一致")) } if string(event.Data) != "test-message" { t.Fatalf("%+v\n", errors.New("消息数据不一致")) } fmt.Println("test2 consumed") wg.Done() }) if err != nil { t.Fatalf("%+v\n", err) } err = message_queue.Publish(messageQueue, "test-message-queue", data_protocol.NewCloudEvent("1", "test", "baize-test.com", "application/text", []byte("test-message"))) if err != nil { t.Fatalf("%+v\n", err) } wg.Wait() }