// Package mbsdsdk 从 SystemData(mbsd)拉取 system_configs 的客户端 SDK。 package mbsdsdk import ( "encoding/json" "errors" "math/rand" "net/url" "strconv" "sync" "time" "git.sxidc.com/service-supports/dapr_api/invoke" "git.sxidc.com/service-supports/fslog" ) const ( // DefaultURLPrefix mbsd 配置中心 API 前缀(相对 sd_url/baseUrl)。 DefaultURLPrefix = "mbsd/api/v1/systemConfig" // CacheTTL 默认缓存有效期;过期后重新请求 mbsd。 CacheTTL = 60 * time.Second ) // ConfigInfo 与 mbsd.system_configs 返回字段一致。 type ConfigInfo struct { ID string `json:"id"` ConfigKey string `json:"configKey"` ConfigValue string `json:"configValue"` ValueType string `json:"valueType"` ConfigGroup string `json:"configGroup"` Service string `json:"service"` Description string `json:"description"` } type cacheEntry struct { info ConfigInfo loadedAt time.Time } type getByKeysRequest struct { Keys []string `json:"keys"` } type getByKeysResult struct { Infos []ConfigInfo `json:"infos"` TotalCount int64 `json:"totalCount"` } type msgResponse struct { Success bool `json:"success"` ErrCode int `json:"errCode"` Msg string `json:"msg"` } type infoResponse struct { msgResponse Info getByKeysResult `json:"info"` } var ( invokeAPI *invoke.API urlPrefix = DefaultURLPrefix cacheTTL = CacheTTL cache sync.Map refreshStop chan struct{} rng = rand.New(rand.NewSource(time.Now().UnixNano())) ) // SetURLPrefix 在 Init 之前调用可覆盖默认 API 前缀。 func SetURLPrefix(prefix string) { if prefix != "" { urlPrefix = prefix } } // SetCacheTTL 覆盖默认缓存 TTL(供测试或特殊服务使用)。 func SetCacheTTL(ttl time.Duration) { if ttl > 0 { cacheTTL = ttl } } func storeCache(key string, info ConfigInfo) { cache.Store(key, cacheEntry{info: info, loadedAt: time.Now()}) } func loadCache(key string) (cacheEntry, bool) { raw, ok := cache.Load(key) if !ok { return cacheEntry{}, false } return raw.(cacheEntry), true } // Init 初始化 mbsd HTTP 客户端;baseUrl 为空则跳过,读配置一律走调用方默认值。 func Init(baseUrl string, timeout time.Duration) { if baseUrl == "" || invokeAPI != nil { return } invokeAPI = invoke.NewAPI(baseUrl, timeout) } // Destroy 释放客户端并停止后台刷新。 func Destroy() { StopBackgroundRefresh() if invokeAPI == nil { return } invoke.DestroyAPI(invokeAPI) invokeAPI = nil cache = sync.Map{} } // StartBackgroundRefresh 定时刷新 keys;失败保留旧缓存。启动前增加随机 jitter 打散多实例齐刷。 func StartBackgroundRefresh(interval time.Duration, keys ...string) { if invokeAPI == nil || len(keys) == 0 { return } if interval <= 0 { interval = cacheTTL } StopBackgroundRefresh() refreshStop = make(chan struct{}) keySlice := append([]string(nil), keys...) go func() { jitter := time.Duration(rng.Int63n(int64(interval))) timer := time.NewTimer(jitter) select { case <-timer.C: case <-refreshStop: timer.Stop() return } if err := Refresh(keySlice...); err != nil { fslog.Warn("mbsdsdk: 首次刷新失败,将使用代码兜底直至拉取成功: " + err.Error()) } ticker := time.NewTicker(interval) defer ticker.Stop() for { select { case <-ticker.C: if err := Refresh(keySlice...); err != nil { fslog.Warn("mbsdsdk: 定时刷新失败,继续使用上一版缓存或兜底: " + err.Error()) } case <-refreshStop: return } } }() } // StopBackgroundRefresh 停止后台刷新协程。 func StopBackgroundRefresh() { if refreshStop != nil { close(refreshStop) refreshStop = nil } } // Refresh 立即从 mbsd 拉取并更新缓存。 func Refresh(keys ...string) error { configs, err := GetByKeys(keys) if err != nil { return err } for key, value := range configs { storeCache(key, value) } return nil } // GetByKeys 批量查询;成功项写入缓存。 func GetByKeys(keys []string) (map[string]ConfigInfo, error) { if invokeAPI == nil { return map[string]ConfigInfo{}, nil } if len(keys) == 0 { return map[string]ConfigInfo{}, nil } body, err := json.Marshal(&getByKeysRequest{Keys: keys}) if err != nil { return nil, err } path, err := url.JoinPath(urlPrefix, "getByKeys") if err != nil { return nil, err } respBytes, err := invokeAPI.PostJSON(path, body, nil) if err != nil { return nil, err } var resp infoResponse if err = json.Unmarshal(respBytes, &resp); err != nil { return nil, err } if !resp.Success { return nil, errors.New(resp.Msg) } result := make(map[string]ConfigInfo, len(resp.Info.Infos)) for _, info := range resp.Info.Infos { result[info.ConfigKey] = info storeCache(info.ConfigKey, info) } return result, nil } // Get 读取单个 key(带 TTL 与过期后 stale 回退)。 func Get(key string) (ConfigInfo, bool, error) { if invokeAPI == nil { return ConfigInfo{}, false, nil } var stale *ConfigInfo if entry, ok := loadCache(key); ok { if time.Since(entry.loadedAt) < cacheTTL { return entry.info, true, nil } if entry.info.ConfigValue != "" { stale = &entry.info } } configs, err := GetByKeys([]string{key}) if err != nil { if stale != nil { return *stale, true, nil } return ConfigInfo{}, false, err } info, ok := configs[key] if !ok && stale != nil { return *stale, true, nil } return info, ok, nil } // GetString 读取字符串配置;未配置/失败/空值时返回 defaultValue。 func GetString(key, defaultValue string) string { info, ok, err := Get(key) if err != nil || !ok || info.ConfigValue == "" { return defaultValue } return info.ConfigValue } // GetInt 读取整数配置。 func GetInt(key string, defaultValue int) int { info, ok, err := Get(key) if err != nil || !ok || info.ConfigValue == "" { return defaultValue } intValue, err := strconv.Atoi(info.ConfigValue) if err != nil { return defaultValue } return intValue } // GetBool 读取布尔配置。 func GetBool(key string, defaultValue bool) bool { value := GetString(key, "") if value == "" { return defaultValue } boolValue, err := strconv.ParseBool(value) if err != nil { return defaultValue } return boolValue } // GetJSON 将配置值反序列化到 target;未找到 key 时返回错误。 func GetJSON(key string, target any) error { info, ok, err := Get(key) if err != nil { return err } if !ok { return errors.New("system config not found: " + key) } return json.Unmarshal([]byte(info.ConfigValue), target) }