个人第一个开源分布式项目distributeTemplate的实现二 分布式配置以及上下文的维护同步

我们实现分布式 也是按照 这样一步一步来的,首先是公用的资源对象,第一个必须的公用资源 对象是配置,这配置交给用户外部填写 手动配置 并能维持同步 更新 所以需要一个配置对象 维护在内存里面,需要事件驱动监听配置文件的变化情况

ok下面来 看看代码是怎么做的 ,首先 配置 有多重配置方式 ini conf prop xml 各种方式本质上 我们分布式就是要各台主机 自己所在的节点 都能知道我在网络上情况,或者所可以找到像zookeeper 只要知道或者找到了 才能进行以后的通讯同步

我们为了能够以后支持多重配置 ,所以先把配置定义为一个接口

public interface IConfig {
    public static final String DEFAULTFILE="distribute.conf";
/**
 * 
 * 从文件读取配置初始化
 * @param file
 * 添加(修改)人:zhuyuping
 */
public void readConfigFormFile(String file);
/**
 * 从用户提供的json字符串中初始化
 * @param json
 * 添加(修改)人:zhuyuping
 */
public void readConfigFormString(String json);
/**
 * 获得系统内存中维护的内存表
 * 
 * 添加(修改)人:zhuyuping
 */
public Table<InetSocketAddress, String, Object> getConfig();
/**
 * 摄入context上下文
 * @param context
 * 添加(修改)人:zhuyuping
 */
public void setContext(Context context);
/**
 * 获得上下文context
 * @return
 * 添加(修改)人:zhuyuping
 */
public Context getContext();
public BiMap<String, InetSocketAddress> getAlias();
}

主要 是3个方法 ,一个是读配置文件到内存对象中,拧一个就是内存对象写到文件中,对于分布式还需要一个同步

有时候 我们可能需要历史版本 能够容错还原 所以可能需要一个版本号 记录当前版本,然后保持有可能多个节点上配置文件更改后,发送请求时候能够保持顺序的更新,这后面我会处理 加入一个分布式先进先出的队列的,这里暂时未加 后面 加了会补上的,还有上下文context 对象,有人会说 你为什么需要上下文对象去保存变量啊,好多框架 都有上下文这个对象,也许是拦截器模式 也许是门面模式 等等其他模式,但是 这里我都不是,其实上下文只是为了本地节点的各个功能代码段之间的一个桥梁,有句话叫做 两情若是久长时 又岂在朝朝暮暮 我们有鹊桥,他就是上下文最重要的本质 就是维护本地节点上下文贯穿 如果上下文 里面保存着门面 那么他就有门面的功能 便于用户随时获取门面对象 进行操作,ok我们来看看 context是怎么定义的

/**
 * 
 *      
 *     
 * @author zhuyuping       
 * @version 1.0     
 * @created 2014-7-9 
 * @function:上下文接口  他只是存储用户上面类所有过程中的变量不是config配置而且分布式中不会同步的只会在单点上有效 切记  、、后期如果想支持xml 配置或者其他配置 可以添加策略模式 
 */
public interface Context {

	public final static String defaultConfig="distribute.conf";//默认配置名

	public void putValue(String key,Object value);

	public Object getValue(String key);

	public void setCurrHost(String host,int port);

	public InetSocketAddress getCurrHost();
	/**
	 * 获得默认配置文件
	 * @return
	 * 添加(修改)人:zhuyuping
	 */
	public String getDefaultFc();
//	/**
//	 * 设置默认属性文件的名称
//	 * @param pfile
//	 * 添加(修改)人:zhuyuping
//	 */
//	public void setDefaultFc(String pfile);
//	/**
//	 * 注入template 门面 便于后面直接通过上下文来使用  如果要整合spring ApplicationContextAware
//	 * @param distributedTemplate
//	 * 添加(修改)人:zhuyuping
//	 */
//	public void setTemplate(DistributedOperations distributedTemplate);
//
//	public DistributedOperations getTemplate();

}

这里其实 就是一个map 保存属性key value 而常用的就取出作为一个方法了

这个context 因为后面我们给用户一个继承的 这样便于 用户实现自己的上下文 或交给其他框架上下文 以及整合所以我们实现了一个抽象的 默认实现

