也讲Java NIO
一点开场白
百度搜索java nio,前面的几个帖子总是从各种基础概念介绍起,通道、缓冲区、选择器… 然后看着看着就晕了,所以,经过一晚上的研究,我想从自己的理解讲讲nio。
一、单线程的通信
在没有nio之前,java妥妥的可以进行CS项目间的通信,来个最简单的例子。(懒得写,抄了段)
server 端
package nio.nonio;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
public class SimpleServer {
public static int PORT = 8080;
public static void main(String[] agrs) throws IOException {
ServerSocket s = null;
Socket socket = null;
BufferedReader br = null;
PrintWriter pw = null;
// 设定服务端的端口号
s = new ServerSocket(PORT);
System.out.println("ServerSocket Start:" + s);
while (true) {
try {
// 等待请求,此方法会一直阻塞,直到获得请求才往下走
System.out.println("waiting for connection...");
socket = s.accept();
System.out.println("Connection accept socket:" + socket);
// 用于接收客户端发来的请求
br = new BufferedReader(new InputStreamReader(
socket.getInputStream()));
pw = new PrintWriter(new BufferedWriter(new OutputStreamWriter(
socket.getOutputStream())), true);
while (true) {
String str = br.readLine();
System.out.println("Client Socket Message:" + str);
if (str.equals("END")) {
break;
}
Thread.sleep(1000);
pw.println("Message Received");
pw.flush();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println("Close.");
try {
br.close();
pw.close();
socket.close();
} catch (Exception e2) {
}
}
}
}
}
Client端
package nio.nonio;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.Socket;
public class SimpleClient {
public static void main(String[] args) {
Socket socket = null;
BufferedReader br = null;
PrintWriter pw = null;
try {
// 客户端socket指定服务器的地址和端口号
socket = new Socket("127.0.0.1", SimpleServer.PORT);
System.out.println("Socket=" + socket);
// 同服务器原理一样
br = new BufferedReader(new InputStreamReader(
socket.getInputStream()));
pw = new PrintWriter(new BufferedWriter(new OutputStreamWriter(
socket.getOutputStream())));
for (int i = 0; i < 10; i++) {
pw.println("howdy " + i);
pw.flush();
String str = br.readLine();
System.out.println(str);
}
pw.println("END");
pw.flush();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
System.out.println("close......");
br.close();
pw.close();
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
其实,以上将是最简单的一个服务端和客户端通信的例子。服务端和客户端分别是用一个线程处理。服务端在while 循环中,会调用s.accept();若没有连接请求,则一直阻塞,直到有连接接入,则不断读取信息,直接读到END,才终止连接,然后继续等待下一个连接。
这种方式,服务端一次只能够处理一个连接,若有多个客户端要接入,则需要排队等待。为了能够快速响应客户端的请求,所以,有了服务端的多线程处理。
二、多线程的通信方式
每次accept一个连接请求后,就建立一个线程来处理。这样,就不用等到本次连接数据读取完毕并关闭后,再继续处理下个连接了。如下图:
代码是这样的。
package nio.multithread;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
public class MultiThreadServer {
public static int PORT = 8080;
void start() throws IOException {
ServerSocket s = null;
// 设定服务端的端口号
s = new ServerSocket(PORT);
System.out.println("ServerSocket Start:" + s);
while (true) {
try {
// 等待请求,此方法会一直阻塞,直到获得请求才往下走
System.out.println("waiting for connection...");
Socket socket = null;
socket = s.accept();
new Thread(new MessageHandler(socket)).start();
System.out.println("Connection accept socket:" + socket);
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] agrs) throws IOException {
new MultiThreadServer().start();
}
class MessageHandler implements Runnable {
private Socket socket;
public MessageHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
BufferedReader br = null;
PrintWriter pw = null;
// 用于接收客户端发来的请求
try {
br = new BufferedReader(new InputStreamReader(
socket.getInputStream()));
pw = new PrintWriter(new BufferedWriter(new OutputStreamWriter(
socket.getOutputStream())), true);
while (true) {
String str = br.readLine();
System.out.println("Client Socket Message:" + str);
pw.println("Message Received");
if (str.equals("END")) {
break;
}
Thread.sleep(1000);
pw.flush();
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("Close.");
try {
br.close();
pw.close();
socket.close();
} catch (Exception e2) {
}
}
}
}
}
与之前的方式相比,有了巨大的飞跃,但如果有成千上万个客户端同时与服务器通信,岂不是要建立成千上万个线程?内存吃得消么?不断地创建新的线程,CPU岂不是大量时间要耗费在建新线程中了?所以,这种方式只适合处理有少量客户端同时连接的情况。
三、仅使用一个线程来处理所有的通信?
在单线程情况下,只能支持一个线程处理的原因是,accept一个连接后,一个线程只能够与一个客户端通信。如果有一个线程能够负责与所有的客户端通信,是不是可以解决太多线程的尴尬局面。
于是,我尝试用一个LinkedBlockingQueue存下所有被accept的连接,在一个线程中逐个去处理与这些连接的通信,于是有了下面的代码。
package nio.mynio;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class SingleThreadServer {
public static int PORT = 8080;
void start() throws IOException {
ServerSocket s = null;
// 设定服务端的端口号
s = new ServerSocket(PORT);
BlockingQueue<Socket> sockets = new LinkedBlockingQueue<Socket>();
new Thread(new MessageHandler(sockets)).start();
System.out.println("ServerSocket Start:" + s);
while (true) {
try {
// 等待请求,此方法会一直阻塞,直到获得请求才往下走
System.out.println("waiting for connection...");
Socket socket = null;
socket = s.accept();
sockets.add(socket);
System.out.println("Connection accept socket:" + socket);
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] agrs) throws IOException {
new SingleThreadServer().start();
}
class MessageHandler implements Runnable {
private BlockingQueue<Socket> sockets;
public MessageHandler(BlockingQueue<Socket> sockets) {
this.sockets = sockets;
}
@Override
public void run() {
BufferedReader br = null;
PrintWriter pw = null;
// 用于接收客户端发来的请求
while (true) {
Socket socket = null;
try {
System.out.println(sockets.isEmpty());
socket = sockets.take();
System.out.println("take:");
br = new BufferedReader(new InputStreamReader(
socket.getInputStream()));
pw = new PrintWriter(new BufferedWriter(
new OutputStreamWriter(socket.getOutputStream())),
true);
while (true) {
String str = br.readLine();
System.out.println("Client Socket Message:" + str);
pw.println("Message Received");
if (str.equals("END")) {
break;
}
Thread.sleep(1000);
pw.flush();
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("Close.");
try {
br.close();
pw.close();
if (socket != null && !socket.isClosed())
socket.close();
} catch (Exception e2) {
}
}
}
}
}
}
然而,即便这么做,本质上与单线程没差,若同时又多个客户端接入,是要等上一个连接的数据读完之后,才进行下一个连接的数据处理,所以,这种方式是自欺欺人罢了。究其原因,是因为在调用输入流br.readLine()进行输入时,若没有数据,当前线程会阻塞直到有新的数据,所以,在一个连接没关闭前,是无法进行下一个连接数据的读取的,即便该连接目前并没有数据传输。
那么,若有一个方法如br.readLineNoWait(),若当前没有数据时,返回一个特定值(如-1),而不是阻塞当前状态,我们就可以在这个线程中通过循环读取各个连接来读取数据了。又或者,若有一种机制能够预先将传入的数据读取并放在某个内存区域,并通知我们数据到达了,我们直接从这块区域获取数据而非直接用流来获取,也能够避免等待IO的过程(其实类似于使用channel而非stream来读数据)。
至此,我们可以揣测下NIO的实现方式。
四、java 的nio
第三点中,我想通过一个无阻塞的方式去读inpustream中的数据,这种方式的思路是通过一个线程在循环中不断尝试pull数据来实现对所有连接数据的读入。
nio则是通过订阅-发布模式,首先,我们先新建一个选择器(selector),然后将channel(可以理解为一个连接)注册到选择器中,并且告诉他,当有可读的数据(或者其他事件)时通知我。然后,用一个单独的线程去处理所有订阅的事件,就可以了。
selector的作用,他会帮你管理多个通道(比如多个端口的监控),以及每个通道中的连接(这样能够处理长连接的情况)。
代码如下:
package nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
/**
* TCP/IP的NIO非阻塞方式 服务器端
* */
public class Server implements Runnable {
// 第一个端口
private Integer port1 = 8099;
// 第二个端口
private Integer port2 = 9099;
// 第一个服务器通道 服务A
private ServerSocketChannel serversocket1;
// 第二个服务器通道 服务B
private ServerSocketChannel serversocket2;
// 连接1
private SocketChannel clientchannel1;
// 连接2
private SocketChannel clientchannel2;
// 选择器,主要用来监控各个通道的事件
private Selector selector;
// 缓冲区
private ByteBuffer buf = ByteBuffer.allocate(512);
Set<SocketChannel> channelSet = new HashSet<SocketChannel>();
public Server() {
init();
}
/**
* 这个method的作用 1:是初始化选择器 2:打开两个通道 3:给通道上绑定一个socket 4:将选择器注册到通道上
* */
public void init() {
try {
// 创建选择器
this.selector = SelectorProvider.provider().openSelector();
// 打开第一个服务器通道
this.serversocket1 = ServerSocketChannel.open();
// 告诉程序现在不是阻塞方式的
this.serversocket1.configureBlocking(false);
// 获取现在与该通道关联的套接字
this.serversocket1.socket().bind(
new InetSocketAddress("localhost", this.port1));
// 将选择器注册到通道上,返回一个选择键
// OP_ACCEPT用于套接字接受操作的操作集位
this.serversocket1.register(this.selector, SelectionKey.OP_ACCEPT);
// 然后初始化第二个服务端
this.serversocket2 = ServerSocketChannel.open();
this.serversocket2.configureBlocking(false);
this.serversocket2.socket().bind(
new InetSocketAddress("localhost", this.port2));
this.serversocket2.register(this.selector, SelectionKey.OP_ACCEPT);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 这个方法是连接 客户端连接服务器
*
* @throws IOException
* */
public void accept(SelectionKey key) throws IOException {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
// SocketChannel socketChannel = server.accept();
// channelSet.add(socketChannel);
if (server.equals(serversocket1)) {
clientchannel1 = server.accept();
System.out.println("連接了!");
clientchannel1.configureBlocking(false);
// OP_READ用于读取操作的操作集位
clientchannel1.register(this.selector, SelectionKey.OP_READ);
} else {
clientchannel2 = server.accept();
clientchannel2.configureBlocking(false);
// OP_READ用于读取操作的操作集位
clientchannel2.register(this.selector, SelectionKey.OP_READ);
}
}
/**
* 从通道中读取数据 并且判断是给那个服务通道的
*
* @throws IOException
* */
public void read(SelectionKey key) throws IOException {
this.buf.clear();
// 通过选择键来找到之前注册的通道
// 但是这里注册的是ServerSocketChannel为什么会返回一个SocketChannel??
SocketChannel channel = (SocketChannel) key.channel();
// 从通道里面读取数据到缓冲区并返回读取字节数
System.out.println(channel.isConnected());
System.out.println(channel.isOpen());
int count = -1;
try {
count = channel.read(this.buf);
} catch (IOException e) {
System.out.println("异常,关闭连接");
} finally {
if (count == -1) {
// 取消这个通道的注册
System.out.println("取消这个通道的注册");
key.channel().close();
key.cancel();
return;
}
}
// 将数据从缓冲区中拿出来
String input = new String(this.buf.array()).trim();
// 那么现在判断是连接的那种服务
if (channel.equals(this.clientchannel1)) {
System.out.println("欢迎您使用服务A");
System.out.println("您的输入为:" + input);
} else {
System.out.println("欢迎您使用服务B");
System.out.println("您的输入为:" + input);
}
}
@Override
public void run() {
while (true) {
try {
System.out.println("running ... ");
// 选择一组键,其相应的通道已为 I/O 操作准备就绪。
this.selector.select();
// 返回此选择器的已选择键集
// public abstract Set<SelectionKey> selectedKeys()
Iterator selectorKeys = this.selector.selectedKeys().iterator();
while (selectorKeys.hasNext()) {
System.out.println("running2 ... ");
// 这里找到当前的选择键
SelectionKey key = (SelectionKey) selectorKeys.next();
// 然后将它从返回键队列中删除
selectorKeys.remove();
if (!key.isValid()) { // 选择键无效
continue;
}
if (key.isAcceptable()) {
// 如果遇到请求那么就响应
this.accept(key);
} else if (key.isReadable()) {
// 读取客户端的数据
System.out.println("isReadable");
this.read(key);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
Server server = new Server();
Thread thread = new Thread(server);
thread.start();
}
}
package nio;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/**
* TCP/IP的NIO非阻塞方式 客户端
* */
public class Client {
// 创建缓冲区
private ByteBuffer buffer = ByteBuffer.allocate(512);
// 访问服务器
public void query(String host, int port) throws IOException {
InetSocketAddress address = new InetSocketAddress(
InetAddress.getByName(host), port);
SocketChannel socket = null;
byte[] bytes = new byte[512];
socket = SocketChannel.open();
socket.connect(address);
int i = 1;
try {
while (true) {
System.in.read(bytes);
if ("close".equals(bytes)) {
socket.close();
}
buffer.clear();
buffer.put(bytes);
buffer.flip();
socket.write(buffer);
buffer.clear();
if (i++ == 2)
break;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println("finally close");
if (socket != null) {
socket.close();
}
}
}
public static void main(String[] args) throws IOException {
new Client().query("localhost", 8099);
// new Client().query("localhost", 9099);
}
}
看到一个总结感觉很形象,贴出来:
Channel 通道
Buffer 缓冲区
Selector 选择器
其中Channel对应以前的流,Buffer不是什么新东西,Selector是因为nio可以使用异步的非堵塞模式才加入的东西。
以前的流总是堵塞的,一个线程只要对它进行操作,其它操作就会被堵塞,也就相当于水管没有阀门,你伸手接水的时候,不管水到了没有,你就都只能耗在接水(流)上。
nio的Channel的加入,相当于增加了水龙头(有阀门),虽然一个时刻也只能接一个水管的水,但依赖轮换策略,在水量不大的时候,各个水管里流出来的水,都可以得到妥善接纳,这个关键之处就是增加了一个接水工,也就是Selector,他负责协调,也就是看哪根水管有水了的话,在当前水管的水接到一定程度的时候,就切换一下:临时关上当前水龙头,试着打开另一个水龙头(看看有没有水)。
当其他人需要用水的时候,不是直接去接水,而是事前提了一个水桶给接水工,这个水桶就是Buffer。也就是,其他人虽然也可能要等,但不会在现场等,而是回家等,可以做其它事去,水接满了,接水工会通知他们。
这其实也是非常接近当前社会分工细化的现实,也是统分利用现有资源达到并发效果的一种很经济的手段,而不是动不动就来个并行处理,虽然那样是最简单的,但也是最浪费资源的方式。
参考