pubsub_test.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. package test
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "git.sxidc.com/service-supports/dapr_api/pubsub"
  7. "git.sxidc.com/service-supports/dapr_api/utils"
  8. "io"
  9. "net/http"
  10. "testing"
  11. "time"
  12. )
  13. const (
  14. serverAddress = "localhost:10081"
  15. sendMessageMaxCount = 3
  16. pubsubName = "dapr_pubsub"
  17. topic = "test_pubsub"
  18. )
  19. func TestPubSub(t *testing.T) {
  20. api := pubsub.NewAPI(daprHttpPort, 10*time.Second)
  21. defer pubsub.DestroyAPI(api)
  22. sendMessageCount := 0
  23. srv := initHttpServer(&sendMessageCount)
  24. go func() {
  25. for sendMessageCount < sendMessageMaxCount {
  26. data, err := json.Marshal(map[string]interface{}{"data": utils.SimpleUUID()})
  27. if err != nil {
  28. fmt.Println(err)
  29. continue
  30. }
  31. err = api.Publish(pubsubName, topic, string(data), nil)
  32. if err != nil {
  33. fmt.Println(err)
  34. continue
  35. }
  36. time.Sleep(1 * time.Second)
  37. }
  38. }()
  39. for sendMessageCount < sendMessageMaxCount {
  40. time.Sleep(1 * time.Second)
  41. }
  42. _ = srv.Shutdown(context.Background())
  43. }
  44. func TestPubSubJson(t *testing.T) {
  45. api := pubsub.NewAPI(daprHttpPort, 10*time.Second)
  46. defer pubsub.DestroyAPI(api)
  47. sendMessageCount := 0
  48. srv := initHttpServer(&sendMessageCount)
  49. go func() {
  50. for sendMessageCount < sendMessageMaxCount {
  51. data, err := json.Marshal(map[string]interface{}{"data": utils.SimpleUUID()})
  52. if err != nil {
  53. fmt.Println(err)
  54. continue
  55. }
  56. err = api.PublishJSON(pubsubName, topic, string(data), nil)
  57. if err != nil {
  58. fmt.Println(err)
  59. continue
  60. }
  61. time.Sleep(1 * time.Second)
  62. }
  63. }()
  64. for sendMessageCount < sendMessageMaxCount {
  65. time.Sleep(1 * time.Second)
  66. }
  67. _ = srv.Shutdown(context.Background())
  68. }
  69. func initHttpServer(sendMessageCount *int) *http.Server {
  70. mux := http.NewServeMux()
  71. mux.HandleFunc("/test_pubsub", func(w http.ResponseWriter, r *http.Request) {
  72. body, err := io.ReadAll(r.Body)
  73. if err != nil {
  74. fmt.Println(err)
  75. w.WriteHeader(http.StatusOK)
  76. }
  77. fmt.Println("Receive", string(body))
  78. *sendMessageCount++
  79. w.WriteHeader(http.StatusOK)
  80. })
  81. srv := &http.Server{
  82. Addr: serverAddress,
  83. Handler: mux,
  84. }
  85. go func() {
  86. err := srv.ListenAndServe()
  87. if err != nil && err != http.ErrServerClosed {
  88. panic(err)
  89. }
  90. }()
  91. return srv
  92. }