package data_service import ( "git.sxidc.com/go-framework/baize/infrastructure/database/data_service/client" "git.sxidc.com/go-framework/baize/infrastructure/database/data_service/grpc_client" "git.sxidc.com/go-framework/baize/infrastructure/database/data_service/grpc_client/v1/request" "git.sxidc.com/go-framework/baize/infrastructure/database/sql" "git.sxidc.com/go-tools/utils/strutils" "git.sxidc.com/service-supports/fserr" "io" "time" ) type Config struct { token string baseUrl string grpcAddress string namespace string dataSource string timeout time.Duration } type DataService struct { config Config client *client.Client grpcClient *grpc_client.Client } func NewDataService(config Config) (*DataService, error) { c := client.New(config.timeout) namespaceInfos, err := c.GetNamespaces(config.token, config.baseUrl, config.namespace, 1, 1) if err != nil { return nil, err } if namespaceInfos == nil || len(namespaceInfos) == 0 { return nil, fserr.New("命名空间不存在") } dataSourceInfos, err := c.GetDataSources( config.token, config.baseUrl, config.namespace, config.dataSource, "", 1, 1) if err != nil { return nil, err } if dataSourceInfos == nil || len(dataSourceInfos) == 0 { return nil, fserr.New("数据源不存在") } grpcClient, err := grpc_client.NewClient(config.grpcAddress) if err != nil { return nil, err } return &DataService{ config: config, client: c, grpcClient: grpcClient, }, nil } func DestroyDataService(ds *DataService) error { if ds == nil { return nil } err := grpc_client.Destroy(ds.grpcClient) if err != nil { return err } ds = nil return nil } func (ds *DataService) ExecuteRawSql(sqlStr string, executeParams map[string]any) ([]sql.Result, error) { if strutils.IsStringEmpty(sqlStr) { return make([]sql.Result, 0), nil } config := ds.config tableRows, err := ds.client.ExecuteRawSql(config.token, config.baseUrl, config.namespace, config.dataSource, sqlStr, executeParams) if err != nil { return nil, err } results := make([]sql.Result, len(tableRows)) for i, row := range tableRows { results[i] = row } return results, nil } func (ds *DataService) ExecuteSql(name string, executeParams map[string]any) ([]sql.Result, error) { if strutils.IsStringEmpty(name) { return nil, fserr.New("没有传递SQL资源名称") } config := ds.config tableRows, err := ds.client.ExecuteSql(config.token, config.baseUrl, config.namespace, config.dataSource, name, executeParams) if err != nil { return nil, err } results := make([]sql.Result, len(tableRows)) for i, row := range tableRows { results[i] = row } return results, nil } func (ds *DataService) Transaction(txFunc TxFunc) error { stream, err := ds.grpcClient.Transaction() if err != nil { return err } defer func() { innerErr := stream.CloseSend() if innerErr != nil { panic(innerErr) } }() err = stream.Send(&request.TransactionOperation{ Request: &request.TransactionOperation_TransactionBeginRequest{ TransactionBeginRequest: &request.TransactionBeginRequest{ Token: ds.config.token, Namespace: ds.config.namespace, DataSource: ds.config.dataSource, }, }}) if err != nil { return err } resp, err := stream.Recv() if err != nil { return err } if !resp.Success { return fserr.New(resp.Msg) } err = txFunc(&Transaction{ stream: stream, }) if err != nil { return err } err = stream.Send(&request.TransactionOperation{ Request: &request.TransactionOperation_TransactionEndRequest{ TransactionEndRequest: &request.TransactionEndRequest{}, }}) if err != nil { return err } _, err = stream.Recv() if err != nil && err != io.EOF { return err } return nil }