一、序言
近几个月一直从事一个分布式异步通信系统,今天就整理并blog一下.
这是一个全国性的通信平台,对性能,海量数据,容错性以及扩展性有非常高的要求,所以在系统的架构上就不能简单的采用集中式.简单的总结一下就是:
1.数据分布式存储
2.请求分布式调度
3.多结点分布式部署
4.双重备份,热切换
系统的核心无非就是网络架构,分布式算子和通信,要求如下:
分布式算子:
1.对于任意输入,输出均匀分布
2.输出结果数可控
通信:
1.高并发量
2.多线程
分布式算子我们选择的是sun公司的hash函数,通信用的则是cindy socket通信.网络架构以及具体的描述会在后面的blog中逐步给出.
二、网络架构
整个系统的架构如图所示,包括四层,每一层可以由若干结点来对数据和请求分流:
1.接口服务器(Interface Server):
1).对外提供访问接口并接受请求,考虑到HTTP的广泛性,一般内置一个http服务器进程
2).监控各dispatcher server的工作状态
3).转发请求到其中的一个最优dispatcher中,这里的最优性判断以各dispatcher server的工作状态为依据,当然在这一层上不心请求的具体内容可以简单地采用轮询或随机算法.
2.消息分发服务器(Dispatcher Server):
1).接受来自于接口服务器的请求
2).解析请求,提取特征参数(一般是类似于用户帐号之类的东西,一个帐号下的数据会被分布到同一个结点上),然后对该参数执行hash函数,计算出目标数据所在的App Server,然后将请求转发给该App Server.
3).事实上,在实际的项目中的处理比上面的介绍要更复杂一些,但伸缩性大大加强了.
3.应用服务器(App Server):
1).执行业务逻辑,等同于集中式系统中的应用服务器,已经不存在分布式的特征了.所处理的数据就是自己数据库中的数据,与网络上的其他结点无关.
2).被划分为多个逻辑组(group),同一个组中的服务器负载均衡
3).考虑到数据库的双重备份,热切换和负载均衡,才用了多数据库单读多写策略.对于读,监控各数据库工作状态,选择一个最优数据库来提供数据;对于写,同时写所有的数据库,因此必须保证操作的事务性.
4.数据库服务器(DB Server):
1).提供数据访问,没什么好说的,对于非事务性数据库需要在App Server层提供辅助措施;
5.结点之间的通信
1).数据(请求,响应,异常)以网络格式异步并发传输
三、分布式算法
接口服务器(Interface Server)和消息分发服务器(Dispatcher Server)在分发请求的策略上有所不同.
辅助函数和变量:
public String[] getTargetServerIps();//目标服务器的ip,如193.243.15.45:8080
public int[] getTargetServerIds();//目标服务器ID,与上述服务器ip一一对应,可以自由配置
public boolean isServerWorking(index);//判断目标服务器的状态
int currentTargetServerIndex=0;//当前的目标服务器在targetServerIds中的index
接口服务器(Interface Server)采用轮询算法:
public String getTargetServerIp(){//获取该次请求所要分发的目标服务器
String[] targetServerIps=getTargetServerIps();
int[] targetServerIds=getTargetServerIds();
int index=currentTargetServerIndex;
boolean isWorking=false;
while(!isWorking){
index=targetServerIds.length()%(currentTargetServerIndex+1);
isWorking=isServerWorking(index);
if(!isWorking&&index==currentTargetServerIndex){//无任何目标服务器可用
return "0:0";
}
}
currentTargetServerIndex=index;
return targetServerIps[index];
}
消息分发服务器(Dispatcher Server)分发请求采用的hash算法
// hash algrithm from JDK‘s String,来自于jdk的hash算法
public int hash(byte[] bs) {
int hash = 0;
for (int i = 0; i < bs.length; i++) {
hash = 31 * hash + bs[i];
}
return hash;
}
public int getTargetServerGroupIndexByHash(String hashParam) throws BtirException {//返回根据hash计算出的目标服务器群组
byte[] hashinfo=hashParam.getBytes("utf-8");
int frameCount=2://由hash值的后两位进行分段的数目 ,即hash结果数,目标服务器群组的数量
int step = 100 / frameCount;
int hash = Math.abs(hash(hashParam) % 100);
for(int i=0, beg=0, end=step; i<frameCount; i++) {
if(beg <= hash && hash < end )
return 2*i;
beg = end;
end += step;
}
return 2*(frameCount-1); //如果设置得好,应该不会走到这里
}
public String getTargetServerIpInGriuo(int groupIndex){//根据轮询算法,计算服务器群组中的最优服务器
String[] targetServerIps=getTargetServerIps();//组内的轮询算法,代码略
int[] targetServerIds=getTargetServerIds();
int index=getTargetServerIndexInGroup(groupIndex);//轮询算法代码略
return targetServerIps[index];
}
四、通信节点设计模型
通信是请求响应的方式,这对于接口服务器,消息分发服务器和应用服务器来说都是一直的,所以三者可以采用一致的模型来描述.
包括两个部分:client和server.这里描述一下二者的结构和网络通信.
client构造并发送请求,在异步系统里可以将构造和发送解偶,如图
RequestBuilder生成Request
将Request投入到Request队列(RequestQueue)中
独立线程RequestScanner扫描Request队列并调用RequestSender发送请求.
针对不同类型的请求可以构造不同的队列和不同的sender,队列中Request的优先级策略可以根据需要来定制.
server接收请求并处理,如图
server接收请求并处理,如图
RequestAccepter接收请求并放入Request队列
独立线程扫描队列并将调用RequestHandler进行处理
RequestHandler处理完毕后返回Response.
client与server之间的通信:
通信协议和技术有很多,如web service,EJB,jms,单这里采用基于java NIO的socket,因为其异步性和高并发量.
采用socket的两个基本标准是:
1.服务器上的线程数可控,切忌与请求数线性增长
2.将处理请求和接收请求分开,否则会降低吞吐率和并法量