基于Java自身技术实现消息方式的系统间通信
基于Java自身包实现消息方式的系统间通信的方式有:TCP/IP+BIO、TCP/IP+NIO、UDP/IP+BIO以及UDP/IP+NIO 4种,下面分别介绍如何实现这4种方式的系统间通信。
TCP/IP+BIO
在Java中可基于Socket、ServerSocket来实现TCP/IP+BIO的系统间通信。
Socket主要用于实现建立连接及网络IO的操作,ServerSocket主要用于实现服务器端端口的监听及Socket对象的获取。
1、基于Socket实现客户端的关键代码如下:
// 创建连接,如果域名解析不了会抛出UnknownHostException,当连接不上时会抛出IOException,如果希望控制建立连接的超时,可先调用new Socket(),然后调用socket.connect(SocketAddress类型的目标地址,以毫秒为单位的超时时间)
Socket socket=new Socket(目标IP或域名,目标端口);
// 创建读取服务器端返回流的BufferedReader
BufferedReader in=new BufferedReader(new InputStreamReader(socket.getInputStream()));
// 创建向服务器写入流的PrintWriter
PrintWriter out=new PrintWriter(socket.getOutputStream(),true);
// 向服务器发送字符串信息,要注意的是,此处即使写失败也不会抛出异常信息,并且一直会阻塞到写入操作系统或网络IO出现异常为止
out.println(“hello”);
// 阻塞读取服务端的返回信息,以下代码会阻塞到服务端返回信息或网络IO出现异常为止,如果希望在超过一段时间后就不阻塞了,那么要在创建Socket对象后调用socket.setSoTimeout(以毫秒为单位的超时时间)
in.readLine();
2、服务器端关键代码如下:
// 创建对本地指定端口的监听,如端口冲突则抛出SocketException,其他网络IO方面的异常则抛出IOException
ServerSocket ss=new ServerSocket(监听的端口)
// 接受客户端建立连接的请求,并返回Socket对象,以便和客户端进行交互,交互的方式和客户端相同,也是通过Socket.getInputStream和Socket.getOutputStream来进行读写操作,此方法会一直阻塞到有客户端发送建立连接的请求,如果希望此方法最多阻塞一定的时间,则要在创建ServerSocket后调用其setSoTimeout(以毫秒为单位的超时时间)
Socket socket=ss.accept();
3、该模式的性能分析
上面是基于Socket、ServerSocket实现的一个简单的系统间通信的例子。而在实际的系统中,通常要面对的是客户端同时要发送多个请求到服务器端,服务器端则同时要接受多个连接发送的请求,上面的代码显然是无法满足的。
为了满足客户端能同时发送多个请求到服务器端,最简单的方法就是生成多个Socket。
但这里会产生两个问题:
a)是生成太多的Socket会消耗过多的本地资源,在客户端机器多,服务器端机器少的情况下,客户端生成太多、Socket会导致服务器端须要支撑非常高的连接数;
b)是生成Socket(建立连接)通常是比较慢的,因此频繁地创建会导致系统性能不足。
4、连接池
鉴于这两个问题,通常采用连接池的方式来维护Socket是比较好的,一方面限制了能创建的Socket的个数;另一方面由于将Socket放入了池中,避免了重复创建Socket带来的性能下降问题。数据库连接池就是这种方式的典型代表。
5、连接池应该注意的问题
但连接池的方式会带来另一个问题,连接池中的Socket个数是有限的,但同时要用Socket的请求可能会很多,在这种情况下就会造成激烈的竞争和等待;还有一个需要注意的问题是合理控制等待响应的超时时间,如不设定超时会导致当服务器端处理变慢时,客户端相关的请求都在做无限的等待,而客户端的资源必然是有限的。
因此这种情况下很容易造成当服务器端出现问题时,客户端挂掉的现象。超时时间具体设置为多少取决于客户端能承受的请求量及服务器端的处理时间。既要保证性能,又要保证出错率不会过高,对于直接基于TCP/IP+BIO的方式,可采用Socket.setSoTimeout
来设置等待响应的超时时间。
为了满足服务器端能同时接受多个连接发送的请求,通常采用的方法是在accept获取Socket后,将此Socket放入一个线程中处理,通常将此方式称为一连接一线程。这样服务器端就可接受多个连接发送请求了,这种方式的缺点是无论连接上是否有真实的请求,都要耗费一个线程。为避免创建过多的线程导致服务器端资源耗尽,须限制创建的线程数量,这就造成了在采用BIO的情况下服务器端所能支撑的连接数是有限的。
TCP/IP+NIO
在Java中可基于java.nio.channels中的Channel和Selector的相关类来实现TCP/IP+NIO方式的系统间通信。Channel有SocketChannel和ServerSocketChannel两种。
SocketChannel用于建立连接、监听事件及操作读写,ServerSocketChannel用于监听端口及监听连接事件;
程序通过Selector来获取是否有要处理的事件。
1、基于这两个类实现客户端的关键代码如下:
SocketChannel channel=SocketChannel.open();
// 设置为非阻塞模式
channel.configureBlocking(false);
//对于非阻塞模式,立刻返回false,表示连接正在建立中
channel.connect(SocketAddress);
Selector selector=Selector.open();
// 向channel注册selector以及感兴趣的连接事件
channel.register(selector,SelectionKey.OP_CONNECT);
// 阻塞至有感兴趣的IO事件发生,或到达超时时间,如果希望一直等至有感兴趣的IO事件发生,可调用无参数的select方法,如果希望不阻塞直接返回目前是否有感兴趣的事件发生,可调用selectNow方法
int nKeys=selector.select(以毫秒为单位的超时时间)
// 如nKeys大于零,说明有感兴趣的IO事件发生
SelectionKey sKey=null;
if(nKeys>0) {
Set<SelectionKey> keys=selector.selectedKeys();
for(SelectionKey key:keys){
// 对于发生连接的事件
if(key.isConnectable()){
SocketChannel sc=(SocketChannel) key.channel();
sc.configureBlocking(false);
// 注册感兴趣的IO读事件,通常不直接注册写事件,在发送缓冲区未满的情况下,一直是可写的,因此如注册了写事件,而又不用写数据,很容易造成CPU消耗100%的现象
sKey = sc.register(selector, SelectionKey.OP_READ);
// 完成连接的建立
sc.finishConnect();
}
// 有流可读取
else if(key.isReadable()){
ByteBuffer buffer=ByteBuffer.allocate(1024);
SocketChannel sc=(SocketChannel) key.channel();
int readBytes=0;
try{
int ret=0;
try{
// 读取目前可读的流,sc.read返回的为成功复制到bytebuffer中的字节数,此步为阻塞操作,值可能为0;当已经是流的结尾时,返回-1
while((ret=sc.read(buffer))>0){
readBytes+=ret;
}
}
finally{
buffer.flip();
}
}
finally{
if(buffer!=null){
buffer.clear();
}
}
}
// 可写入流
else if(key.isWritable()){
// 取消对OP_WRITE事件的注册
key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
SocketChannel sc=(SocketChannel) key.channel();
// 此步为阻塞操作,直到写入操作系统发送缓冲区或网络IO出现异常,返回的为成功写入的字节数,当操作系统的发送缓冲区已满,此处会返回0
int writtenedSize=sc.write(ByteBuffer);
// 如未写入,则继续注册感兴趣的OP_WRITE事件
if(writtenedSize==0){
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
}
}
}
selector.selectedKeys().clear();
}
// 对于要写入的流,可直接调用channel.write来完成,只有在写入未成功时才要注册OP_WRITE事件
int wSize=channel.write(ByteBuffer);
if(wSize==0){
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
}
从上可见,NIO是典型的Reactor模式的实现,通过注册感兴趣的事件及扫描是否有感兴趣的事件发生,从而做相应的动作。
2、服务器端关键代码如下:
ServerSocketChannel ssc=ServerSocketChannel.open();
ServerSocket serverSocket=ssc.socket();
// 绑定要监听的端口
serverSocket.bind(new InetSocketAddress(port));
ssc.configureBlocking(false);
// 注册感兴趣的连接建立事件
ssc.register(selector, SelectionKey.OP_ACCEPT);
之后则可采取和客户端同样的方式对selector.select进行轮询,只是要增加一个对key.isAcceptable的处理,代码如下:
if(key.isAcceptable()){
ServerSocketChannel server=(ServerSocketChannel)key.channel();
SocketChannel sc=server.accept();
if(sc==null){
continue;
}
sc.configureBlocking(false);
sc.register(selector,SelectionKey.OP_READ);
}
上面只是基于TCP/IP+NIO实现的一个简单例子,同样来看看基于TCP/IP+NIO如何支撑客户端同时发送多个请求及服务器端接受多个连接发送的请求。
3、该模式性能分析
对于客户端发送多个请求的需求,采用TCP/IP+NIO和采用TCP/IP+BIO的方式没有任何不同。但NIO方式可做到不阻塞,因此如果服务器端返回的响应能带上请求标识,那么客户端则可采用连接复用的方式,即每SocketChannel在发送消息后,不用等响应即可继续发送其他消息,这种方式可降低连接池带来的资源争抢的问题,从而提升系统性能;
对于连接不复用的情况,可基于Socket.setSoTimeout`的方式来控制同步请求的超时;
对于连接复用的情况,同步请求的超时可基于BlockingQueue、对象的wait/notify机制或Future机制来实现。
对于服务器端接受多个连接请求的需求,通常采用的是由一个线程来监听连接的事件,另一个或多个线程来监听网络流读写的事件。
当有实际的网络流读写事件发生后,再放入线程池中处理。这种方式比TCP/IP+BIO的好处在于可接受很多的连接,而这些连接只在有真实的请求时才会创建线程来处理,这种方式通常又称为一请求一线程。当连接数不多,或连接数较多,且连接上的请求发送非常频繁时,TCP/IP+NIO的方式不会带来太大的优势,但在实际的场景中,通常是服务器端要支持大量的连接数,但这些连接同时发送的请求并不会非常多。
在基于Sun JDK开发Java NIO程序时,尤其要注意selector.select抛出IOException异常的处理及selector.select不阻塞就直接返回的情况。这两种状况都有可能造成CPU消耗达到100%,对于selector.select抛出IOException的状况,可以采用绕过的方法为捕捉异常并将当前Thread sleep一段时间,或是重新创建Selector。为避免selector.select不阻塞就直接返回,可采用bug库中提到的修改建议。
从上面可以看出,对于高访问量的系统而言,TCP/IP+NIO方式结合一定的改造在客户端能够带来更高的性能,在服务器端能支撑更高的连接数。
UDP/IP+BIO
Java对UDP/IP方式的网络数据传输同样采用Socket机制,只是UDP/IP下的Socket没有建立连接的要求,由于UDP/IP是无连接的,因此无法进行双向的通信。这也就要求如果要双向通信的话,必须两端都成为UDP Server。
在Java中可基于DatagramSocket和DatagramPacket
来实现UDP/IP+BIO方式的系统间通信,DatagramSocket负责监听端口及读写数据。DatagramPacket作为数据流对象进行传输。
1、基于这两个类实现客户端的关键代码如下:
// 由于UDP/IP是无连接的,如果希望双向通信,就必须启动一个监听端口,承担服务器的职责,如不能绑定到指定端口,则抛出SocketException
DatagramSocket serverSocket=new DatagramSocket(监听的端口);
byte[] buffer=new byte[65507];
DatagramPacket receivePacket=new DatagramPacket(buffer,buffer.length);
DatagramSocket socket=new DatagramSocket();
DatagramPacket packet=new DatagramPacket(datas,datas.length,server,port);
// 阻塞发送packet到指定的服务器和端口,当出现网络IO异常时抛出IOException,当连不上目标地址和端口时,抛出PortUnreachableException
socket.send(packet)
// 阻塞并同步读取流信息,如接收到的流信息比packet长度长,则删除更长的信息,可通过调用DatagramSocket.setSoTimeout(以毫秒为单位的超时时间)来设置读取流的超时时间
serverSocket.receive(receivePacket)
服务器端代码和客户端代码的结构基本一致,这里就不列了。
由于UDP/IP通信的两端不建立连接,就不会有TCP/IP通信连接竞争的问题,只是最终读写流的动作是同步的。
对于服务器端同时接收多请求的需求,通常采取每接收到一个packet就放入一个线程中进行处理的方式来实现。
UDP/IP+NIO
在Java中可通过DatagramChannel和ByteBuffer来实现UDP/IP+NIO方式的系统间通信,DatagramChannel负责监听端口及进行读写,ByteBuffer则用于数据流传输。
1、基于这两个类实现客户端的关键代码如下:
DatagramChannel receiveChannel=DatagramChannel.open();
receiveChannel.configureBlocking(false);
DatagramSocket socket=receiveChannel.socket();
socket.bind(new InetSocketAddress(rport));
Selector selector=Selector.open();
receiveChannel.register(selector, SelectionKey.OP_READ);
之后即可采取和TCP/IP+NIO中对selector遍历一样的方式进行流信息的读取。
DatagramChannel sendChannel=DatagramChannel.open();
sendChannel.configureBlocking(false);
SocketAddress target=new InetSocketAddress("127.0.0.1",sport);
sendChannel.connect(target);
// 阻塞写入流,如发送缓冲区已满,则返回0,此时可通过注册SelectionKey.OP_WRITE事件,以便在可写入时再进行写操作,方式和TCP/IP+NIO基本一致
sendChannel.write(ByteBuffer);
服务端代码和客户端代码基本一致,就不再一一描述。
从以上代码来看,对于UDP/IP方式,NIO带来的好处是只在有流要读取或可写入流时才做相应的IO操作,而不用像BIO方式直接阻塞当前线程。
多播协议实现通信
以上列举了基于Java包实现一对一(客户端和服务器端一对一的”交流“)的系统间通信的方式,在实际的场景中,通常还会要将消息发送到多台机器,此时可以选择为每个目标机器建立一个连接,这种方式对于发送消息端会造成很大的网络流量压力。例如传输的消息是视频数据的场景,在网络协议上还有一个基于UDP/IP扩展出来的多播协议,多播协议的传输方式是一份数据在网络上进行传输,而不是由发送者给每个接收者都传一份数据,这样,网络的流量就大幅度下降了。
在Java中可基于MulticastSocket和DatagramPacket来实现多播网络通信,MulticastSocket是基于DatagramSocket派生出来的类,其作用即为基于UDP/IP实现多播方式的网络通信。在多播通信中,不同的地方在于接收数据端通过加入到多播组来进行数据的接收,同样发送数据也要求加入到多播组进行发送,多播的目标地址具有指定的地址范围,在224.0.0.0和239.255.255.255之间。
1、基于多播方式实现网络通信的服务器端关键代码如下:
// 组播地址
InetAddress groupAddress=InetAddress.getByName("224.1.1.1");
MulticastSocket server=new MulticastSocket(port);
// 加入组播,如地址为非组播地址,则抛出IOException,当已经不希望再发送数据到组播地址,或不希望再读取数据时,可调用server.leaveGroup(组播地址)
server.joinGroup(groupAddress);
MulticastSocket client=new MulticastSocket();
client.joinGroup(groupAddress);
之后则可和UDP/IP+BIO一样通过receive和send方法来进行读写操作。Client端代码和服务端代码基本一致,就不再列举了。
在Java应用中,多播通常用于多台机器的状态的同步。例如JGroups,默认基于UDP/IP多播协议,由于UDP/IP协议在数据传输时不够可靠,对于可靠性要求很高的系统,会希望采用多播方式,同时又要做到可靠。对于这样的需求,业界提出了一些能够确保可靠实现多播的方式:SRM(Scalable Reliable Multicast)、URGCP(Uniform Reliable Group Communication Protocol),其中SRM是在UDP/IP多播的基础上增加了确认机制,从而保证可靠,eBay采用了SRM框架来实现将数据从主数据库同步到各个搜索节点机器。
从上面的介绍来看,使用Java包来实现基于消息方式的系统间通信还是比较麻烦。为了让开发人员能更加专注对数据进行业务处理,而不用过多关注纯技术细节,开源业界诞生了很多优秀的基于以上各种协议的系统间通信的框架。这其中的佼佼者就是Mina了。