当服务器端的server Channel绑定某个端口之后,就可以处理来自客户端的连接请求,而且在构建 NioServerSocketChannelFactory
的时候已经生成了对应的 BossPool 和 WorkerPool,前者管理的 NioServerBoss 就是专门用来接受客户端连接的Selector封装,当,下面是关键的代码:
1. AbstractNioSelector中的run方法代表这个boss thread的核心工作。
public void run()
{
thread =
Thread. currentThread();
// 打开闭锁;
startupLatch.countDown();
int selectReturnsImmediately
= 0;
Selector selector = this. selector;
if (selector
== null) {
return;
}
// use 80% of the timeout for measure
final long minSelectTimeout
= SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100;
boolean wakenupFromLoop
= false;
for (;;)
{
wakenUp.set( false);
try {
long beforeSelect
= System. nanoTime();
// 返回有多少个Channel准备好了
int selected
= select(selector);
if (SelectorUtil. EPOLL_BUG_WORKAROUND &&
selected == 0 && !wakenupFromLoop && !wakenUp .get()) {
//上述select阻塞的时间;
long timeBlocked
= System. nanoTime() - beforeSelect;
//如果小于最小超时时间限制;
if (timeBlocked
< minSelectTimeout) {
boolean notConnected
= false;
// loop over all keys as the selector may was unblocked because
of a closed channel
for (SelectionKey
key: selector.keys()) {
SelectableChannel ch = key.channel();
try {
if (ch instanceof DatagramChannel
&& !ch.isOpen() ||
ch instanceof SocketChannel
&& !((SocketChannel) ch).isConnected()) {
notConnected = true;
// cancel the key just to be on the safe side
key.cancel();
}
} catch (CancelledKeyException
e) {
// ignore
}
}
if (notConnected)
{
selectReturnsImmediately = 0;
} else {
//在超时限制之前就返回,并且返回的结果是0,这或许是导致 jdk epoll bug的原因,累积。
selectReturnsImmediately ++;
}
} else {
// 是超时。
selectReturnsImmediately = 0;
}
// 这是jdk epoll bug,所以需要替换掉这个Selector!!!
//然后重新下一轮的select处理。
if (selectReturnsImmediately
== 1024) {
// The selector returned immediately for 10 times in a row,
// so recreate one selector as it seems like we hit the
// famous epoll (..) jdk bug.
rebuildSelector();
selector = this. selector;
selectReturnsImmediately = 0;
wakenupFromLoop = false;
// try to select again
continue;
}
} else {
// reset counter
selectReturnsImmediately = 0;
}
/**
* 在调用selector.wakeup()之前总是先执行wakenUp.compareAndSet(false, true),
* 来减小wake -up的开销,因为Selector.wakeup()执行的代价很大。
* 然后这种方法存在一种竟态条件,发生在如果把 wakenUp 设置为true太早的时候:
* 1) Selecttor在‘wakenUp.set(false)‘和‘selector.select(...)‘之间醒来(BAD);
* 2)在‘selector.select(...)‘和‘if (wakenUp.get()) { ... }‘醒来时OK的。
* 在第一种情况下,‘wakenUp‘被置为了true,但是没有对那个select生效,所以他会让接下来的那个
* ‘selector.select(...)‘立即醒来。直到在下一轮循环当中‘wakenUp‘ 被再次置为FALSE的时候,
* 那么 ‘wakenUp.compareAndSet(false, true)‘就会失败,任何想惊醒Selector的尝试都会失败,
* 导致接下来的‘selector.select(...)‘方法无谓的阻塞。
*
* 为了解决这个问题,就在selector.select(...)之后,判断wakenUp是true的时候,立即调用一次
* selector.wakeup()。
* 对这两种情况来说,惊醒selector的操作都是低效的。
*/
if ( wakenUp.get())
{
wakenupFromLoop = true;
selector.wakeup();
} else {
wakenupFromLoop = false;
}
cancelledKeys =
0;
processTaskQueue(); // 处理任务
selector = this. selector; //
processTaskQueue() can call rebuildSelector()
if ( shutdown)
{
this. selector = null;
// process one time again
processTaskQueue();
for (SelectionKey
k: selector.keys()) {
close(k);
}
try {
// 要关闭Selector;
selector.close();
} catch (IOException
e) {
logger.warn( "Failed
to close a selector.", e);
}
// 打开这个闭锁;
shutdownLatch.countDown();
break;
} else {
//核心的过程,有具体的NioSelector来实现
process(selector);
}
} catch (Throwable
t) {
logger.warn(
"Unexpected exception in the selector loop.",
t);
// Prevent possible consecutive immediate failures that lead to
// excessive CPU consumption.
try {
Thread. sleep(1000);
} catch (InterruptedException
e) {
// Ignore.
}
}
}
}
2. 具体的流程处理在具体的 NioServerBoss 中,具体的处理连接请求。
protected void process(Selector
selector) {
Set<SelectionKey> selectedKeys = selector.selectedKeys();
if (selectedKeys.isEmpty())
{
return;
}
for (Iterator<SelectionKey>
i = selectedKeys.iterator(); i.hasNext();) {
SelectionKey k = i.next();
i.remove(); //
// 得到监听套接字通道
NioServerSocketChannel channel = (NioServerSocketChannel) k.attachment();
try {
// accept connections in a for loop until no new connection is ready
for (;;)
{
SocketChannel acceptedSocket = channel.socket .accept();
// 非阻塞模式
if (acceptedSocket
== null) {
break;
}
// 有连接请求的到来,那么就分发给worker处理。
registerAcceptedChannel(channel, acceptedSocket, thread );
}
} catch (CancelledKeyException
e) {
// Raised by accept() when the server socket was closed.
k.cancel();
channel.close();
} catch (SocketTimeoutException
e) {
// Thrown every second to get ClosedChannelException
// raised.
} catch (ClosedChannelException
e) {
// Closed as requested.
} catch (Throwable
t) {
if ( logger.isWarnEnabled())
{
logger.warn( "Failed
to accept a connection.", t);
}
try {
Thread. sleep(1000);
} catch (InterruptedException
e1) {
// Ignore
}
}
}
}
3. 把这个连接套接字分发给一个worker pool中下一个worker来处理。
private static void registerAcceptedChannel(NioServerSocketChannel
parent, SocketChannel acceptedSocket,
Thread currentThread) {
try {
ChannelSink sink = parent.getPipeline().getSink();
ChannelPipeline pipeline = parent.getConfig().getPipelineFactory().getPipeline();
//安排一个线程来处理这个连接通道 acceptedSocket
NioWorker worker = parent. workerPool.nextWorker();
worker.register( new NioAcceptedSocketChannel(
parent.getFactory(), pipeline, parent, sink
, acceptedSocket,
worker, currentThread), null);
} catch (Exception
e) {
if ( logger.isWarnEnabled())
{
logger.warn( "Failed
to initialize an accepted socket.", e);
}
try {
acceptedSocket.close();
} catch (IOException
e2) {
if ( logger.isWarnEnabled())
{
logger.warn( "Failed
to close a partially accepted socket.",e2);
}
}
}
}
4. 接下来就是每个worker处理一个连接的读写服务,NioWorker中的 process 负责这些工作。
protected void process(Selector
selector) throws IOException {
Set<SelectionKey> selectedKeys = selector.selectedKeys();
//如果集合为空就立即返回而不是每次创建迭代器,却无事可做。
if (selectedKeys.isEmpty())
{
return;
}
for (Iterator<SelectionKey>
i = selectedKeys.iterator(); i.hasNext();) {
SelectionKey k = i.next();
i.remove();
try {
//获取这个SelectionKey的就绪操作集合。
int readyOps
= k.readyOps();
if ((readyOps
& SelectionKey. OP_READ) != 0 || readyOps == 0) {
if (!read(k))
{
// Connection already closed - no need to handle write.
continue;
}
}
if ((readyOps
& SelectionKey. OP_WRITE) != 0) {
writeFromSelectorLoop(k);
}
} catch (CancelledKeyException
e) {
close(k);
}
if (cleanUpCancelledKeys())
{
break; //
break the loop to avoid ConcurrentModificationException
}
}
}
5.两种任务的层次结构图为: