data_service.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. package data_service
  2. import (
  3. "git.sxidc.com/go-framework/baize/infrastructure/database/data_service/client"
  4. "git.sxidc.com/go-framework/baize/infrastructure/database/data_service/grpc_client"
  5. "git.sxidc.com/go-framework/baize/infrastructure/database/data_service/grpc_client/v1/request"
  6. "git.sxidc.com/go-framework/baize/infrastructure/database/sql"
  7. "git.sxidc.com/go-tools/utils/strutils"
  8. "git.sxidc.com/service-supports/fserr"
  9. "io"
  10. "time"
  11. )
  12. type Config struct {
  13. Token string `json:"token" yaml:"token"`
  14. BaseUrl string `json:"base_url" yaml:"base_url"`
  15. GrpcAddress string `json:"grpc_address" yaml:"grpc_address"`
  16. Namespace string `json:"namespace" yaml:"namespace"`
  17. DataSource string `json:"data_source" yaml:"data_source"`
  18. TimeoutSec int64 `json:"timeout_sec" yaml:"timeout_sec"`
  19. }
  20. type DataService struct {
  21. config Config
  22. client *client.Client
  23. grpcClient *grpc_client.Client
  24. }
  25. func NewDataService(config *Config) (*DataService, error) {
  26. c := client.New(time.Duration(config.TimeoutSec) * time.Second)
  27. namespaceInfos, err := c.GetNamespaces(config.Token, config.BaseUrl, config.Namespace, 1, 1)
  28. if err != nil {
  29. return nil, err
  30. }
  31. if namespaceInfos == nil || len(namespaceInfos) == 0 {
  32. return nil, fserr.New("命名空间不存在")
  33. }
  34. dataSourceInfos, err := c.GetDataSources(
  35. config.Token, config.BaseUrl, config.Namespace, config.DataSource, "", 1, 1)
  36. if err != nil {
  37. return nil, err
  38. }
  39. if dataSourceInfos == nil || len(dataSourceInfos) == 0 {
  40. return nil, fserr.New("数据源不存在")
  41. }
  42. grpcClient, err := grpc_client.NewClient(config.GrpcAddress)
  43. if err != nil {
  44. return nil, err
  45. }
  46. return &DataService{
  47. config: *config,
  48. client: c,
  49. grpcClient: grpcClient,
  50. }, nil
  51. }
  52. func DestroyDataService(ds *DataService) error {
  53. if ds == nil {
  54. return nil
  55. }
  56. err := grpc_client.Destroy(ds.grpcClient)
  57. if err != nil {
  58. return err
  59. }
  60. ds = nil
  61. return nil
  62. }
  63. func (ds *DataService) ExecuteRawSql(sqlStr string, executeParams map[string]any) ([]sql.Result, error) {
  64. if strutils.IsStringEmpty(sqlStr) {
  65. return make([]sql.Result, 0), nil
  66. }
  67. config := ds.config
  68. tableRows, err := ds.client.ExecuteRawSql(config.Token, config.BaseUrl,
  69. config.Namespace, config.DataSource, sqlStr, executeParams)
  70. if err != nil {
  71. return nil, err
  72. }
  73. results := make([]sql.Result, len(tableRows))
  74. for i, row := range tableRows {
  75. results[i] = row
  76. }
  77. return results, nil
  78. }
  79. func (ds *DataService) ExecuteSql(name string, executeParams map[string]any) ([]sql.Result, error) {
  80. if strutils.IsStringEmpty(name) {
  81. return nil, fserr.New("没有传递SQL资源名称")
  82. }
  83. config := ds.config
  84. tableRows, err := ds.client.ExecuteSql(config.Token, config.BaseUrl,
  85. config.Namespace, config.DataSource, name, executeParams)
  86. if err != nil {
  87. return nil, err
  88. }
  89. results := make([]sql.Result, len(tableRows))
  90. for i, row := range tableRows {
  91. results[i] = row
  92. }
  93. return results, nil
  94. }
  95. func (ds *DataService) Transaction(txFunc TxFunc) error {
  96. stream, err := ds.grpcClient.Transaction()
  97. if err != nil {
  98. return err
  99. }
  100. defer func() {
  101. innerErr := stream.CloseSend()
  102. if innerErr != nil {
  103. panic(innerErr)
  104. }
  105. }()
  106. err = stream.Send(&request.TransactionOperation{
  107. Request: &request.TransactionOperation_TransactionBeginRequest{
  108. TransactionBeginRequest: &request.TransactionBeginRequest{
  109. Token: ds.config.Token,
  110. Namespace: ds.config.Namespace,
  111. DataSource: ds.config.DataSource,
  112. },
  113. }})
  114. if err != nil {
  115. return err
  116. }
  117. resp, err := stream.Recv()
  118. if err != nil {
  119. return err
  120. }
  121. if !resp.Success {
  122. return fserr.New(resp.Msg)
  123. }
  124. err = txFunc(&Transaction{
  125. stream: stream,
  126. })
  127. if err != nil {
  128. return err
  129. }
  130. err = stream.Send(&request.TransactionOperation{
  131. Request: &request.TransactionOperation_TransactionEndRequest{
  132. TransactionEndRequest: &request.TransactionEndRequest{},
  133. }})
  134. if err != nil {
  135. return err
  136. }
  137. _, err = stream.Recv()
  138. if err != nil && err != io.EOF {
  139. return err
  140. }
  141. return nil
  142. }