client.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396
  1. // Package mbsdsdk 从 SystemData(mbsd)拉取 system_configs 的客户端 SDK。
  2. package mbsdsdk
  3. import (
  4. "encoding/json"
  5. "errors"
  6. "math/rand"
  7. "net/url"
  8. "strconv"
  9. "sync"
  10. "time"
  11. "git.sxidc.com/service-supports/dapr_api/invoke"
  12. "git.sxidc.com/service-supports/fslog"
  13. )
  14. const (
  15. // DefaultURLPrefix mbsd 配置中心 API 前缀(相对 sd_url/baseUrl)。
  16. DefaultURLPrefix = "mbsd/api/v1/systemConfig"
  17. // CacheTTL 默认缓存有效期;过期后重新请求 mbsd。
  18. CacheTTL = 60 * time.Second
  19. )
  20. // ConfigInfo 与 mbsd.system_configs 返回字段一致。
  21. type ConfigInfo struct {
  22. ID string `json:"id"`
  23. ConfigKey string `json:"configKey"`
  24. ConfigValue string `json:"configValue"`
  25. ValueType string `json:"valueType"`
  26. ConfigGroup string `json:"configGroup"`
  27. Service string `json:"service"`
  28. Description string `json:"description"`
  29. }
  30. type cacheEntry struct {
  31. info ConfigInfo
  32. loadedAt time.Time
  33. }
  34. type getByKeysRequest struct {
  35. Keys []string `json:"keys"`
  36. }
  37. type getByKeysResult struct {
  38. Infos []ConfigInfo `json:"infos"`
  39. TotalCount int64 `json:"totalCount"`
  40. }
  41. type msgResponse struct {
  42. Success bool `json:"success"`
  43. ErrCode int `json:"errCode"`
  44. Msg string `json:"msg"`
  45. }
  46. type infoResponse struct {
  47. msgResponse
  48. Info getByKeysResult `json:"info"`
  49. }
  50. type updateRequest struct {
  51. ID string `json:"id"`
  52. ConfigKey string `json:"configKey"`
  53. ConfigValue string `json:"configValue"`
  54. ValueType string `json:"valueType"`
  55. ConfigGroup string `json:"configGroup"`
  56. Service string `json:"service"`
  57. Description string `json:"description"`
  58. UpdateUserID string `json:"updateUserId"`
  59. }
  60. type createRequest struct {
  61. ConfigKey string `json:"configKey"`
  62. ConfigValue string `json:"configValue"`
  63. ValueType string `json:"valueType"`
  64. ConfigGroup string `json:"configGroup"`
  65. Service string `json:"service"`
  66. Description string `json:"description"`
  67. CreateUserID string `json:"createUserId"`
  68. }
  69. var (
  70. invokeAPI *invoke.API
  71. urlPrefix = DefaultURLPrefix
  72. cacheTTL = CacheTTL
  73. cache sync.Map
  74. refreshStop chan struct{}
  75. rng = rand.New(rand.NewSource(time.Now().UnixNano()))
  76. )
  77. // SetURLPrefix 在 Init 之前调用可覆盖默认 API 前缀。
  78. func SetURLPrefix(prefix string) {
  79. if prefix != "" {
  80. urlPrefix = prefix
  81. }
  82. }
  83. // SetCacheTTL 覆盖默认缓存 TTL(供测试或特殊服务使用)。
  84. func SetCacheTTL(ttl time.Duration) {
  85. if ttl > 0 {
  86. cacheTTL = ttl
  87. }
  88. }
  89. func storeCache(key string, info ConfigInfo) {
  90. cache.Store(key, cacheEntry{info: info, loadedAt: time.Now()})
  91. }
  92. func loadCache(key string) (cacheEntry, bool) {
  93. raw, ok := cache.Load(key)
  94. if !ok {
  95. return cacheEntry{}, false
  96. }
  97. return raw.(cacheEntry), true
  98. }
  99. // Init 初始化 mbsd HTTP 客户端;baseUrl 为空则跳过,读配置一律走调用方默认值。
  100. func Init(baseUrl string, timeout time.Duration) {
  101. if baseUrl == "" || invokeAPI != nil {
  102. return
  103. }
  104. invokeAPI = invoke.NewAPI(baseUrl, timeout)
  105. }
  106. // Destroy 释放客户端并停止后台刷新。
  107. func Destroy() {
  108. StopBackgroundRefresh()
  109. if invokeAPI == nil {
  110. return
  111. }
  112. invoke.DestroyAPI(invokeAPI)
  113. invokeAPI = nil
  114. cache = sync.Map{}
  115. }
  116. // StartBackgroundRefresh 定时刷新 keys;失败保留旧缓存。启动前增加随机 jitter 打散多实例齐刷。
  117. func StartBackgroundRefresh(interval time.Duration, keys ...string) {
  118. if invokeAPI == nil || len(keys) == 0 {
  119. return
  120. }
  121. if interval <= 0 {
  122. interval = cacheTTL
  123. }
  124. StopBackgroundRefresh()
  125. refreshStop = make(chan struct{})
  126. keySlice := append([]string(nil), keys...)
  127. go func() {
  128. jitter := time.Duration(rng.Int63n(int64(interval)))
  129. timer := time.NewTimer(jitter)
  130. select {
  131. case <-timer.C:
  132. case <-refreshStop:
  133. timer.Stop()
  134. return
  135. }
  136. if err := Refresh(keySlice...); err != nil {
  137. fslog.Warn("mbsdsdk: 首次刷新失败,将使用代码兜底直至拉取成功: " + err.Error())
  138. }
  139. ticker := time.NewTicker(interval)
  140. defer ticker.Stop()
  141. for {
  142. select {
  143. case <-ticker.C:
  144. if err := Refresh(keySlice...); err != nil {
  145. fslog.Warn("mbsdsdk: 定时刷新失败,继续使用上一版缓存或兜底: " + err.Error())
  146. }
  147. case <-refreshStop:
  148. return
  149. }
  150. }
  151. }()
  152. }
  153. // StopBackgroundRefresh 停止后台刷新协程。
  154. func StopBackgroundRefresh() {
  155. if refreshStop != nil {
  156. close(refreshStop)
  157. refreshStop = nil
  158. }
  159. }
  160. // Refresh 立即从 mbsd 拉取并更新缓存。
  161. func Refresh(keys ...string) error {
  162. configs, err := GetByKeys(keys)
  163. if err != nil {
  164. return err
  165. }
  166. for key, value := range configs {
  167. storeCache(key, value)
  168. }
  169. return nil
  170. }
  171. // GetByKeys 批量查询;成功项写入缓存。
  172. func GetByKeys(keys []string) (map[string]ConfigInfo, error) {
  173. if invokeAPI == nil {
  174. return map[string]ConfigInfo{}, nil
  175. }
  176. if len(keys) == 0 {
  177. return map[string]ConfigInfo{}, nil
  178. }
  179. body, err := json.Marshal(&getByKeysRequest{Keys: keys})
  180. if err != nil {
  181. return nil, err
  182. }
  183. path, err := url.JoinPath(urlPrefix, "getByKeys")
  184. if err != nil {
  185. return nil, err
  186. }
  187. respBytes, err := invokeAPI.PostJSON(path, body, nil)
  188. if err != nil {
  189. return nil, err
  190. }
  191. var resp infoResponse
  192. if err = json.Unmarshal(respBytes, &resp); err != nil {
  193. return nil, err
  194. }
  195. if !resp.Success {
  196. return nil, errors.New(resp.Msg)
  197. }
  198. result := make(map[string]ConfigInfo, len(resp.Info.Infos))
  199. for _, info := range resp.Info.Infos {
  200. result[info.ConfigKey] = info
  201. storeCache(info.ConfigKey, info)
  202. }
  203. return result, nil
  204. }
  205. // Get 读取单个 key(带 TTL 与过期后 stale 回退)。
  206. func Get(key string) (ConfigInfo, bool, error) {
  207. if invokeAPI == nil {
  208. return ConfigInfo{}, false, nil
  209. }
  210. var stale *ConfigInfo
  211. if entry, ok := loadCache(key); ok {
  212. if time.Since(entry.loadedAt) < cacheTTL {
  213. return entry.info, true, nil
  214. }
  215. if entry.info.ConfigValue != "" {
  216. stale = &entry.info
  217. }
  218. }
  219. configs, err := GetByKeys([]string{key})
  220. if err != nil {
  221. if stale != nil {
  222. return *stale, true, nil
  223. }
  224. return ConfigInfo{}, false, err
  225. }
  226. info, ok := configs[key]
  227. if !ok && stale != nil {
  228. return *stale, true, nil
  229. }
  230. return info, ok, nil
  231. }
  232. // GetString 读取字符串配置;未配置/失败/空值时返回 defaultValue。
  233. func GetString(key, defaultValue string) string {
  234. info, ok, err := Get(key)
  235. if err != nil || !ok || info.ConfigValue == "" {
  236. return defaultValue
  237. }
  238. return info.ConfigValue
  239. }
  240. // GetInt 读取整数配置。
  241. func GetInt(key string, defaultValue int) int {
  242. info, ok, err := Get(key)
  243. if err != nil || !ok || info.ConfigValue == "" {
  244. return defaultValue
  245. }
  246. intValue, err := strconv.Atoi(info.ConfigValue)
  247. if err != nil {
  248. return defaultValue
  249. }
  250. return intValue
  251. }
  252. // GetBool 读取布尔配置。
  253. func GetBool(key string, defaultValue bool) bool {
  254. value := GetString(key, "")
  255. if value == "" {
  256. return defaultValue
  257. }
  258. boolValue, err := strconv.ParseBool(value)
  259. if err != nil {
  260. return defaultValue
  261. }
  262. return boolValue
  263. }
  264. // Update 更新已有配置项(需带 ID)。
  265. func Update(info ConfigInfo, updateUserID string) error {
  266. if invokeAPI == nil {
  267. return errors.New("mbsdsdk not initialized")
  268. }
  269. if info.ID == "" {
  270. return errors.New("config id is required")
  271. }
  272. body, err := json.Marshal(&updateRequest{
  273. ID: info.ID,
  274. ConfigKey: info.ConfigKey,
  275. ConfigValue: info.ConfigValue,
  276. ValueType: info.ValueType,
  277. ConfigGroup: info.ConfigGroup,
  278. Service: info.Service,
  279. Description: info.Description,
  280. UpdateUserID: updateUserID,
  281. })
  282. if err != nil {
  283. return err
  284. }
  285. path, err := url.JoinPath(urlPrefix, "update")
  286. if err != nil {
  287. return err
  288. }
  289. respBytes, err := invokeAPI.PutJSON(path, body, nil)
  290. if err != nil {
  291. return err
  292. }
  293. var resp msgResponse
  294. if err = json.Unmarshal(respBytes, &resp); err != nil {
  295. return err
  296. }
  297. if !resp.Success {
  298. return errors.New(resp.Msg)
  299. }
  300. storeCache(info.ConfigKey, info)
  301. return nil
  302. }
  303. // Create 新建配置项。
  304. func Create(info ConfigInfo, createUserID string) error {
  305. if invokeAPI == nil {
  306. return errors.New("mbsdsdk not initialized")
  307. }
  308. body, err := json.Marshal(&createRequest{
  309. ConfigKey: info.ConfigKey,
  310. ConfigValue: info.ConfigValue,
  311. ValueType: info.ValueType,
  312. ConfigGroup: info.ConfigGroup,
  313. Service: info.Service,
  314. Description: info.Description,
  315. CreateUserID: createUserID,
  316. })
  317. if err != nil {
  318. return err
  319. }
  320. path, err := url.JoinPath(urlPrefix, "create")
  321. if err != nil {
  322. return err
  323. }
  324. respBytes, err := invokeAPI.PostJSON(path, body, nil)
  325. if err != nil {
  326. return err
  327. }
  328. var resp msgResponse
  329. if err = json.Unmarshal(respBytes, &resp); err != nil {
  330. return err
  331. }
  332. if !resp.Success {
  333. return errors.New(resp.Msg)
  334. }
  335. return nil
  336. }
  337. // GetJSON 将配置值反序列化到 target;未找到 key 时返回错误。
  338. func GetJSON(key string, target any) error {
  339. info, ok, err := Get(key)
  340. if err != nil {
  341. return err
  342. }
  343. if !ok {
  344. return errors.New("system config not found: " + key)
  345. }
  346. return json.Unmarshal([]byte(info.ConfigValue), target)
  347. }