/**
 * 
 *      
 *     
 * @author zhuyuping       
 * @version 1.0     
 * @created 2014-7-9 下午5:58:37 
 * @function:抽象的上下文 主要是 管理context的资源 还有就是提供用户自定义 整合spring使用该类 //这里后期需整合策略 实现
 */
public abstract class AbstractContext implements Context{
    //?也可以使用LocalThread 也可以
	private Map<String,Object> context=Maps.newConcurrentMap();

	private InetSocketAddress currHost;//当前主机 比如192.168.0.1 8888

	private String dfConfig;//默认读取的配置文件 当用户 提供就修改 没有提供就默认 

    
	public AbstractContext(String dfConfig) {
		super();

		this.dfConfig = dfConfig;
		//当前classes 下的文件
		//currentPort
		this.currHost=new InetSocketAddress(ConfigFactory.load(dfConfig).getString("client.currentHost"),ConfigFactory.load(dfConfig).getInt("client.currentPort"));
	}

	@Override
	public InetSocketAddress getCurrHost() {

		return currHost;
	}

    

	@Override
	public void setCurrHost(String host,int port) {

		this.currHost=new InetSocketAddress(host, port);

	}

	@Override
	public String getDefaultFc() {

		return dfConfig!=null?dfConfig:defaultConfig;
	}

	public AbstractContext() {
		super();
		this.dfConfig=defaultConfig;

		this.currHost=new InetSocketAddress(ConfigFactory.load(defaultConfig).getString("client.currentHost"),ConfigFactory.load(defaultConfig).getInt("client.currentPort"));
	    
	}

	@Override
	public void putValue(String key, Object value) {

		context.put(key, value);

	}

	@Override
	public Object getValue(String key) {
		return context.get(key);
	}

}

ok 很简单的 维护者

然后回到刚才的配置 ,首先 我们配置文件 需要从文件读取 到配置对象中 ,这是为了用户更改 或者初始化时候 吧配置文件初始化到配置内存对象中 然后这个内存对象将会在同步 配置文件 更改 网络通讯时候用到,在对于全局的所有节点的沦陷时候 单纯的context只维护本节点桥梁信息的 已经不够用了 因为他不会同步的,这就是为什么需要他的原因,我这里采用的配置文件时conf 也就是configLoad方式,后面我会逐步添加更多的支持方式 无非是xml 等读取问题,这并不重要 思想才是重要的

/**
 * 
 *      
 *     
 * @author zhuyuping       
 * @version 1.0     
 * @created 2014-7-9 下午4:07:00  如果后期需要支持xml 其他 只需要使用策略模式 
 * @function:基类config 主要维持 配置基本信息 以及配置信息的同步 备份 同时内存中维持一张内存表table
 */
public abstract class AbstractConfig implements IConfig{
    /**
     * 当前的配置表 行 为主机别名 其中一定有一列为版本号 AotmicLong 以及配置的相关字段  值为相关的对象  
     */
protected Table<InetSocketAddress, String, Object> config=HashBasedTable.create();//table信息  table 信息 这里无需要用到 哪个ConcurrentHashMap<K, V> 因为这个只会加载读取 加载 
//不会修改,因为这个table当用户真的更新config后 会同步并同时刷到更新到文件中的 ,而且每次用户提供查询的配置
//是不会更新到文件里面的 
protected AtomicLong version=new AtomicLong(0);//初始化版本为0;//判断当前的版本 是佛在config 里面是否存在
protected BiMap<String,InetSocketAddress> alias=HashBiMap.create();
protected Context context;
     
