模拟实现配置中心配置发生变化时秒级推送至客户端代码思路

import com.alibaba.fastjson.JSON;
import com.xuebusi.spring.study.http.BasicHttpUtil;
import com.xuebusi.spring.study.model.ConfData;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;

import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 模拟实现配置中心配置发生变化时秒级推送至客户端
 * 操作步骤:
 * 1.请求http://host:port/conf/add?key=&value=接口添加数据
 * 2.请求http://host:port/conf/get?key=接口查看数据
 * 3.请求http://host:port/conf/set?key=&value=接口修改数据
 * 4.观察控制台日志会看到配置变化的通知
 * <p>
 * 原理:
 * 1.项目启动时,模拟一个客户端调用 /conf/monitor 接口请求数据,创建DeferredResult对象,
 * 以客户端ID为key,以DeferredResult对象为value放入 deferredResultMap;
 * 2.当客户端调用修改接口时,就将修改后的数据放入 changedDataMap(这里用一个map存,实际应该存数据库);
 * 3.通过另一个线程定时(间隔1s)查看 changedDataMap 中变化的数据;
 * 4.如果数据有变化就遍历deferredResultMap通过 deferredResult.setResult() 通知给所有的客户端;
 *
 * @author syj
 */
@RestController
@RequestMapping
public class ConfController implements InitializingBean {

    private static final int BEAT_TIME_OUT = 30;

    private static ExecutorService executorService = Executors.newCachedThreadPool();

    private static Map<String, ConfData> changedDataMap = new ConcurrentHashMap<>();

    private Map<String, DeferredResult> deferredResultMap = new ConcurrentHashMap();

    // 模拟数据库存储配置
    public static Map<String, ConfData> confDataMap = new ConcurrentHashMap<>();

    /**
     * 获取最新配置
     * 使用 DeferredResult 返回异步结果
     *
     * @param clientId
     * @return
     */
    @GetMapping("/conf/monitor")
    public DeferredResult<String> monitor(String clientId) {
        DeferredResult<String> result = new DeferredResult<>(BEAT_TIME_OUT * 1000L, "30秒超时啦");
        deferredResultMap.put(clientId, result);
        return result;
    }

    /**
     * 查询所有配置
     *
     * @return
     */
    @GetMapping("/conf/list")
    public Result<Map<String, ConfData>> list() {
        return new Result<>(confDataMap);
    }

    /**
     * 查询配置
     *
     * @param key
     * @return
     */
    @GetMapping("/conf/get")
    public Result<ConfData> get(String key) {
        ConfData confData = confDataMap.get(key);
        if (confData == null) {
            return new Result<ConfData>(Result.FAIL_CODE, "key不存在");
        } else {
            return new Result<ConfData>(confData);
        }
    }

    /**
     * 修改配置
     *
     * @param key
     * @param value
     * @return
     */
    @GetMapping("/conf/set")
    public Result<String> set(String key, String value) {
        ConfData confData = confDataMap.get(key);
        if (confData == null) {
            return new Result<String>(Result.FAIL_CODE, "key不存在");
        } else {
            if (!confData.getValue().equals(value)) {
                confData.setValue(value);
                confDataMap.put(key, confData);
                // 同时放到 changedDataMap 中一份最新数据
                changedDataMap.put(key, confData);
            }
            return Result.SUCCESS;
        }
    }

    /**
     * 新增配置
     *
     * @param key
     * @param value
     * @return
     */
    @GetMapping("/conf/add")
    public Result<String> add(String key, String value) {
        ConfData confData = confDataMap.get(key);
        if (confData != null) {
            return new Result<String>(Result.FAIL_CODE, "key已经存在了");
        }
        confDataMap.put(key, new ConfData(key, value));
        return Result.SUCCESS;
    }

