pubsub.go 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. package pubsub
  2. import (
  3. "fmt"
  4. "git.sxidc.com/service-supports/dapr_api/utils"
  5. "github.com/go-resty/resty/v2"
  6. "time"
  7. )
  // publishUrlFormat is the fmt template for the publish endpoint on localhost.
  // Its verbs are, in order: the port (%d), the pubsub name (%s) and the topic (%s).
  const publishUrlFormat = "http://localhost:%d/v1.0/publish/%s/%s"
  11. type API struct {
  12. client *resty.Client
  13. port int
  14. }
  15. func NewAPI(port int, timeout time.Duration) *API {
  16. return &API{
  17. client: resty.New().SetTimeout(timeout),
  18. port: port,
  19. }
  20. }
  21. func DestroyAPI(api *API) {
  22. if api == nil {
  23. return
  24. }
  25. api.port = 0
  26. api.client = nil
  27. }
  28. func (api *API) Publish(pubsubName string, topic string, data string, queryParams map[string]string) error {
  29. return api.publish(pubsubName, topic, "test/plain", data, queryParams)
  30. }
  31. func (api *API) PublishJSON(pubsubName string, topic string, data string, queryParams map[string]string) error {
  32. return api.publish(pubsubName, topic, "application/json", data, queryParams)
  33. }
  34. func (api *API) publish(pubsubName string, topic string, contentType string, data string,
  35. queryParams map[string]string) error {
  36. publishUrl := fmt.Sprintf(publishUrlFormat, api.port, pubsubName, topic)
  37. resp, err := api.client.R().
  38. SetHeader("Content-Type", contentType).
  39. SetQueryParams(queryParams).
  40. SetBody(data).
  41. Post(publishUrl)
  42. if err != nil {
  43. return err
  44. }
  45. if resp.IsError() {
  46. return utils.ResponseStatusError(publishUrl, resp)
  47. }
  48. return nil
  49. }