    public BiMap<String, InetSocketAddress> getAlias() {
return alias;
}
/**
     * context 需要提供当前主机 以及 
     * @param context
     */
public AbstractConfig(Context context) {
super();
this.context = context;
wrapConfig(ConfigFactory.load(context.getDefaultFc()));
}
@Override
public void setContext(Context context) {
this.context=context;
}
@Override
public Context getContext() {
return context;
}
@Override
public void readConfigFormFile(String file) {
Config config=TypeSafeConfigLoadUtils.loadFromFile(new File(file));
wrapConfig(config);
}
@Override
public void readConfigFormString(String json) {
Config config=TypeSafeConfigLoadUtils.loadFromString(json);
wrapConfig(config);
}
/**
 * 对config进行初始化 table
 * @param config
 * 添加(修改)人:zhuyuping
 */
protected abstract void wrapConfig(Config config);
/**
 * 把table 从内存中读取从新写入到配置文件中 
 * @param config
 * 添加(修改)人:zhuyuping
 */
protected abstract String wrapTable(Table<String, String, Object> config);
/**
 * 只保留最近的5个版本 可以回滚 更新最新的
 * 
 * 添加(修改)人:zhuyuping
 */
public void updateVersion(Long version){
 
}
/**
 * 版本数更新
 *  更新完后 需要 
 * 添加(修改)人:zhuyuping
 */
public void addVersion(){
Long v=version.getAndIncrement();
//TODO 需要通知所有节点 我要修改版本了 如果同时有几个人也这样 那么接受该节点下次更新的版本号,
    //在回调函数中 更新配置 随后同步 只保留5个版本 
}
    @Override
public Table<InetSocketAddress, String, Object> getConfig() {
return config;
}
/**
     * 
     * 提交对文件配置做出的修改
     * 添加(修改)人:zhuyuping
     */
    protected abstract void commit();
/**
 * 
 * 提交对配置的修改 如果一个人在一个节点上 更改了配置 需要核对版本 并从新更新本地的配置文件
 * 添加(修改)人:zhuyuping
 */
protected abstract void sync();
    
}

这里 我为了好维护 就直接使用guava的table ,其实你可以用List< map> 实现,这里 重要是获取所有主机列表的方法

然后就是配置文件读取后 写入context 对象 当然 上面说的读取配置到内存对象 ,内存对象写入到配置文件是基础

然后看看怎么写入的 我为了以后支持xml 所以这读取方式 写入方式 交给后面的子类实现类去实现 这样实现xml只要实现这2个方法即可

/**
 * 
 *      
 *     
 * @author zhuyuping       
 * @version 1.0     
 * @created 2014-7-9 下午10:38:05 
 * @function:默认的配置  允许用户实现自定义的配置规则 只需要继承 AbstractConfig
 */
public class SimpleDistributeConfig extends AbstractConfig{
	public SimpleDistributeConfig(Context context) {
		super(context);

	}

	@Override
	protected void wrapConfig(Config configObj) {
		//得到所有的节点
		List<? extends ConfigObject> nodes=configObj.getObjectList("server.hosts");
		int i=0;
		for (ConfigObject node : nodes) {

		   i++;
		   //如果后期添加其他mysql 等支持 这里就需要添加判断
		  
		   //Integer localport=node.containsKey("localPort")?Integer.parseInt(node.get("localPort").render()):LOCALPORT;//Integer.parseInt(node.get("localPort").render());
		   Integer remotePort=Integer.parseInt(node.get("remotePort").render());
		   String remoteIp=node.get("remoteHost").unwrapped().toString();//远程主机的ip
		   //开始初始化配置table
		   String name=node.containsKey("name")?node.get("name").unwrapped().toString():remoteIp;//主机别名
		   InetSocketAddress remoteHost=new InetSocketAddress(remoteIp, remotePort);
		   super.alias.put(name, remoteHost);
		   super.config.put(remoteHost,"version", super.version.incrementAndGet());
		   super.config.put(remoteHost, "remoteHost", remoteHost);
		   //super.config.put(remoteHost, "localPort", localport);
		   super.config.put(remoteHost, "remotePort", remotePort);
		   super.config.put(remoteHost, "name", name);
		   
		 
		   if(node.containsKey("file")){
			   HashMap fcs=(HashMap) node.get("file").unwrapped();
			   String syncPath=fcs.get("syncPath").toString();//文件同步的路径
			  // System.out.println("SimpleDistributeConfig.wrapConfig() "+syncPath);
			   super.config.put(remoteHost, "file", syncPath);//以后配置多的时候 分装成一个bean存入
		   }
		   
		   
		}
		String chost=configObj.getString("client.currentHost");
		int port=configObj.getInt("client.currentPort");
		super.context.setCurrHost(chost, port);

		//config.root().containsKey(key)
		//config.getString(path);
	}

