package data_service import ( "git.sxidc.com/go-framework/baize/infrastructure/database/data_service/client" "git.sxidc.com/go-framework/baize/infrastructure/database/data_service/grpc_client" "git.sxidc.com/go-framework/baize/infrastructure/database/data_service/grpc_client/v1/request" "git.sxidc.com/go-framework/baize/infrastructure/database/sql" "git.sxidc.com/go-tools/utils/strutils" "git.sxidc.com/service-supports/fserr" "io" "time" ) type Config struct { Token string `json:"token" yaml:"token"` BaseUrl string `json:"base_url" yaml:"base_url"` GrpcAddress string `json:"grpc_address" yaml:"grpc_address"` Namespace string `json:"namespace" yaml:"namespace"` DataSource string `json:"data_source" yaml:"data_source"` TimeoutSec int64 `json:"timeout_sec" yaml:"timeout_sec"` } type DataService struct { config Config client *client.Client grpcClient *grpc_client.Client } func NewDataService(config *Config) (*DataService, error) { c := client.New(time.Duration(config.TimeoutSec) * time.Second) namespaceInfos, err := c.GetNamespaces(config.Token, config.BaseUrl, config.Namespace, 1, 1) if err != nil { return nil, err } if namespaceInfos == nil || len(namespaceInfos) == 0 { return nil, fserr.New("命名空间不存在") } dataSourceInfos, err := c.GetDataSources( config.Token, config.BaseUrl, config.Namespace, config.DataSource, "", 1, 1) if err != nil { return nil, err } if dataSourceInfos == nil || len(dataSourceInfos) == 0 { return nil, fserr.New("数据源不存在") } grpcClient, err := grpc_client.NewClient(config.GrpcAddress) if err != nil { return nil, err } return &DataService{ config: *config, client: c, grpcClient: grpcClient, }, nil } func DestroyDataService(ds *DataService) error { if ds == nil { return nil } err := grpc_client.Destroy(ds.grpcClient) if err != nil { return err } ds = nil return nil } func (ds *DataService) ExecuteRawSql(sqlStr string, executeParams map[string]any) ([]sql.Result, error) { if strutils.IsStringEmpty(sqlStr) { return make([]sql.Result, 0), nil } config := ds.config tableRows, err := ds.client.ExecuteRawSql(config.Token, config.BaseUrl, config.Namespace, config.DataSource, sqlStr, executeParams) if err != nil { return nil, err } results := make([]sql.Result, len(tableRows)) for i, row := range tableRows { results[i] = row } return results, nil } func (ds *DataService) ExecuteSql(name string, executeParams map[string]any) ([]sql.Result, error) { if strutils.IsStringEmpty(name) { return nil, fserr.New("没有传递SQL资源名称") } config := ds.config tableRows, err := ds.client.ExecuteSql(config.Token, config.BaseUrl, config.Namespace, config.DataSource, name, executeParams) if err != nil { return nil, err } results := make([]sql.Result, len(tableRows)) for i, row := range tableRows { results[i] = row } return results, nil } func (ds *DataService) Transaction(txFunc TxFunc) error { stream, err := ds.grpcClient.Transaction() if err != nil { return err } defer func() { innerErr := stream.CloseSend() if innerErr != nil { panic(innerErr) } }() err = stream.Send(&request.TransactionOperation{ Request: &request.TransactionOperation_TransactionBeginRequest{ TransactionBeginRequest: &request.TransactionBeginRequest{ Token: ds.config.Token, Namespace: ds.config.Namespace, DataSource: ds.config.DataSource, }, }}) if err != nil { return err } resp, err := stream.Recv() if err != nil { return err } if !resp.Success { return fserr.New(resp.Msg) } err = txFunc(&Transaction{ stream: stream, }) if err != nil { return err } err = stream.Send(&request.TransactionOperation{ Request: &request.TransactionOperation_TransactionEndRequest{ TransactionEndRequest: &request.TransactionEndRequest{}, }}) if err != nil { return err } _, err = stream.Recv() if err != nil && err != io.EOF { return err } return nil }