client.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406
  1. // Package mbsdsdk 从 SystemData(mbsd)拉取 system_configs 的客户端 SDK。
  2. package mbsdsdk
  3. import (
  4. "encoding/json"
  5. "errors"
  6. "math/rand"
  7. "net/url"
  8. "strconv"
  9. "sync"
  10. "time"
  11. "git.sxidc.com/service-supports/dapr_api/invoke"
  12. "git.sxidc.com/service-supports/fslog"
  13. )
const (
	// DefaultURLPrefix is the default API path prefix of the mbsd config
	// center, relative to the base URL (sd_url/baseUrl). It is the initial
	// value of the prefix that request paths are built from.
	DefaultURLPrefix = "mbsd/api/v1/systemConfig"
	// CacheTTL is the default cache lifetime; once a cached entry is older
	// than this, the value is requested from mbsd again.
	CacheTTL = 60 * time.Second
)
// ConfigInfo mirrors the fields returned by mbsd for a system_configs record.
type ConfigInfo struct {
	// ID identifies the record; Update refuses to run when it is empty.
	ID string `json:"id"`
	// ConfigKey is the lookup key; it is also the key used in the local cache.
	ConfigKey string `json:"configKey"`
	// ConfigValue is the raw string value; the typed getters parse it.
	ConfigValue string `json:"configValue"`
	ValueType   string `json:"valueType"`
	ConfigGroup string `json:"configGroup"`
	Service     string `json:"service"`
	Description string `json:"description"`
}
// cacheEntry is the value kept in the package cache: a config record plus
// the time it was stored, which Get compares against the cache TTL.
type cacheEntry struct {
	info     ConfigInfo
	loadedAt time.Time
}
// getByKeysRequest is the JSON body posted to the getByKeys endpoint.
type getByKeysRequest struct {
	Keys []string `json:"keys"`
}
// getByKeysResult is the payload carried in the "info" field of a getByKeys
// response.
type getByKeysResult struct {
	Infos []ConfigInfo `json:"infos"`
	// TotalCount is decoded but not read by the code visible in this file.
	TotalCount int64 `json:"totalCount"`
}
// msgResponse is the status envelope decoded from every mbsd response in
// this file. When Success is false, Msg becomes the returned error text.
type msgResponse struct {
	Success bool `json:"success"`
	// ErrCode is decoded but not read by the code visible in this file.
	ErrCode int    `json:"errCode"`
	Msg     string `json:"msg"`
}
// infoResponse is the getByKeys response: the status envelope (embedded)
// plus the query result under "info".
type infoResponse struct {
	msgResponse
	Info getByKeysResult `json:"info"`
}
// updateRequest is the JSON body sent by Update: the config fields plus the
// acting user's ID and display name.
type updateRequest struct {
	ID               string `json:"id"`
	ConfigKey        string `json:"configKey"`
	ConfigValue      string `json:"configValue"`
	ValueType        string `json:"valueType"`
	ConfigGroup      string `json:"configGroup"`
	Service          string `json:"service"`
	Description      string `json:"description"`
	UpdateUserID     string `json:"updateUserId"`
	OperatorUserName string `json:"operatorUserName"`
}
// createRequest is the JSON body sent by Create: the config fields (no ID)
// plus the acting user's ID and display name.
type createRequest struct {
	ConfigKey        string `json:"configKey"`
	ConfigValue      string `json:"configValue"`
	ValueType        string `json:"valueType"`
	ConfigGroup      string `json:"configGroup"`
	Service          string `json:"service"`
	Description      string `json:"description"`
	CreateUserID     string `json:"createUserId"`
	OperatorUserName string `json:"operatorUserName"`
}
var (
	// invokeAPI is the client created by Init; nil means the SDK has not
	// been initialized (or has been destroyed).
	invokeAPI *invoke.API
	// urlPrefix is joined with each endpoint name to build request paths;
	// SetURLPrefix overrides it.
	urlPrefix = DefaultURLPrefix
	// cacheTTL is how long a cached entry counts as fresh in Get;
	// SetCacheTTL overrides it.
	cacheTTL = CacheTTL
	// cache maps a config key (string) to a cacheEntry.
	cache sync.Map
	// refreshStop is closed by StopBackgroundRefresh to signal the background
	// refresh goroutine; it is nil when no refresh is registered.
	refreshStop chan struct{}
	// rng supplies the random start-up jitter used by StartBackgroundRefresh.
	rng = rand.New(rand.NewSource(time.Now().UnixNano()))
)
  79. // SetURLPrefix 在 Init 之前调用可覆盖默认 API 前缀。
  80. func SetURLPrefix(prefix string) {
  81. if prefix != "" {
  82. urlPrefix = prefix
  83. }
  84. }
  85. // SetCacheTTL 覆盖默认缓存 TTL(供测试或特殊服务使用)。
  86. func SetCacheTTL(ttl time.Duration) {
  87. if ttl > 0 {
  88. cacheTTL = ttl
  89. }
  90. }
  91. func storeCache(key string, info ConfigInfo) {
  92. cache.Store(key, cacheEntry{info: info, loadedAt: time.Now()})
  93. }
  94. func loadCache(key string) (cacheEntry, bool) {
  95. raw, ok := cache.Load(key)
  96. if !ok {
  97. return cacheEntry{}, false
  98. }
  99. return raw.(cacheEntry), true
  100. }
  101. // Init 初始化 mbsd HTTP 客户端;baseUrl 为空则跳过,读配置一律走调用方默认值。
  102. func Init(baseUrl string, timeout time.Duration) {
  103. if baseUrl == "" || invokeAPI != nil {
  104. return
  105. }
  106. invokeAPI = invoke.NewAPI(baseUrl, timeout)
  107. }
  108. // Destroy 释放客户端并停止后台刷新。
  109. func Destroy() {
  110. StopBackgroundRefresh()
  111. if invokeAPI == nil {
  112. return
  113. }
  114. invoke.DestroyAPI(invokeAPI)
  115. invokeAPI = nil
  116. cache = sync.Map{}
  117. }
  118. // StartBackgroundRefresh 定时刷新 keys;失败保留旧缓存。启动前增加随机 jitter 打散多实例齐刷。
  119. func StartBackgroundRefresh(interval time.Duration, keys ...string) {
  120. if invokeAPI == nil || len(keys) == 0 {
  121. return
  122. }
  123. if interval <= 0 {
  124. interval = cacheTTL
  125. }
  126. StopBackgroundRefresh()
  127. refreshStop = make(chan struct{})
  128. keySlice := append([]string(nil), keys...)
  129. go func() {
  130. jitter := time.Duration(rng.Int63n(int64(interval)))
  131. timer := time.NewTimer(jitter)
  132. select {
  133. case <-timer.C:
  134. case <-refreshStop:
  135. timer.Stop()
  136. return
  137. }
  138. if err := Refresh(keySlice...); err != nil {
  139. fslog.Warn("mbsdsdk: 首次刷新失败,将使用代码兜底直至拉取成功: " + err.Error())
  140. }
  141. ticker := time.NewTicker(interval)
  142. defer ticker.Stop()
  143. for {
  144. select {
  145. case <-ticker.C:
  146. if err := Refresh(keySlice...); err != nil {
  147. fslog.Warn("mbsdsdk: 定时刷新失败,继续使用上一版缓存或兜底: " + err.Error())
  148. }
  149. case <-refreshStop:
  150. return
  151. }
  152. }
  153. }()
  154. }
  155. // StopBackgroundRefresh 停止后台刷新协程。
  156. func StopBackgroundRefresh() {
  157. if refreshStop != nil {
  158. close(refreshStop)
  159. refreshStop = nil
  160. }
  161. }
  162. // Refresh 立即从 mbsd 拉取并更新缓存。
  163. func Refresh(keys ...string) error {
  164. configs, err := GetByKeys(keys)
  165. if err != nil {
  166. return err
  167. }
  168. for key, value := range configs {
  169. storeCache(key, value)
  170. }
  171. return nil
  172. }
  173. // GetByKeys 批量查询;成功项写入缓存。
  174. func GetByKeys(keys []string) (map[string]ConfigInfo, error) {
  175. if invokeAPI == nil {
  176. return map[string]ConfigInfo{}, nil
  177. }
  178. if len(keys) == 0 {
  179. return map[string]ConfigInfo{}, nil
  180. }
  181. body, err := json.Marshal(&getByKeysRequest{Keys: keys})
  182. if err != nil {
  183. return nil, err
  184. }
  185. path, err := url.JoinPath(urlPrefix, "getByKeys")
  186. if err != nil {
  187. return nil, err
  188. }
  189. respBytes, err := invokeAPI.PostJSON(path, body, nil)
  190. if err != nil {
  191. return nil, err
  192. }
  193. var resp infoResponse
  194. if err = json.Unmarshal(respBytes, &resp); err != nil {
  195. return nil, err
  196. }
  197. if !resp.Success {
  198. return nil, errors.New(resp.Msg)
  199. }
  200. result := make(map[string]ConfigInfo, len(resp.Info.Infos))
  201. for _, info := range resp.Info.Infos {
  202. result[info.ConfigKey] = info
  203. storeCache(info.ConfigKey, info)
  204. }
  205. return result, nil
  206. }
  207. // Get 读取单个 key(带 TTL 与过期后 stale 回退)。
  208. func Get(key string) (ConfigInfo, bool, error) {
  209. if invokeAPI == nil {
  210. return ConfigInfo{}, false, nil
  211. }
  212. var stale *ConfigInfo
  213. if entry, ok := loadCache(key); ok {
  214. if time.Since(entry.loadedAt) < cacheTTL {
  215. return entry.info, true, nil
  216. }
  217. if entry.info.ConfigValue != "" {
  218. stale = &entry.info
  219. }
  220. }
  221. configs, err := GetByKeys([]string{key})
  222. if err != nil {
  223. if stale != nil {
  224. return *stale, true, nil
  225. }
  226. return ConfigInfo{}, false, err
  227. }
  228. info, ok := configs[key]
  229. if !ok && stale != nil {
  230. return *stale, true, nil
  231. }
  232. return info, ok, nil
  233. }
  234. // GetString 读取字符串配置;未配置/失败/空值时返回 defaultValue。
  235. func GetString(key, defaultValue string) string {
  236. info, ok, err := Get(key)
  237. if err != nil || !ok || info.ConfigValue == "" {
  238. return defaultValue
  239. }
  240. return info.ConfigValue
  241. }
  242. // GetInt 读取整数配置。
  243. func GetInt(key string, defaultValue int) int {
  244. info, ok, err := Get(key)
  245. if err != nil || !ok || info.ConfigValue == "" {
  246. return defaultValue
  247. }
  248. intValue, err := strconv.Atoi(info.ConfigValue)
  249. if err != nil {
  250. return defaultValue
  251. }
  252. return intValue
  253. }
  254. // GetBool 读取布尔配置。
  255. func GetBool(key string, defaultValue bool) bool {
  256. value := GetString(key, "")
  257. if value == "" {
  258. return defaultValue
  259. }
  260. boolValue, err := strconv.ParseBool(value)
  261. if err != nil {
  262. return defaultValue
  263. }
  264. return boolValue
  265. }
  266. // Update 更新已有配置项(需带 ID)。
  267. func Update(info ConfigInfo, updateUserID, operatorUserName string) error {
  268. if invokeAPI == nil {
  269. return errors.New("mbsdsdk not initialized")
  270. }
  271. if info.ID == "" {
  272. return errors.New("config id is required")
  273. }
  274. if operatorUserName == "" {
  275. operatorUserName = updateUserID
  276. }
  277. body, err := json.Marshal(&updateRequest{
  278. ID: info.ID,
  279. ConfigKey: info.ConfigKey,
  280. ConfigValue: info.ConfigValue,
  281. ValueType: info.ValueType,
  282. ConfigGroup: info.ConfigGroup,
  283. Service: info.Service,
  284. Description: info.Description,
  285. UpdateUserID: updateUserID,
  286. OperatorUserName: operatorUserName,
  287. })
  288. if err != nil {
  289. return err
  290. }
  291. path, err := url.JoinPath(urlPrefix, "update")
  292. if err != nil {
  293. return err
  294. }
  295. respBytes, err := invokeAPI.PutJSON(path, body, nil)
  296. if err != nil {
  297. return err
  298. }
  299. var resp msgResponse
  300. if err = json.Unmarshal(respBytes, &resp); err != nil {
  301. return err
  302. }
  303. if !resp.Success {
  304. return errors.New(resp.Msg)
  305. }
  306. storeCache(info.ConfigKey, info)
  307. return nil
  308. }
  309. // Create 新建配置项。
  310. func Create(info ConfigInfo, createUserID, operatorUserName string) error {
  311. if invokeAPI == nil {
  312. return errors.New("mbsdsdk not initialized")
  313. }
  314. if operatorUserName == "" {
  315. operatorUserName = createUserID
  316. }
  317. body, err := json.Marshal(&createRequest{
  318. ConfigKey: info.ConfigKey,
  319. ConfigValue: info.ConfigValue,
  320. ValueType: info.ValueType,
  321. ConfigGroup: info.ConfigGroup,
  322. Service: info.Service,
  323. Description: info.Description,
  324. CreateUserID: createUserID,
  325. OperatorUserName: operatorUserName,
  326. })
  327. if err != nil {
  328. return err
  329. }
  330. path, err := url.JoinPath(urlPrefix, "create")
  331. if err != nil {
  332. return err
  333. }
  334. respBytes, err := invokeAPI.PostJSON(path, body, nil)
  335. if err != nil {
  336. return err
  337. }
  338. var resp msgResponse
  339. if err = json.Unmarshal(respBytes, &resp); err != nil {
  340. return err
  341. }
  342. if !resp.Success {
  343. return errors.New(resp.Msg)
  344. }
  345. return nil
  346. }
  347. // GetJSON 将配置值反序列化到 target;未找到 key 时返回错误。
  348. func GetJSON(key string, target any) error {
  349. info, ok, err := Get(key)
  350. if err != nil {
  351. return err
  352. }
  353. if !ok {
  354. return errors.New("system config not found: " + key)
  355. }
  356. return json.Unmarshal([]byte(info.ConfigValue), target)
  357. }