package test

import (
	"fmt"
	"sync"
	"testing"

	"git.sxidc.com/go-framework/baize/framework/core/infrastructure"
	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue"
	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/common"
	"git.sxidc.com/go-framework/baize/framework/core/infrastructure/message_queue/redis"
	"github.com/pkg/errors"
)

// TestRedisMessageQueue exercises the Redis message queue through its concrete
// *redis.MessageQueue API.
//
// NOTE(review): requires a Redis server reachable at localhost:30379 with the
// credentials below — confirm the test environment provides it.
func TestRedisMessageQueue(t *testing.T) {
	redisMessageQueue := redis.New("localhost:30379", "", "mtyzxhc", 1,
		redis.WithMaxLen(1000),
		redis.WithConsumerNum(1))
	defer redis.Destroy(redisMessageQueue)

	testRedisMessageQueue(t, redisMessageQueue)
}

// TestMessageQueueInfrastructure builds the queue through the infrastructure
// container and exercises it via the package-level message_queue helpers.
func TestMessageQueueInfrastructure(t *testing.T) {
	i := infrastructure.NewInfrastructure(infrastructure.Config{
		MessageQueueConfig: infrastructure.MessageQueueConfig{
			Redis: &infrastructure.RedisConfig{
				Address:     "localhost:30379",
				Password:    "mtyzxhc",
				DB:          1,
				MaxLen:      1000,
				ConsumerNum: 1,
			},
		},
	})
	defer infrastructure.DestroyInfrastructure(i)

	testMessageQueue(t, i.RedisMessageQueue())
}

// newMessageQueueTestHandler returns a subscription callback that checks the
// received payload against want, prints "<label> consumed", and always marks
// one unit of wg as done.
//
// The callback reports a mismatch with t.Errorf rather than t.Fatalf: the
// testing package only allows FailNow/Fatal from the goroutine running the
// test, and the callback is presumably invoked from a consumer goroutine.
// wg.Done is deferred so a mismatch cannot leave the test blocked in wg.Wait.
func newMessageQueueTestHandler(t *testing.T, wg *sync.WaitGroup, label string, want string) func(queue common.MessageQueue, topic string, data string) {
	return func(queue common.MessageQueue, topic string, data string) {
		defer wg.Done()

		if data != want {
			t.Errorf("%+v\n", errors.New("消息数据不一致"))
			return
		}

		fmt.Println(label, "consumed")
	}
}

// testRedisMessageQueue subscribes two groups ("test1" and "test2") to the
// "test-redis" topic, publishes a single message, and waits until both
// subscriptions have handled it.
func testRedisMessageQueue(t *testing.T, redisMessageQueue *redis.MessageQueue) {
	const (
		topic   = "test-redis"
		payload = "test-message"
	)

	wg := sync.WaitGroup{}
	wg.Add(2)

	err := redisMessageQueue.Subscribe("test1", topic, newMessageQueueTestHandler(t, &wg, "test1", payload))
	if err != nil {
		t.Fatalf("%+v\n", err)
	}

	err = redisMessageQueue.Subscribe("test2", topic, newMessageQueueTestHandler(t, &wg, "test2", payload))
	if err != nil {
		t.Fatalf("%+v\n", err)
	}

	err = redisMessageQueue.Publish(topic, payload)
	if err != nil {
		t.Fatalf("%+v\n", err)
	}

	// Block until each of the two subscriptions has handled the message.
	wg.Wait()
}

// testMessageQueue runs the same two-subscriber scenario as
// testRedisMessageQueue, but on the "test-message-queue" topic and through the
// message_queue package functions operating on the common.MessageQueue
// interface.
func testMessageQueue(t *testing.T, redisMessageQueue common.MessageQueue) {
	const (
		topic   = "test-message-queue"
		payload = "test-message"
	)

	wg := sync.WaitGroup{}
	wg.Add(2)

	err := message_queue.Subscribe(redisMessageQueue, "test1", topic, newMessageQueueTestHandler(t, &wg, "test1", payload))
	if err != nil {
		t.Fatalf("%+v\n", err)
	}

	err = message_queue.Subscribe(redisMessageQueue, "test2", topic, newMessageQueueTestHandler(t, &wg, "test2", payload))
	if err != nil {
		t.Fatalf("%+v\n", err)
	}

	err = message_queue.Publish(redisMessageQueue, topic, payload)
	if err != nil {
		t.Fatalf("%+v\n", err)
	}

	// Block until each of the two subscriptions has handled the message.
	wg.Wait()
}