    @Override
    public void afterPropertiesSet() throws Exception {

        // 配置中心启动线程监控配置变化
        executorService.execute(() -> {
            while (true) {
                Collection<ConfData> values = changedDataMap.values();
                if (values != null && values.size() > 0) {
                    for (String key : changedDataMap.keySet()) {
                        for (String clientId : deferredResultMap.keySet()) {
                            DeferredResult deferredResult = deferredResultMap.get(clientId);
                            String msg = "通知客户端" + clientId + "配置" + key + "发生变化:" + JSON.toJSONString(changedDataMap.get(key));
                            deferredResult.setResult(msg);

                            // 移除已经通知过的客户端
                            deferredResultMap.remove(clientId);

                            // 移除已经通知过的数据
                            changedDataMap.remove(key);
                        }
                    }
                }
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        // 模拟客户端 http请求最新配置
        executorService.execute(() -> {
            String clientId = UUID.randomUUID().toString().replace("-", "");
            while (true) {
                String url = "http://localhost:8888/conf/monitor?clientId=" + clientId;
                String result = BasicHttpUtil.get(url, BEAT_TIME_OUT + 30);
                System.out.println(Thread.currentThread().getName() + "----http调用结果:" + result);
            }
        });
    }

    /**
     * 返回结果
     *
     * @param <T>
     */
    public static class Result<T> {

        public static int SUCCESS_CODE = 200;
        public static int FAIL_CODE = 500;
        public static final Result<String> SUCCESS = new Result<>(SUCCESS_CODE, "操作成功");
        public static final Result<String> FAIL = new Result<>(FAIL_CODE, "操作失败");

        private int code;
        private String msg;
        private T data;

        public Result(T data) {
            this.data = data;
        }

        public Result(int code, String msg) {
            this.code = code;
            this.msg = msg;
        }

        public Result(int code, String msg, T data) {
            this.code = code;
            this.msg = msg;
            this.data = data;
        }

        public int getCode() {
            return code;
        }

        public void setCode(int code) {
            this.code = code;
        }

        public String getMsg() {
            return msg;
        }

        public void setMsg(String msg) {
            this.msg = msg;
        }

        public T getData() {
            return data;
        }

        public void setData(T data) {
            this.data = data;
        }
    }
}

原文地址:https://www.cnblogs.com/jun1019/p/10807055.html

时间: 2024-10-17 17:19:06

模拟实现配置中心配置发生变化时秒级推送至客户端代码思路的相关文章

微服务:服务注册发现+ API 网关+配置中心+配置中心+服务跟踪

服务注册发现服务注册就是维护一个登记簿,它管理系统内所有的服务地址.当新的服务启动后,它会向登记簿交待自己的地址信息.服务的依赖方直接向登记簿要 Service Provider 地址就行了.当下用于服务注册的工具非常多 ZooKeeper,Consul,Etcd, 还有 Netflix 家的 eureka 等.服务注册有两种 形式:客户端注册和第三方注册. 客户端注册(zookeeper) 客户端注册是服务自身要负责注册与注销的工作.当服务启动后向注册中心注册自身,当服务下线时注销自己.期间还

通向高可扩展性之路(推特篇) ---- 一个推特用来支撑1亿5千万活跃用户、30万QPS、22MB每秒Firehose、以及5秒内推送信息的架构

原文链接:http://highscalability.com/blog/2013/7/8/the-architecture-twitter-uses-to-deal-with-150m-active-users.html 写于2013年7月8日,译文如下: “可以解决推特所面临的挑战”的玩具般的方案是一个常用在扩展性上的比喻.每个人都觉得推特很容易实现.稍微具备一些系统架构的知识我们就可以构建一个推特,就这么简单.但是根据推特软件开发部门的VP Raffi Krikorian在 Timelin

.net MVC 微信公众号 点击菜单拉取消息时的事件推送

官方文档:https://mp.weixin.qq.com/wiki?t=resource/res_main&id=mp1421141016&token=&lang=zh_CN 业务描述:点击菜单,推送消息,消息内容为自定义的读取数据库信息之后,为用户推送的动态消息. 首先需要用代码实现自定义菜单,为对应的菜单指定click事件. 关于自定义菜单的创建及事件指定,请看上一篇文章,本篇主要介绍事件响应的实现. MVC controller 中的代码如下: public void Me

Cygwin中使用git时无法远程推送(出现DLL文件不兼容)

Cygwin中使用Git远程推送出现DLL文件不兼容 之前在Window和Linux时使用git远程推送都没有什么问题,今天在Win7中试了下Cygwin的git push是却出现如下提示: 说是不兼容的cygwin DLL文件引起段错误. 网上试了好几个办法都不行,最后突然想到GitHub分布式主要是通过公钥和私钥的原理来实现的. 因为在Win7上已经安装了Git, 也就是说私钥已经有了.于是直接把Win7上.ssh下面的文件拷贝到Cywgin home路径下的.ssh路径,发现居然可以了.

分布式系统组件之配置中心

配置中心概述: 在分布式系统中,配置中心是一个基本的组件,它为散布在不同机器上的服务提供配置文件的通知,读取,更新服务,一般对配置中心的设计要点如下: 1)       配置持久化 2)       多语言获取接口 3)       client定时获取,并缓存到本地,MD5比较是否更新 4)       非关键路径:多层级本地缓存,配置中心,客户端机器..,只要不是所有层级都挂掉就可以访问 5)       实时通知,主动获取,定时获取 配置中心示例: 下面介绍一下diamond和qconf都