	@Override
	protected String wrapTable(Table<String, String, Object> table) {
		StringBuilder sb=new StringBuilder("server{");

		sb.append("\n\t").append("hosts=[");
		Set<String> rows=table.rowKeySet();
		int size=rows.size();
		int i=0;
		for (String row : rows) {
			i++;
			Map<String,Object> map=table.row(row);
			if(!map.containsKey("remoteHost")) continue;
			sb.append("\t {");
			Object remoteHost=map.get("remoteHost");
			Object remotePort=map.get("remotePort");
			//Object localPort=map.get("localPort");
			Object name=map.get("name");
			sb.append("name=").append(name).append("\n\t");
			//sb.append("localPort=").append(localPort).append("\n\t");
			sb.append("remoteHost=").append(remoteHost).append("\n\t");
			sb.append("remotePort=").append(remotePort).append("\n\t");
		    if(map.containsKey("file")){
		    	sb.append("file{").append("\n\t").append("syncPath=").append(map.get("syncPath")).append("}");
		    }
		   
		    sb.append("\t }");
		    if(i!=size){
		    	sb.append(",");
		    }
		}
	   sb.append("]").append("\n\t").append("}");
	  //继续 保存client
	  
	   sb.append("\n\t").append("client{").append("\n\t").append("currentHost=").append(context.getCurrHost().getHostString()).append("\n\t");
	   sb.append("\n\t").append("currentPort=").append(context.getCurrHost().getPort()).append("\n\t");
	   sb.append("}");
	   
	   return sb.toString();

	}

	@Override
	protected void commit() {

	}

	@Override
	protected void sync() {

	}

	public static void main(String[] args) {

	}

}

ok 基本的简单的 配置 以及 上下文 这些用来同步的 用来做桥梁的都已经做好了 下面就是怎么监听配置文件的更改,有人说怎么监听一个配置文件更改啊,其实 文件有个属性 最后更改的时间,只要监听这个就可以 初级版本没有加上更改,这个后面可以随时加上 而且为了以后好更改 只写了相关方式没有 加上

import java.io.File;

public interface FileListener {
	public void fileChanged(File file);
}

然后用定时器 timeer 或者线程池定时线程去轮训他 ,

然后比较时间 一个一个通知事件  (图片传反了)

这里 注意弱应用 ,其实 对于这些时间 我建议用LinkTransferQueue 他是FIFO 先进先出的无阻晒的队列 然后队列 加上若引用,保证时间的一些顺序,

然后 通知了接口 在接口里面我们在做相应的更改配置 同步配置 的相关操作,这一章基本思想就是

同步 资源 配置 上下文为桥梁 事件驱动为引擎

还有支出的是 timer excutors.newSchulerXXXX  还有很多方式实现轮训的方式  这种方式 也可以实现心跳线

同时 告诉大家的事 Apache、 camel| 里面的事件驱动文件部分 核心代码就是上面 这一小部分

代码 没有贴完整 ,但是代码已经托管到  http://git.oschina.net/zhuyuping/distributeTemplate

个人第一个开源分布式项目distributeTemplate的实现二 分布式配置以及上下文的维护同步

时间: 2024-08-15 10:24:59

个人第一个开源分布式项目distributeTemplate的实现二 分布式配置以及上下文的维护同步的相关文章

个人第一个开源分布式项目distributeTemplate的实现三 网络通讯netty传输大文件

今天 我将讲讲网络通讯,这里我初始版本 由于采用的事Netty框架  所以 这里讲网络Netty在我们这里是怎么使用的,下周开始添加rpc lucene内容了 实现之后的0.2 0.3版本,后面将会去掉netty依赖 采用原生的NIO2 (aio) 异步非阻塞方式 实现自己网络通讯,也就是说 这部分可能会实现一个简单的但是比netty精简高效的网络框架,后期做出来 可能会单独开一个分支开源出来,netty说白了 就是 事件驱动 以及 NIO 加一些协议 以及 异常 处理,废话不多说了. 我最近

【ASP.NET Core分布式项目实战】(二)oauth2 + oidc 实现 server部分

原文:[ASP.NET Core分布式项目实战](二)oauth2 + oidc 实现 server部分 本博客根据http://video.jessetalk.cn/my/course/5视频整理(内容可能会有部分,推荐看源视频学习) 资料 我们基于之前的MvcCookieAuthSample来做开发 MvcCookieAuthSample下载地址:https://files.cnblogs.com/files/wyt007/ASPNETCore%E5%BF%AB%E9%80%9F%E5%85

