yjp пре 3 година
родитељ
комит
16dd07aaf4
6 измењених фајлова са 204 додато и 3 уклоњено
  1. 13 0
      components/pubsub.yaml
  2. 11 0
      components/subscribe.yaml
  3. 1 1
      dapr_run.sh
  4. 60 0
      pubsub/pubsub.go
  5. 2 2
      state/state.go
  6. 117 0
      test/pubsub_test.go

+ 13 - 0
components/pubsub.yaml

@@ -0,0 +1,13 @@
+apiVersion: dapr.io/v1alpha1
+kind: Component
+metadata:
+  name: dapr_pubsub
+  namespace: default
+spec:
+  type: pubsub.redis
+  version: v1
+  metadata:
+    - name: redisHost
+      value: localhost:30379
+    - name: redisPassword
+      value: mtyzxhc

+ 11 - 0
components/subscribe.yaml

@@ -0,0 +1,11 @@
+apiVersion: dapr.io/v1alpha1
+kind: Subscription
+metadata:
+  name: dapr_pubsub_subscribe
+  namespace: default
+spec:
+  topic: test_pubsub
+  route: /test_pubsub
+  pubsubname: dapr_pubsub
+scopes:
+  - dapr_api

+ 1 - 1
dapr_run.sh

@@ -1,3 +1,3 @@
 #!/bin/bash
-dapr run --app-id dapr_api --dapr-http-port 10080 --components-path ./components
+dapr run --app-id dapr_api --app-port 10081 --dapr-http-port 10080 --components-path ./components
 

+ 60 - 0
pubsub/pubsub.go

@@ -0,0 +1,60 @@
+package pubsub
+
+import (
+	"fmt"
+	"github.com/go-resty/resty/v2"
+	"time"
+)
+
+const (
+	publishUrlFormat = "http://localhost:%d/v1.0/publish/%s/%s"
+)
+
+type API struct {
+	port   int
+	client *resty.Client
+}
+
+func NewAPI(port int, timeout time.Duration) *API {
+	return &API{
+		client: resty.New().SetTimeout(timeout),
+		port:   port,
+	}
+}
+
+func DestroyAPI(api *API) {
+	if api == nil {
+		return
+	}
+
+	api.port = 0
+	api.client = nil
+}
+
+func (api *API) Publish(pubsubName string, topic string, data string, queryParams map[string]string) error {
+	return api.publish(pubsubName, topic, "test/plain", data, queryParams)
+}
+
+func (api *API) PublishJSON(pubsubName string, topic string, data string, queryParams map[string]string) error {
+	return api.publish(pubsubName, topic, "application/json", data, queryParams)
+}
+
+func (api *API) publish(pubsubName string, topic string, contentType string, data string,
+	queryParams map[string]string) error {
+	publishUrl := fmt.Sprintf(publishUrlFormat, api.port, pubsubName, topic)
+
+	resp, err := api.client.R().
+		SetHeader("Content-Type", contentType).
+		SetQueryParams(queryParams).
+		SetBody(data).
+		Post(publishUrl)
+	if err != nil {
+		return err
+	}
+
+	if resp.IsError() {
+		return fmt.Errorf("Status %d: %s\n", resp.StatusCode(), resp.Body())
+	}
+
+	return nil
+}

+ 2 - 2
state/state.go

@@ -18,14 +18,14 @@ const (
 )
 
 type API struct {
-	port   int
 	client *resty.Client
+	port   int
 }
 
 func NewAPI(port int, timeout time.Duration) *API {
 	return &API{
-		port:   port,
 		client: resty.New().SetTimeout(timeout),
+		port:   port,
 	}
 }
 

+ 117 - 0
test/pubsub_test.go

@@ -0,0 +1,117 @@
+package test
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"git.sxidc.com/service-supports/dapr_api/pubsub"
+	"git.sxidc.com/service-supports/dapr_api/utils"
+	"io"
+	"net/http"
+	"testing"
+	"time"
+)
+
+const (
+	serverAddress       = "localhost:10081"
+	sendMessageMaxCount = 3
+	pubsubName          = "dapr_pubsub"
+	topic               = "test_pubsub"
+)
+
+func TestPubSub(t *testing.T) {
+	api := pubsub.NewAPI(daprHttpPort, 10*time.Second)
+	defer pubsub.DestroyAPI(api)
+
+	sendMessageCount := 0
+
+	srv := initHttpServer(&sendMessageCount)
+
+	go func() {
+		for sendMessageCount < sendMessageMaxCount {
+			data, err := json.Marshal(map[string]interface{}{"data": utils.SimpleUUID()})
+			if err != nil {
+				fmt.Println(err)
+				continue
+			}
+
+			err = api.Publish(pubsubName, topic, string(data), nil)
+			if err != nil {
+				fmt.Println(err)
+				continue
+			}
+
+			time.Sleep(1 * time.Second)
+		}
+	}()
+
+	for sendMessageCount < sendMessageMaxCount {
+		time.Sleep(1 * time.Second)
+	}
+
+	_ = srv.Shutdown(context.Background())
+}
+
+func TestPubSubJson(t *testing.T) {
+	api := pubsub.NewAPI(daprHttpPort, 10*time.Second)
+	defer pubsub.DestroyAPI(api)
+
+	sendMessageCount := 0
+
+	srv := initHttpServer(&sendMessageCount)
+
+	go func() {
+		for sendMessageCount < sendMessageMaxCount {
+			data, err := json.Marshal(map[string]interface{}{"data": utils.SimpleUUID()})
+			if err != nil {
+				fmt.Println(err)
+				continue
+			}
+
+			err = api.PublishJSON(pubsubName, topic, string(data), nil)
+			if err != nil {
+				fmt.Println(err)
+				continue
+			}
+
+			time.Sleep(1 * time.Second)
+		}
+	}()
+
+	for sendMessageCount < sendMessageMaxCount {
+		time.Sleep(1 * time.Second)
+	}
+
+	_ = srv.Shutdown(context.Background())
+}
+
+func initHttpServer(sendMessageCount *int) *http.Server {
+	mux := http.NewServeMux()
+	mux.HandleFunc("/test_pubsub", func(w http.ResponseWriter, r *http.Request) {
+		body, err := io.ReadAll(r.Body)
+		if err != nil {
+			fmt.Println(err)
+			w.WriteHeader(http.StatusOK)
+		}
+
+		fmt.Println("Receive", string(body))
+
+		*sendMessageCount++
+
+		w.WriteHeader(http.StatusOK)
+	})
+
+	srv := &http.Server{
+		Addr:    serverAddress,
+		Handler: mux,
+	}
+
+	go func() {
+		err := srv.ListenAndServe()
+		if err != nil && err != http.ErrServerClosed {
+			panic(err)
+		}
+	}()
+
+	return srv
+}