pubsub_test.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. package test
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "git.sxidc.com/service-supports/dapr_api/pubsub"
  7. "git.sxidc.com/service-supports/dapr_api/utils"
  8. "io"
  9. "net/http"
  10. "sync"
  11. "testing"
  12. "time"
  13. )
  // Fixtures shared by the pub/sub tests and the local subscriber server.
  const (
  	// pubsubName is the first argument handed to api.Publish / api.PublishJSON.
  	pubsubName = "dapr_pubsub"
  	// topic is the second argument handed to api.Publish / api.PublishJSON.
  	topic = "test_pubsub"

  	// pubSubServerAddress is the address the local subscriber HTTP server
  	// listens on.
  	pubSubServerAddress = "localhost:10081"
  	// sendMessageMaxCount is the number of received messages after which a
  	// test stops publishing and finishes.
  	sendMessageMaxCount = 3
  )
  20. func TestPubSub(t *testing.T) {
  21. api := pubsub.NewAPI(daprHttpPort, 10*time.Second)
  22. defer pubsub.DestroyAPI(api)
  23. sendMessageCount := 0
  24. wgServer := &sync.WaitGroup{}
  25. wgServer.Add(1)
  26. srv := initPubSubHttpServer(wgServer, &sendMessageCount)
  27. wgServer.Wait()
  28. go func() {
  29. for sendMessageCount < sendMessageMaxCount {
  30. data, err := json.Marshal(map[string]interface{}{"data": utils.SimpleUUID()})
  31. if err != nil {
  32. fmt.Println(err)
  33. continue
  34. }
  35. err = api.Publish(pubsubName, topic, string(data), nil)
  36. if err != nil {
  37. fmt.Println(err)
  38. continue
  39. }
  40. time.Sleep(1 * time.Second)
  41. }
  42. }()
  43. for sendMessageCount < sendMessageMaxCount {
  44. time.Sleep(1 * time.Second)
  45. }
  46. _ = srv.Shutdown(context.Background())
  47. }
  48. func TestPubSubJson(t *testing.T) {
  49. api := pubsub.NewAPI(daprHttpPort, 10*time.Second)
  50. defer pubsub.DestroyAPI(api)
  51. sendMessageCount := 0
  52. wgServer := &sync.WaitGroup{}
  53. wgServer.Add(1)
  54. srv := initPubSubHttpServer(wgServer, &sendMessageCount)
  55. wgServer.Wait()
  56. go func() {
  57. for sendMessageCount < sendMessageMaxCount {
  58. data, err := json.Marshal(map[string]interface{}{"data": utils.SimpleUUID()})
  59. if err != nil {
  60. fmt.Println(err)
  61. continue
  62. }
  63. err = api.PublishJSON(pubsubName, topic, string(data), nil)
  64. if err != nil {
  65. fmt.Println(err)
  66. continue
  67. }
  68. time.Sleep(1 * time.Second)
  69. }
  70. }()
  71. for sendMessageCount < sendMessageMaxCount {
  72. time.Sleep(1 * time.Second)
  73. }
  74. _ = srv.Shutdown(context.Background())
  75. }
  76. func initPubSubHttpServer(wg *sync.WaitGroup, sendMessageCount *int) *http.Server {
  77. mux := http.NewServeMux()
  78. mux.HandleFunc("/test_pubsub", func(w http.ResponseWriter, r *http.Request) {
  79. body, err := io.ReadAll(r.Body)
  80. if err != nil {
  81. fmt.Println(err)
  82. w.WriteHeader(http.StatusOK)
  83. }
  84. fmt.Println("Receive", string(body))
  85. *sendMessageCount++
  86. w.WriteHeader(http.StatusOK)
  87. })
  88. srv := &http.Server{
  89. Addr: pubSubServerAddress,
  90. Handler: mux,
  91. }
  92. go func() {
  93. wg.Done()
  94. err := srv.ListenAndServe()
  95. if err != nil && err != http.ErrServerClosed {
  96. panic(err)
  97. }
  98. }()
  99. return srv
  100. }