package test import ( "context" "encoding/json" "fmt" "git.sxidc.com/service-supports/dapr_api/pubsub" "git.sxidc.com/service-supports/dapr_api/utils" "io" "net/http" "sync" "testing" "time" ) const ( pubSubServerAddress = "localhost:10081" sendMessageMaxCount = 3 pubsubName = "dapr_pubsub" topic = "test_pubsub" ) func TestPubSub(t *testing.T) { api := pubsub.NewAPI(daprHttpPort, 10*time.Second) defer pubsub.DestroyAPI(api) sendMessageCount := 0 wgServer := &sync.WaitGroup{} wgServer.Add(1) srv := initPubSubHttpServer(wgServer, &sendMessageCount) wgServer.Wait() go func() { for sendMessageCount < sendMessageMaxCount { data, err := json.Marshal(map[string]interface{}{"data": utils.SimpleUUID()}) if err != nil { fmt.Println(err) continue } err = api.Publish(pubsubName, topic, string(data), nil) if err != nil { fmt.Println(err) continue } time.Sleep(1 * time.Second) } }() for sendMessageCount < sendMessageMaxCount { time.Sleep(1 * time.Second) } _ = srv.Shutdown(context.Background()) } func TestPubSubJson(t *testing.T) { api := pubsub.NewAPI(daprHttpPort, 10*time.Second) defer pubsub.DestroyAPI(api) sendMessageCount := 0 wgServer := &sync.WaitGroup{} wgServer.Add(1) srv := initPubSubHttpServer(wgServer, &sendMessageCount) wgServer.Wait() go func() { for sendMessageCount < sendMessageMaxCount { data, err := json.Marshal(map[string]interface{}{"data": utils.SimpleUUID()}) if err != nil { fmt.Println(err) continue } err = api.PublishJSON(pubsubName, topic, string(data), nil) if err != nil { fmt.Println(err) continue } time.Sleep(1 * time.Second) } }() for sendMessageCount < sendMessageMaxCount { time.Sleep(1 * time.Second) } _ = srv.Shutdown(context.Background()) } func initPubSubHttpServer(wg *sync.WaitGroup, sendMessageCount *int) *http.Server { mux := http.NewServeMux() mux.HandleFunc("/test_pubsub", func(w http.ResponseWriter, r *http.Request) { body, err := io.ReadAll(r.Body) if err != nil { fmt.Println(err) w.WriteHeader(http.StatusOK) } fmt.Println("Receive", string(body)) *sendMessageCount++ w.WriteHeader(http.StatusOK) }) srv := &http.Server{ Addr: pubSubServerAddress, Handler: mux, } go func() { wg.Done() err := srv.ListenAndServe() if err != nil && err != http.ErrServerClosed { panic(err) } }() return srv }