如何借助配置中心ACM加速企业IT服务快速迭代

摘要: 在5月29日召开的第二届研发效能嘉年华中,云效邀请了阿里云产品团队的伏羿和来自阿里巴巴中间件技术部的彦林带来了"如何借助配置中心ACM加速企业IT服务快速迭代"的主题分享. 分别对配置中心ACM和ACM技术进行了讲解,并且对ACM的主要应用场景进行了介绍,并进行了Demo环节. 在5月29日召开的第二届研发效能嘉年华中,云效邀请了阿里云产品团队的伏羿和来自阿里巴巴中间件技术部的彦林带来了"如何借助配置中心ACM加速企业IT服务快速迭代"的主题分享. 分别对配

Apollo配置中心介绍

1.What is Apollo 1.1 背景 随着程序功能的日益复杂,程序的配置日益增多:各种功能的开关.参数的配置.服务器的地址-- 对程序配置的期望值也越来越高:配置修改后实时生效,灰度发布,分环境.分集群管理配置,完善的权限.审核机制-- 在这样的大环境下,传统的通过配置文件.数据库等方式已经越来越无法满足开发人员对配置管理的需求. Apollo配置中心应运而生! 1.2 Apollo简介 Apollo(阿波罗)是携程框架部门研发的开源配置管理中心,能够集中化管理应用不同环境.不同集群的

springboot项目接入配置中心,实现@ConfigurationProperties的bean属性刷新方案

前言 配置中心,通过key=value的形式存储环境变量.配置中心的属性做了修改,项目中可以通过配置中心的依赖(sdk)立即感知到.需要做的就是如何在属性发生变化时,改变带有@ConfigurationProperties的bean的相关属性. 配置中心 在读配置中心源码的时候发现,里面维护了一个Environment,以及ZookeeperPropertySource.当配置中心属性发生变化的时候,清空ZookeeperPropertySource,并放入最新的属性值. public clas

springcloud(五):Spring Cloud 配置中心的基本用法

Spring Cloud 配置中心的基本用法 1. 概述 本文介绍了Spring Cloud的配置中心,介绍配置中心的如何配置服务端及配置参数,也介绍客户端如何和配置中心交互和配置参数说明. 配置中心服务器部分内容包括:服务创建,git,svn,native后端的配置,各种url访问 配置中心客户端部分内容包括:访问配置.failfast,重试 2. Spring Cloud Config的服务端 2.1. 简述 我们在开发大的系统时,由于服务较多,相同的配置(如数据库信息.缓存.开关量等)会出