123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126 |
- package test
- import (
- "context"
- "encoding/json"
- "fmt"
- "git.sxidc.com/service-supports/dapr_api/pubsub"
- "git.sxidc.com/service-supports/dapr_api/utils"
- "io"
- "net/http"
- "sync"
- "testing"
- "time"
- )
- const (
- pubSubServerAddress = "localhost:10081"
- sendMessageMaxCount = 3
- pubsubName = "dapr_pubsub"
- topic = "test_pubsub"
- )
- func TestPubSub(t *testing.T) {
- api := pubsub.NewAPI(daprHttpPort, 10*time.Second)
- defer pubsub.DestroyAPI(api)
- sendMessageCount := 0
- wgServer := &sync.WaitGroup{}
- wgServer.Add(1)
- srv := initPubSubHttpServer(wgServer, &sendMessageCount)
- wgServer.Wait()
- go func() {
- for sendMessageCount < sendMessageMaxCount {
- data, err := json.Marshal(map[string]interface{}{"data": utils.SimpleUUID()})
- if err != nil {
- fmt.Println(err)
- continue
- }
- err = api.Publish(pubsubName, topic, string(data), nil)
- if err != nil {
- fmt.Println(err)
- continue
- }
- time.Sleep(1 * time.Second)
- }
- }()
- for sendMessageCount < sendMessageMaxCount {
- time.Sleep(1 * time.Second)
- }
- _ = srv.Shutdown(context.Background())
- }
- func TestPubSubJson(t *testing.T) {
- api := pubsub.NewAPI(daprHttpPort, 10*time.Second)
- defer pubsub.DestroyAPI(api)
- sendMessageCount := 0
- wgServer := &sync.WaitGroup{}
- wgServer.Add(1)
- srv := initPubSubHttpServer(wgServer, &sendMessageCount)
- wgServer.Wait()
- go func() {
- for sendMessageCount < sendMessageMaxCount {
- data, err := json.Marshal(map[string]interface{}{"data": utils.SimpleUUID()})
- if err != nil {
- fmt.Println(err)
- continue
- }
- err = api.PublishJSON(pubsubName, topic, string(data), nil)
- if err != nil {
- fmt.Println(err)
- continue
- }
- time.Sleep(1 * time.Second)
- }
- }()
- for sendMessageCount < sendMessageMaxCount {
- time.Sleep(1 * time.Second)
- }
- _ = srv.Shutdown(context.Background())
- }
- func initPubSubHttpServer(wg *sync.WaitGroup, sendMessageCount *int) *http.Server {
- mux := http.NewServeMux()
- mux.HandleFunc("/test_pubsub", func(w http.ResponseWriter, r *http.Request) {
- body, err := io.ReadAll(r.Body)
- if err != nil {
- fmt.Println(err)
- w.WriteHeader(http.StatusOK)
- }
- fmt.Println("Receive", string(body))
- *sendMessageCount++
- w.WriteHeader(http.StatusOK)
- })
- srv := &http.Server{
- Addr: pubSubServerAddress,
- Handler: mux,
- }
- go func() {
- wg.Done()
- err := srv.ListenAndServe()
- if err != nil && err != http.ErrServerClosed {
- panic(err)
- }
- }()
- return srv
- }
|