client.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. package dapr_client
  2. import (
  3. "context"
  4. dapr "github.com/dapr/go-sdk/client"
  5. "os"
  6. "time"
  7. )
  8. const (
  9. daprPortDefault = "50001"
  10. daprPortEnvVarName = "DAPR_GRPC_PORT"
  11. )
  12. type Client struct {
  13. client dapr.Client
  14. timeoutSec int64
  15. }
  16. func newClient(timeoutSec int64) (*Client, error) {
  17. port := os.Getenv(daprPortEnvVarName)
  18. if port == "" {
  19. port = daprPortDefault
  20. }
  21. client, err := dapr.NewClientWithPort(port)
  22. if err != nil {
  23. return nil, err
  24. }
  25. c := new(Client)
  26. c.client = client
  27. c.timeoutSec = timeoutSec
  28. return c, nil
  29. }
  30. func destroyClient(client *Client) {
  31. if client == nil {
  32. return
  33. }
  34. client.client.Close()
  35. client.client = nil
  36. client = nil
  37. }
  38. func (client *Client) InvokeGetMethod(appID string, methodName string) ([]byte, error) {
  39. return client.invokeMethod(appID, methodName, "get")
  40. }
  41. func (client *Client) InvokePostMethodJson(appID string, methodName string, jsonData []byte) ([]byte, error) {
  42. return client.invokeMethodJson(appID, methodName, "post", jsonData)
  43. }
  44. func (client *Client) InvokePostMethodMultipart(appID string, methodName string, multipartContentType string, data []byte) ([]byte, error) {
  45. return client.invokeMethodWithContentType(appID, methodName, "post", multipartContentType, data)
  46. }
  47. func (client *Client) InvokePutMethodJson(appID string, methodName string, jsonData []byte) ([]byte, error) {
  48. return client.invokeMethodJson(appID, methodName, "put", jsonData)
  49. }
  50. func (client *Client) InvokeDeleteMethod(appID string, methodName string) ([]byte, error) {
  51. return client.invokeMethod(appID, methodName, "delete")
  52. }
  53. func (client *Client) Publish(pubsubName string, topic string, content []byte) error {
  54. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(client.timeoutSec)*time.Second)
  55. defer cancel()
  56. return client.client.PublishEvent(ctx, pubsubName, topic, content)
  57. }
  58. func (client *Client) invokeMethod(appID string, methodName string, verb string) ([]byte, error) {
  59. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(client.timeoutSec)*time.Second)
  60. defer cancel()
  61. return client.client.InvokeMethod(ctx, appID, methodName, verb)
  62. }
  63. func (client *Client) invokeMethodJson(appID string, methodName string, verb string, jsonData []byte) ([]byte, error) {
  64. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(client.timeoutSec)*time.Second)
  65. defer cancel()
  66. return client.client.InvokeMethodWithContent(ctx, appID, methodName, verb,
  67. &dapr.DataContent{
  68. Data: jsonData,
  69. ContentType: "application/json",
  70. })
  71. }
  72. func (client *Client) invokeMethodWithContentType(appID string, methodName string, verb string, contentType string, data []byte) ([]byte, error) {
  73. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(client.timeoutSec)*time.Second)
  74. defer cancel()
  75. return client.client.InvokeMethodWithContent(ctx, appID, methodName, verb,
  76. &dapr.DataContent{
  77. Data: data,
  78. ContentType: contentType,
  79. })
  80. }
  81. func (client *Client) SaveState(stateStoreName string, key string, value []byte, meta map[string]string, so ...dapr.StateOption) error {
  82. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(client.timeoutSec)*time.Second)
  83. defer cancel()
  84. return client.client.SaveState(ctx, stateStoreName, key, value, meta, so...)
  85. }
  86. func (client *Client) DeleteState(stateStoreName string, key string, meta map[string]string) error {
  87. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(client.timeoutSec)*time.Second)
  88. defer cancel()
  89. return client.client.DeleteState(ctx, stateStoreName, key, meta)
  90. }
  91. func (client *Client) GetState(stateStoreName string, key string, meta map[string]string) (*dapr.StateItem, error) {
  92. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(client.timeoutSec)*time.Second)
  93. defer cancel()
  94. return client.client.GetState(ctx, stateStoreName, key, meta)
  95. }
  96. func (client *Client) ExecuteStateTransaction(stateStoreName string, meta map[string]string, ops []*dapr.StateOperation) error {
  97. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(client.timeoutSec)*time.Second)
  98. defer cancel()
  99. return client.client.ExecuteStateTransaction(ctx, stateStoreName, meta, ops)
  100. }