import com.alibaba.fastjson.JSON; import com.xuebusi.spring.study.http.BasicHttpUtil; import com.xuebusi.spring.study.model.ConfData; import org.springframework.beans.factory.InitializingBean; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.request.async.DeferredResult; import java.util.Collection; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 模拟实现配置中心配置发生变化时秒级推送至客户端 * 操作步骤: * 1.请求http://host:port/conf/add?key=&value=接口添加数据 * 2.请求http://host:port/conf/get?key=接口查看数据 * 3.请求http://host:port/conf/set?key=&value=接口修改数据 * 4.观察控制台日志会看到配置变化的通知 * <p> * 原理: * 1.项目启动时,模拟一个客户端调用 /conf/monitor 接口请求数据,创建DeferredResult对象, * 以客户端ID为key,以DeferredResult对象为value放入 deferredResultMap; * 2.当客户端调用修改接口时,就将修改后的数据放入 changedDataMap(这里用一个map存,实际应该存数据库); * 3.通过另一个线程定时(间隔1s)查看 changedDataMap 中变化的数据; * 4.如果数据有变化就遍历deferredResultMap通过 deferredResult.setResult() 通知给所有的客户端; * * @author syj */ @RestController @RequestMapping public class ConfController implements InitializingBean { private static final int BEAT_TIME_OUT = 30; private static ExecutorService executorService = Executors.newCachedThreadPool(); private static Map<String, ConfData> changedDataMap = new ConcurrentHashMap<>(); private Map<String, DeferredResult> deferredResultMap = new ConcurrentHashMap(); // 模拟数据库存储配置 public static Map<String, ConfData> confDataMap = new ConcurrentHashMap<>(); /** * 获取最新配置 * 使用 DeferredResult 返回异步结果 * * @param clientId * @return */ @GetMapping("/conf/monitor") public DeferredResult<String> monitor(String clientId) { DeferredResult<String> result = new DeferredResult<>(BEAT_TIME_OUT * 1000L, "30秒超时啦"); deferredResultMap.put(clientId, result); return result; } /** * 查询所有配置 * * @return */ @GetMapping("/conf/list") public Result<Map<String, ConfData>> list() { return new Result<>(confDataMap); } /** * 查询配置 * * @param key * @return */ @GetMapping("/conf/get") public Result<ConfData> get(String key) { ConfData confData = confDataMap.get(key); if (confData == null) { return new Result<ConfData>(Result.FAIL_CODE, "key不存在"); } else { return new Result<ConfData>(confData); } } /** * 修改配置 * * @param key * @param value * @return */ @GetMapping("/conf/set") public Result<String> set(String key, String value) { ConfData confData = confDataMap.get(key); if (confData == null) { return new Result<String>(Result.FAIL_CODE, "key不存在"); } else { if (!confData.getValue().equals(value)) { confData.setValue(value); confDataMap.put(key, confData); // 同时放到 changedDataMap 中一份最新数据 changedDataMap.put(key, confData); } return Result.SUCCESS; } } /** * 新增配置 * * @param key * @param value * @return */ @GetMapping("/conf/add") public Result<String> add(String key, String value) { ConfData confData = confDataMap.get(key); if (confData != null) { return new Result<String>(Result.FAIL_CODE, "key已经存在了"); } confDataMap.put(key, new ConfData(key, value)); return Result.SUCCESS; } @Override public void afterPropertiesSet() throws Exception { // 配置中心启动线程监控配置变化 executorService.execute(() -> { while (true) { Collection<ConfData> values = changedDataMap.values(); if (values != null && values.size() > 0) { for (String key : changedDataMap.keySet()) { for (String clientId : deferredResultMap.keySet()) { DeferredResult deferredResult = deferredResultMap.get(clientId); String msg = "通知客户端" + clientId + "配置" + key + "发生变化:" + JSON.toJSONString(changedDataMap.get(key)); deferredResult.setResult(msg); // 移除已经通知过的客户端 deferredResultMap.remove(clientId); // 移除已经通知过的数据 changedDataMap.remove(key); } } } try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } } }); // 模拟客户端 http请求最新配置 executorService.execute(() -> { String clientId = UUID.randomUUID().toString().replace("-", ""); while (true) { String url = "http://localhost:8888/conf/monitor?clientId=" + clientId; String result = BasicHttpUtil.get(url, BEAT_TIME_OUT + 30); System.out.println(Thread.currentThread().getName() + "----http调用结果:" + result); } }); } /** * 返回结果 * * @param <T> */ public static class Result<T> { public static int SUCCESS_CODE = 200; public static int FAIL_CODE = 500; public static final Result<String> SUCCESS = new Result<>(SUCCESS_CODE, "操作成功"); public static final Result<String> FAIL = new Result<>(FAIL_CODE, "操作失败"); private int code; private String msg; private T data; public Result(T data) { this.data = data; } public Result(int code, String msg) { this.code = code; this.msg = msg; } public Result(int code, String msg, T data) { this.code = code; this.msg = msg; this.data = data; } public int getCode() { return code; } public void setCode(int code) { this.code = code; } public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; } public T getData() { return data; } public void setData(T data) { this.data = data; } } }
原文地址:https://www.cnblogs.com/jun1019/p/10807055.html
时间: 2024-10-17 17:19:06