Netty3 源码分析 - NIO server接受连接请求过程分析

当服务器端的server Channel绑定某个端口之后,就可以处理来自客户端的连接请求,而且在构建 NioServerSocketChannelFactory
的时候已经生成了对应的 BossPool 和 WorkerPool,前者管理的 NioServerBoss 就是专门用来接受客户端连接的Selector封装,当,下面是关键的代码:

1. AbstractNioSelector中的run方法代表这个boss thread的核心工作。

public void run()
{

thread =
Thread. currentThread();

// 打开闭锁;

startupLatch.countDown();

int selectReturnsImmediately
= 0;

Selector selector = this. selector;

if (selector
== null) {

return;

}

// use 80% of the timeout for measure

final long minSelectTimeout
= SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100;

boolean wakenupFromLoop
false;

for (;;)
{

wakenUp.set( false);

try {

long beforeSelect
= System. nanoTime();

// 返回有多少个Channel准备好了

int selected
= select(selector);

if (SelectorUtil. EPOLL_BUG_WORKAROUND &&
selected == 0 && !wakenupFromLoop && !wakenUp .get()) {

//上述select阻塞的时间;

long timeBlocked
= System. nanoTime() - beforeSelect;

//如果小于最小超时时间限制;

if (timeBlocked
< minSelectTimeout) {

boolean notConnected
false;

// loop over all keys as the selector may was unblocked because
of a closed channel

for (SelectionKey
key: selector.keys()) {

SelectableChannel ch = key.channel();

try {

if (ch instanceof DatagramChannel
&& !ch.isOpen() ||

ch instanceof SocketChannel
&& !((SocketChannel) ch).isConnected()) {

notConnected = true;

// cancel the key just to be on the safe side

key.cancel();

}

catch (CancelledKeyException
e) {

// ignore

}

}

if (notConnected)
{

selectReturnsImmediately = 0;

else {

//在超时限制之前就返回,并且返回的结果是0,这或许是导致 jdk epoll bug的原因,累积。

selectReturnsImmediately ++;

}

else {

// 是超时。

selectReturnsImmediately = 0;

}

// 这是jdk epoll bug,所以需要替换掉这个Selector!!!

//然后重新下一轮的select处理。

if (selectReturnsImmediately
== 1024) {

// The selector returned immediately for 10 times in a row,

// so recreate one selector as it seems like we hit the

// famous epoll (..) jdk bug.

rebuildSelector();

selector = this. selector;

selectReturnsImmediately = 0;

wakenupFromLoop = false;

// try to select again

continue;

}

else {

// reset counter

selectReturnsImmediately = 0;

}

/**

* 在调用selector.wakeup()之前总是先执行wakenUp.compareAndSet(false, true),

* 来减小wake -up的开销,因为Selector.wakeup()执行的代价很大。

* 然后这种方法存在一种竟态条件,发生在如果把 wakenUp 设置为true太早的时候:

* 1) Selecttor在‘wakenUp.set(false)‘和‘selector.select(...)‘之间醒来(BAD);

* 2)在‘selector.select(...)‘和‘if (wakenUp.get()) { ... }‘醒来时OK的。

* 在第一种情况下,‘wakenUp‘被置为了true,但是没有对那个select生效,所以他会让接下来的那个

* ‘selector.select(...)‘立即醒来。直到在下一轮循环当中‘wakenUp‘ 被再次置为FALSE的时候,

* 那么 ‘wakenUp.compareAndSet(false, true)‘就会失败,任何想惊醒Selector的尝试都会失败,

* 导致接下来的‘selector.select(...)‘方法无谓的阻塞。

*

* 为了解决这个问题,就在selector.select(...)之后,判断wakenUp是true的时候,立即调用一次

* selector.wakeup()。

* 对这两种情况来说,惊醒selector的操作都是低效的。

*/

if ( wakenUp.get())
{

wakenupFromLoop = true;

selector.wakeup();

else {

wakenupFromLoop = false;

}

cancelledKeys =
0;

processTaskQueue(); // 处理任务

selector = this. selector; //
processTaskQueue() can call rebuildSelector()

if ( shutdown)
{

this. selector = null;

// process one time again

processTaskQueue();

for (SelectionKey
k: selector.keys()) {

close(k);

}

try {

// 要关闭Selector;

selector.close();

catch (IOException
e) {

logger.warn( "Failed
to close a selector.", e);

}

// 打开这个闭锁;

shutdownLatch.countDown();

break;

else {

//核心的过程,有具体的NioSelector来实现

process(selector);

}

catch (Throwable
t) {

logger.warn(

"Unexpected exception in the selector loop.",
t);

// Prevent possible consecutive immediate failures that lead to

// excessive CPU consumption.

try {

Thread. sleep(1000);

catch (InterruptedException
e) {

// Ignore.

}

}

}

}

2. 具体的流程处理在具体的 NioServerBoss 中,具体的处理连接请求。

protected void process(Selector
selector) {

Set<SelectionKey> selectedKeys = selector.selectedKeys();

if (selectedKeys.isEmpty())
{

return;

}

for (Iterator<SelectionKey>
i = selectedKeys.iterator(); i.hasNext();) {

SelectionKey k = i.next();

i.remove(); //

// 得到监听套接字通道

NioServerSocketChannel channel = (NioServerSocketChannel) k.attachment();

try {

// accept connections in a for loop until no new connection is ready

for (;;)
{

SocketChannel acceptedSocket = channel.socket .accept();

// 非阻塞模式

if (acceptedSocket
== null) {

break;

}

// 有连接请求的到来,那么就分发给worker处理。

 registerAcceptedChannel(channel, acceptedSocket, thread );

}

catch (CancelledKeyException
e) {

// Raised by accept() when the server socket was closed.

k.cancel();

channel.close();

catch (SocketTimeoutException
e) {

// Thrown every second to get ClosedChannelException

// raised.

catch (ClosedChannelException
e) {

// Closed as requested.

catch (Throwable
t) {

if ( logger.isWarnEnabled())
{

logger.warn( "Failed
to accept a connection.", t);

}

try {

Thread. sleep(1000);

catch (InterruptedException
e1) {

// Ignore

}

}

}

}

3. 把这个连接套接字分发给一个worker pool中下一个worker来处理。

private static void registerAcceptedChannel(NioServerSocketChannel
parent, SocketChannel acceptedSocket,

Thread currentThread) {

try {

ChannelSink sink = parent.getPipeline().getSink();

ChannelPipeline pipeline =  parent.getConfig().getPipelineFactory().getPipeline();

//安排一个线程来处理这个连接通道 acceptedSocket

NioWorker worker = parent. workerPool.nextWorker();

worker.register( new NioAcceptedSocketChannel(

parent.getFactory(), pipeline, parent, sink

, acceptedSocket,

worker, currentThread), null);

catch (Exception
e) {

if ( logger.isWarnEnabled())
{

logger.warn( "Failed
to initialize an accepted socket.", e);

}

try {

acceptedSocket.close();

catch (IOException
e2) {

if ( logger.isWarnEnabled())
{

logger.warn( "Failed
to close a partially accepted socket.",e2);

}

}

}

}

4. 接下来就是每个worker处理一个连接的读写服务,NioWorker中的 process 负责这些工作。

protected void process(Selector
selector) throws IOException {

Set<SelectionKey> selectedKeys = selector.selectedKeys();

//如果集合为空就立即返回而不是每次创建迭代器,却无事可做。

if (selectedKeys.isEmpty())
{

return;

}

for (Iterator<SelectionKey>
i = selectedKeys.iterator(); i.hasNext();) {

SelectionKey k = i.next();

i.remove();

try {

//获取这个SelectionKey的就绪操作集合。

int readyOps
= k.readyOps();

if ((readyOps
& SelectionKey. OP_READ) != 0 || readyOps == 0) {

if (!read(k))
{

// Connection already closed - no need to handle write.

continue;

}

}

if ((readyOps
& SelectionKey. OP_WRITE) != 0) {

writeFromSelectorLoop(k);

}

catch (CancelledKeyException
e) {

close(k);

}

if (cleanUpCancelledKeys())
{

break; //
break the loop to avoid ConcurrentModificationException

}

}

}

5.两种任务的层次结构图为:

时间: 2024-07-28 22:17:15

Netty3 源码分析 - NIO server接受连接请求过程分析的相关文章

Netty3 源码分析 - NIO server绑定过程分析

一个框架封装的越好,越利于我们快速的coding,但是却掩盖了很多的细节和原理,但是源码能够揭示一切.服务器端代码在指定好ChannelFactory,设定好选项,而后Bootstrap.bind操作就会开启server,接受对端的连接.所以有必要对这后面的过程分析清楚,下图是关键流程.先是构建一个默认的Pipeline,为我们接下来要创建的监听通道服务,这个Pipeline里面会加入一个Binder的上行事件处理器:接下来创建了至关中的NioServerSocketChannel,在构造的过程

Netty3 源码分析 - ChannelUpstreamHandler

Netty3 源码分析 - ChannelUpstreamHandler ChannelUpstreamHandler处理上行的通道事件,并且在流水线中传送事件.这个接口最常用的场景是拦截IO工作现场产生的事件,传输消息或者执行相关的业务逻辑.在大部分情况下,我们是使用SimpleChannelUpstreamHandler 来实现一个具体的upstream handler,因为它为每个事件类型提供了单个的处理方法.大多数情况下ChannelUpstreamHandler 是向上游发送事件,虽然

Kubernetes 源码分析 -- API Server之编解码

--------------------- 作者:weixin_34037977 来源:CSDN 原文:https://blog.csdn.net/weixin_34037977/article/details/87058105 在Kubernetes源码分析-- API Server之API Install篇中,我们了解到K8S可以支持多版本的API,但是Rest API的不同版本中接口的输入输出参数的格式是有差别的,Kubernetes是怎么处理这个问题的呢?另外Kubernetes支持ya

zookeeper源码分析之一服务端处理请求流程

上文: zookeeper源码分析之一服务端启动过程 中,我们介绍了zookeeper服务器的启动过程,其中单机是ZookeeperServer启动,集群使用QuorumPeer启动,那么这次我们分析各自一下消息处理过程: 前文可以看到在 1.在单机情况下NettyServerCnxnFactory中启动ZookeeperServer来处理消息: public synchronized void startup() { if (sessionTracker == null) { createSe

kafka源码分析之一server启动分析

1. 分析kafka源码的目的 深入掌握kafka的内部原理 深入掌握scala运用 2. server的启动 如下所示(本来准备用时序图的,但感觉时序图没有思维图更能反映,故采用了思维图): 2.1 启动入口Kafka.scala 从上面的思维导图,可以看到Kafka的启动入口是Kafka.scala的main()函数: def main(args: Array[String]): Unit = { try { val serverProps = getPropsFromArgs(args)

Netty3 源码分析 - 套接字绑定实现原理

前面关注的地方都是Netty采用的流水线处理方式的组织方式,ChannelHandler如何管理,通道状态,通道事件等这些上层的架构设计,那么Netty中如何实现诸如套接字绑定,连接,关闭等这些底层的操作呢?不能只顾着套用API写程序,却对细节不求甚解.这里大致追踪下OIO模式下Channel中套接字绑定的实现,(NIO以后分析)其实逻辑都是一样的,只是在线程模型的地方时不同的. 大致过程如下(详细的源码注释,看我的github): 1.我们在通过Bootstrap启动客户端或者服务端的时候会提

Netty3 源码分析 - Channel

何为通道(Channel)?代表的是一个网络套接字的连接点(nexus). 一个通道抽象的内容包括: 1)当前通道状态,是否打开,是否绑定等: 2)通道的配置参数信息,如套接字缓冲区大小: 3)通道支持的IO操作: 4)处理和这个Channel相关的IO事件和请求的ChannelPipeline. 在Netty中所有的IO操作都是异步的,即执行一个IO调用的时候不会阻塞到操作完成,而后立即返回一个ChannelFuture对象,这个ChannelFuture对象会在某个时候通知我们IO操作执行结

Netty3 源码分析 - ChannelFuture

ChannelFuture抽象的是Channel中异步IO操作的结果.在Netty中,所有的IO操作是异步的,意味着任何IO调用会立刻返回,而不是等到操作真正的执行完成.相反,会返回一个ChannelFuture 对象,在IO完成之后通过其得到结果状态.ChannelFuture 要么完成要么未完成,当IO操作开始执行会创建一个新的future对象,初始状态时uncompleted (不是成功,失败,也不是取消)因为IO操作还木有完成,一旦IO操作完成(成功,失败,或者被取消).这个CHanne

Netty3 源码分析 - ClientBootstrap

Bootstrap是通道初始化辅助类 提供了初始化通道或子通道所需要的数据结构,那么ClientBootstrap就是客户端的,而且会执行连接操作. 配置通道,就是把相应的键值对选项传递给底层: ClientBootstrap b = ...; // Options for a new channel b.setOption("remoteAddress", new InetSocketAddress("example.com", 8080)); b.setOpti