client.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. // Package mbsdsdk 从 SystemData(mbsd)拉取 system_configs 的客户端 SDK。
  2. package mbsdsdk
  3. import (
  4. "encoding/json"
  5. "errors"
  6. "math/rand"
  7. "net/url"
  8. "strconv"
  9. "sync"
  10. "time"
  11. "git.sxidc.com/service-supports/dapr_api/invoke"
  12. "git.sxidc.com/service-supports/fslog"
  13. )
  14. const (
  15. // DefaultURLPrefix mbsd 配置中心 API 前缀(相对 sd_url/baseUrl)。
  16. DefaultURLPrefix = "mbsd/api/v1/systemConfig"
  17. // CacheTTL 默认缓存有效期;过期后重新请求 mbsd。
  18. CacheTTL = 60 * time.Second
  19. )
  20. // ConfigInfo 与 mbsd.system_configs 返回字段一致。
  21. type ConfigInfo struct {
  22. ID string `json:"id"`
  23. ConfigKey string `json:"configKey"`
  24. ConfigValue string `json:"configValue"`
  25. ValueType string `json:"valueType"`
  26. ConfigGroup string `json:"configGroup"`
  27. Service string `json:"service"`
  28. Description string `json:"description"`
  29. }
  30. type cacheEntry struct {
  31. info ConfigInfo
  32. loadedAt time.Time
  33. }
  34. type getByKeysRequest struct {
  35. Keys []string `json:"keys"`
  36. }
  37. type getByKeysResult struct {
  38. Infos []ConfigInfo `json:"infos"`
  39. TotalCount int64 `json:"totalCount"`
  40. }
  41. type msgResponse struct {
  42. Success bool `json:"success"`
  43. ErrCode int `json:"errCode"`
  44. Msg string `json:"msg"`
  45. }
  46. type infoResponse struct {
  47. msgResponse
  48. Info getByKeysResult `json:"info"`
  49. }
  50. var (
  51. invokeAPI *invoke.API
  52. urlPrefix = DefaultURLPrefix
  53. cacheTTL = CacheTTL
  54. cache sync.Map
  55. refreshStop chan struct{}
  56. rng = rand.New(rand.NewSource(time.Now().UnixNano()))
  57. )
  58. // SetURLPrefix 在 Init 之前调用可覆盖默认 API 前缀。
  59. func SetURLPrefix(prefix string) {
  60. if prefix != "" {
  61. urlPrefix = prefix
  62. }
  63. }
  64. // SetCacheTTL 覆盖默认缓存 TTL(供测试或特殊服务使用)。
  65. func SetCacheTTL(ttl time.Duration) {
  66. if ttl > 0 {
  67. cacheTTL = ttl
  68. }
  69. }
  70. func storeCache(key string, info ConfigInfo) {
  71. cache.Store(key, cacheEntry{info: info, loadedAt: time.Now()})
  72. }
  73. func loadCache(key string) (cacheEntry, bool) {
  74. raw, ok := cache.Load(key)
  75. if !ok {
  76. return cacheEntry{}, false
  77. }
  78. return raw.(cacheEntry), true
  79. }
  80. // Init 初始化 mbsd HTTP 客户端;baseUrl 为空则跳过,读配置一律走调用方默认值。
  81. func Init(baseUrl string, timeout time.Duration) {
  82. if baseUrl == "" || invokeAPI != nil {
  83. return
  84. }
  85. invokeAPI = invoke.NewAPI(baseUrl, timeout)
  86. }
  87. // Destroy 释放客户端并停止后台刷新。
  88. func Destroy() {
  89. StopBackgroundRefresh()
  90. if invokeAPI == nil {
  91. return
  92. }
  93. invoke.DestroyAPI(invokeAPI)
  94. invokeAPI = nil
  95. cache = sync.Map{}
  96. }
  97. // StartBackgroundRefresh 定时刷新 keys;失败保留旧缓存。启动前增加随机 jitter 打散多实例齐刷。
  98. func StartBackgroundRefresh(interval time.Duration, keys ...string) {
  99. if invokeAPI == nil || len(keys) == 0 {
  100. return
  101. }
  102. if interval <= 0 {
  103. interval = cacheTTL
  104. }
  105. StopBackgroundRefresh()
  106. refreshStop = make(chan struct{})
  107. keySlice := append([]string(nil), keys...)
  108. go func() {
  109. jitter := time.Duration(rng.Int63n(int64(interval)))
  110. timer := time.NewTimer(jitter)
  111. select {
  112. case <-timer.C:
  113. case <-refreshStop:
  114. timer.Stop()
  115. return
  116. }
  117. if err := Refresh(keySlice...); err != nil {
  118. fslog.Warn("mbsdsdk: 首次刷新失败,将使用代码兜底直至拉取成功: " + err.Error())
  119. }
  120. ticker := time.NewTicker(interval)
  121. defer ticker.Stop()
  122. for {
  123. select {
  124. case <-ticker.C:
  125. if err := Refresh(keySlice...); err != nil {
  126. fslog.Warn("mbsdsdk: 定时刷新失败,继续使用上一版缓存或兜底: " + err.Error())
  127. }
  128. case <-refreshStop:
  129. return
  130. }
  131. }
  132. }()
  133. }
  134. // StopBackgroundRefresh 停止后台刷新协程。
  135. func StopBackgroundRefresh() {
  136. if refreshStop != nil {
  137. close(refreshStop)
  138. refreshStop = nil
  139. }
  140. }
  141. // Refresh 立即从 mbsd 拉取并更新缓存。
  142. func Refresh(keys ...string) error {
  143. configs, err := GetByKeys(keys)
  144. if err != nil {
  145. return err
  146. }
  147. for key, value := range configs {
  148. storeCache(key, value)
  149. }
  150. return nil
  151. }
  152. // GetByKeys 批量查询;成功项写入缓存。
  153. func GetByKeys(keys []string) (map[string]ConfigInfo, error) {
  154. if invokeAPI == nil {
  155. return map[string]ConfigInfo{}, nil
  156. }
  157. if len(keys) == 0 {
  158. return map[string]ConfigInfo{}, nil
  159. }
  160. body, err := json.Marshal(&getByKeysRequest{Keys: keys})
  161. if err != nil {
  162. return nil, err
  163. }
  164. path, err := url.JoinPath(urlPrefix, "getByKeys")
  165. if err != nil {
  166. return nil, err
  167. }
  168. respBytes, err := invokeAPI.PostJSON(path, body, nil)
  169. if err != nil {
  170. return nil, err
  171. }
  172. var resp infoResponse
  173. if err = json.Unmarshal(respBytes, &resp); err != nil {
  174. return nil, err
  175. }
  176. if !resp.Success {
  177. return nil, errors.New(resp.Msg)
  178. }
  179. result := make(map[string]ConfigInfo, len(resp.Info.Infos))
  180. for _, info := range resp.Info.Infos {
  181. result[info.ConfigKey] = info
  182. storeCache(info.ConfigKey, info)
  183. }
  184. return result, nil
  185. }
  186. // Get 读取单个 key(带 TTL 与过期后 stale 回退)。
  187. func Get(key string) (ConfigInfo, bool, error) {
  188. if invokeAPI == nil {
  189. return ConfigInfo{}, false, nil
  190. }
  191. var stale *ConfigInfo
  192. if entry, ok := loadCache(key); ok {
  193. if time.Since(entry.loadedAt) < cacheTTL {
  194. return entry.info, true, nil
  195. }
  196. if entry.info.ConfigValue != "" {
  197. stale = &entry.info
  198. }
  199. }
  200. configs, err := GetByKeys([]string{key})
  201. if err != nil {
  202. if stale != nil {
  203. return *stale, true, nil
  204. }
  205. return ConfigInfo{}, false, err
  206. }
  207. info, ok := configs[key]
  208. if !ok && stale != nil {
  209. return *stale, true, nil
  210. }
  211. return info, ok, nil
  212. }
  213. // GetString 读取字符串配置;未配置/失败/空值时返回 defaultValue。
  214. func GetString(key, defaultValue string) string {
  215. info, ok, err := Get(key)
  216. if err != nil || !ok || info.ConfigValue == "" {
  217. return defaultValue
  218. }
  219. return info.ConfigValue
  220. }
  221. // GetInt 读取整数配置。
  222. func GetInt(key string, defaultValue int) int {
  223. info, ok, err := Get(key)
  224. if err != nil || !ok || info.ConfigValue == "" {
  225. return defaultValue
  226. }
  227. intValue, err := strconv.Atoi(info.ConfigValue)
  228. if err != nil {
  229. return defaultValue
  230. }
  231. return intValue
  232. }
  233. // GetBool 读取布尔配置。
  234. func GetBool(key string, defaultValue bool) bool {
  235. value := GetString(key, "")
  236. if value == "" {
  237. return defaultValue
  238. }
  239. boolValue, err := strconv.ParseBool(value)
  240. if err != nil {
  241. return defaultValue
  242. }
  243. return boolValue
  244. }
  245. // GetJSON 将配置值反序列化到 target;未找到 key 时返回错误。
  246. func GetJSON(key string, target any) error {
  247. info, ok, err := Get(key)
  248. if err != nil {
  249. return err
  250. }
  251. if !ok {
  252. return errors.New("system config not found: " + key)
  253. }
  254. return json.Unmarshal([]byte(info.ConfigValue), target)
  255. }