data_service.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. package data_service
  2. import (
  3. "git.sxidc.com/go-framework/baize/framwork/infrastructure/database/data_service/client"
  4. "git.sxidc.com/go-framework/baize/framwork/infrastructure/database/data_service/grpc_client"
  5. "git.sxidc.com/go-framework/baize/framwork/infrastructure/database/data_service/grpc_client/v1/request"
  6. "git.sxidc.com/go-framework/baize/framwork/infrastructure/database/sql"
  7. "git.sxidc.com/go-tools/utils/strutils"
  8. "git.sxidc.com/service-supports/fserr"
  9. "io"
  10. "time"
  11. )
  12. type Config struct {
  13. Token string `json:"token" yaml:"token"`
  14. Address string `json:"address" yaml:"address"`
  15. HttpPort string `json:"http_port" yaml:"http_port"`
  16. GrpcPort string `json:"grpc_port" yaml:"grpc_port"`
  17. Namespace string `json:"namespace" yaml:"namespace"`
  18. DataSource string `json:"data_source" yaml:"data_source"`
  19. TimeoutSec int64 `json:"timeout_sec" yaml:"timeout_sec"`
  20. }
  21. type DataService struct {
  22. config Config
  23. client *client.Client
  24. grpcClient *grpc_client.Client
  25. }
  26. func NewDataService(config *Config) (*DataService, error) {
  27. baseUrl := "http://" + config.Address + ":" + config.HttpPort
  28. grpcAddress := config.Address + ":" + config.GrpcPort
  29. if config.TimeoutSec == 0 {
  30. config.TimeoutSec = 30
  31. }
  32. c := client.New(time.Duration(config.TimeoutSec) * time.Second)
  33. namespaceInfos, err := c.GetNamespaces(config.Token, baseUrl,
  34. config.Namespace, 1, 1)
  35. if err != nil {
  36. return nil, err
  37. }
  38. if namespaceInfos == nil || len(namespaceInfos) == 0 {
  39. return nil, fserr.New("命名空间不存在")
  40. }
  41. dataSourceInfos, err := c.GetDataSources(
  42. config.Token, baseUrl, config.Namespace, config.DataSource, "", 1, 1)
  43. if err != nil {
  44. return nil, err
  45. }
  46. if dataSourceInfos == nil || len(dataSourceInfos) == 0 {
  47. return nil, fserr.New("数据源不存在")
  48. }
  49. grpcClient, err := grpc_client.NewClient(grpcAddress)
  50. if err != nil {
  51. return nil, err
  52. }
  53. return &DataService{
  54. config: *config,
  55. client: c,
  56. grpcClient: grpcClient,
  57. }, nil
  58. }
  59. func DestroyDataService(ds *DataService) error {
  60. if ds == nil {
  61. return nil
  62. }
  63. err := grpc_client.Destroy(ds.grpcClient)
  64. if err != nil {
  65. return err
  66. }
  67. ds = nil
  68. return nil
  69. }
  70. func (ds *DataService) ExecuteRawSql(sqlStr string, executeParams map[string]any) ([]sql.Result, error) {
  71. if strutils.IsStringEmpty(sqlStr) {
  72. return make([]sql.Result, 0), nil
  73. }
  74. config := ds.config
  75. baseUrl := "http://" + config.Address + ":" + config.HttpPort
  76. tableRows, err := ds.client.ExecuteRawSql(config.Token, baseUrl,
  77. config.Namespace, config.DataSource, sqlStr, executeParams)
  78. if err != nil {
  79. return nil, err
  80. }
  81. results := make([]sql.Result, len(tableRows))
  82. for i, row := range tableRows {
  83. results[i] = row
  84. }
  85. return results, nil
  86. }
  87. func (ds *DataService) ExecuteSql(name string, executeParams map[string]any) ([]sql.Result, error) {
  88. if strutils.IsStringEmpty(name) {
  89. return nil, fserr.New("没有传递SQL资源名称")
  90. }
  91. config := ds.config
  92. baseUrl := "http://" + config.Address + ":" + config.HttpPort
  93. tableRows, err := ds.client.ExecuteSql(config.Token, baseUrl,
  94. config.Namespace, config.DataSource, name, executeParams)
  95. if err != nil {
  96. return nil, err
  97. }
  98. results := make([]sql.Result, len(tableRows))
  99. for i, row := range tableRows {
  100. results[i] = row
  101. }
  102. return results, nil
  103. }
  104. func (ds *DataService) Transaction(txFunc TxFunc) error {
  105. stream, err := ds.grpcClient.Transaction()
  106. if err != nil {
  107. return err
  108. }
  109. defer func() {
  110. innerErr := stream.CloseSend()
  111. if innerErr != nil {
  112. panic(innerErr)
  113. }
  114. }()
  115. err = stream.Send(&request.TransactionOperation{
  116. Request: &request.TransactionOperation_TransactionBeginRequest{
  117. TransactionBeginRequest: &request.TransactionBeginRequest{
  118. Token: ds.config.Token,
  119. Namespace: ds.config.Namespace,
  120. DataSource: ds.config.DataSource,
  121. },
  122. }})
  123. if err != nil {
  124. return err
  125. }
  126. resp, err := stream.Recv()
  127. if err != nil {
  128. return err
  129. }
  130. if !resp.Success {
  131. return fserr.New(resp.Msg)
  132. }
  133. err = txFunc(&Transaction{
  134. stream: stream,
  135. })
  136. if err != nil {
  137. return err
  138. }
  139. err = stream.Send(&request.TransactionOperation{
  140. Request: &request.TransactionOperation_TransactionEndRequest{
  141. TransactionEndRequest: &request.TransactionEndRequest{},
  142. }})
  143. if err != nil {
  144. return err
  145. }
  146. _, err = stream.Recv()
  147. if err != nil && err != io.EOF {
  148. return err
  149. }
  150. return nil
  151. }