我的第一个开源小项目终于诞生了!

这个小项目(卡片秀)是一个卡片抽奖特效,用开源项目这样的词语让我多少有些羞愧,毕竟作为一个涉世未深的小伙子,用项目的标准衡量还有很大差距.不过该案例采用 jQuery 插件方式编写,提供配置参数并且做了浏览器兼容优化,整体而言作为一个小项目也不为过.目前正在持续更新. 话不多少,先上地址:https://github.com/nzbin/CardShow/tree/master 当然,博主写这篇文章不是为了炫耀这个 Demo,而是交流 jQuery 插件的编写以及这一项目中遇到的各种问题.现在的

分布式缓存系统Memcached(十二)——基本配置与命令

为了方便测试,给出一个C客户端libmemcached链接:https://launchpad.net/libmemcached/ 以及memcacheclient-2.0 : http://code.jellycan.com/files/memcacheclient-2.0.zip(已生成 sln,在windows下直接用VS打开,编译成功) 在Memcached启动时,有很多配置参数可以选择,以下参数对应memcached1.4.15,现给出这些参数的具体含义: "a:" //un

CSDN开源夏令营项目进入第一实习期,入选名单和优秀开题报告推荐名单公布

由CSDN推出的面向中国在校大学生的"开源夏令营"技术公益活动已于7月4日截止学生报名,自6月16日起,共有1785名学生参与了活动报名,445名学生提交了共650份开题报告,经过导师们的精心挑选和认真考核,共有67名学生最终入选. 在这短短的20天当中,学生们参与热情十足,导师们更是积极负责的与报名学生沟通并审阅其开题报告,从众多学生中选定出最符合自己项目要求的参与者. 本次活动每一个开源项目均施行一对一导师制度,最终有67名学生脱颖而出,成功获得进入第一阶段实习期的机会.据导师反馈

C#开源资源项目

一.AOP框架 Encase 是C#编写开发的为.NET平台提供的AOP框架.Encase 独特的提供了把方面(aspects)部署到运行时代码,而其它AOP框架依赖配置文件的方式.这种部署方面(aspects)的方法帮助缺少经验的开发人员提高开发效率. NKalore是一款编程语言,它扩展了C#允许在.net平台使用AOP.NKalore的语法简单.直观,它的编译器是基于Mono C#编译器(MCS).NKalore目前只能在命令行或#Develop内部使用.NKalore兼容公共语言规范CL

AI 也开源:50 大开源 AI 项目 (转)

这些开源AI项目专注于机器学习.深度学习.神经网络及其他应用场合. 自IT界早期以来,研制出能像人类那样“思考”的机器一直是研究人员的一大目标.在过去几年,计算机科学家们在人工智能(AI)领域已取得了巨大进展,如今这项技术日益普及开来. 事实上,Gartner预测“到2020年,AI技术实际上将普遍出现在几乎每一个新的软件产品和服务中.”IDC预测,2017年企业界在AI技术上的开支将达到125亿美元,比2016年增长逾59.3%.这股强劲的增长势头可能会一直持续到2020年,到时收入有望达到4

前后端分离分布式项目开发(二)-Dubbox框架

==========================================精品教程推送:PL14352018年最新JavaEE(Java1.8)SSMWeb项目实战学习要趁早,点滴记录,学习就是进步! 不要到处找了,抓紧提升自己!版权保护,加Q:1225462853 Q群:702101215或关注公众号:菜鸟奋斗 ========================================== 3.Dubbox框架 3.1 Dubbox简介 Dubbox 是一个分布式服务框架,其前

[转]基于C#的开源GIS项目介绍之SharpMap篇

我是一个刚毕业的GIS本科毕业生,目前在杭州从事GIS软件应用开发.在项目开发中总感觉自己的编程水平还不够,于是想找些开源GIS小项目来研究研究,借以提高自己的编程能力和项目开发能力.在网上搜了一下“GIS开源”发现还不少,下面是一个介绍GIS开源项目的链接: http://www.yuanma.org/data/2008/0526/article_3048.htm 里面介绍了基于各种编程语言的GIS开源项目,并列出了各自的特点和官网链接. 由于在学校时候学的一直都是C#和Visual Stud