netty源码解解析(4.0)-6 线程模型-IO线程EventLoopGroup和NIO实现(一)

接口定义

io.netty.channel.EventLoopGroup extends EventExecutorGroup


方法

说明

ChannelFuture register(Channel channel)

把一个channel注册到一个EventLoop

ChannelFuture register(Channel channel, ChannelPromise promise);

同上

io.netty.channel.EventLoop extends OrderedEventExecutor, EventLoopGroup


方法

说明

EventLoopGroup parent()

得到创建这个eventLoop的EventLoopGroup

EventLoopGroup定义的主要方法是register, 这个方法的语义是把channel和eventLoop绑定在一起。一个channel对应一个eventLoop, 一个eventLoop会持有多个channel。

I/O线程EventLoopGroup的抽象实现

io.netty.channel.MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup

io.netty.channel.SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop

两个类主功能都是实现了EventLoopGroup定义的register方法

MultithreadEventLoopGroup

public ChannelFuture register(Channel channel) {

return next().register(channel);

}

public ChannelFuture register(Channel channel, ChannelPromise promise) {

return next().register(channel, promise);

}

SingleThreadEventLoop

public ChannelFuture register(Channel channel) {

return register(channel, new DefaultChannelPromise(channel, this));

}

public ChannelFuture register(final Channel channel, final ChannelPromise promise) {

channel.unsafe().register(this, promise);

return promise;

}

register的实现主要是为了调用Channel.Unsafe实例的register方法。

NIO实现

io.netty.channel.nio.NioEventLoopGroup extends MultithreadEventLoopGroup

io.netty.channel.nio.NioEventLoop extends SingleThreadEventLoop

NioEventLoopGroup是在MultithreadEventLoopGroup基础上实现了对JDK NIO Selector的封装, 它实现以下几个功能:

  • 创建selector
  • 在selector上注册channel感兴趣的NIO事件
  • 实现EventExecutor的run方法,定义NIO事件和Executor任务的处理流程。
  • 把NIO事件转换成对channel unsafe的调用或NioTask的调用
  • 控制线程执行I/O操作和排队任务的用时比例
  • 处理epoll selector cpu 100%的bug

下面来具体分析这几个功能的实现。

创建Selector

NioEventLoop#openSelector()实现了创建selector的功能,默认情况下,使用SelectorProvider#openSelector()方法创建一个新个selector:

final Selector unwrappedSelector = provider.openSelector();

如果设置环境变量io.netty.noKeySetOptimization=true, 会创建一个selectedKeySet = new SelectedSelectionKeySet(), 然后使用java的反射机制把selector的selectedKeys和publicSelectedKeys替换成selectedKeySet,具体步骤是:

1.得到selector的真正类型: sun.nio.ch.SelectorImpl

Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {

@Override

public Object run() {

try {

return Class.forName(

"sun.nio.ch.SelectorImpl",

false,

PlatformDependent.getSystemClassLoader());

} catch (Throwable cause) {

return cause;

}

}

});

final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;

2.替换selector是属性unwrappedSelector

Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");

Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

selectedKeysField.set(unwrappedSelector, selectedKeySet);

publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);

之所以会设计一个这样的优化选项,是因为一般情况下调用完selector的select或selectNow方法后需要调用Selector#selectedKeys()得到触发NIO事件的的SelectableChannel,这样优化之后,可以直接从selectedKeySet中得到已经触发了NIO事件的SelectableChannel。

在selector上注册channel感兴趣的NIO事件

NioEventLoop提供了unwrappedSelector方法,这个方法返回了它创建好的Selector实例。这样任何的外部类都可以把任意的SelectableChannel注册到这selector上。在AbstractNioChannel中, doRegister方法的实现就是使用了这个方法:

selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);

另外,它还提供了一个register方法:

public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task)

这个方法会把task当成SelectableChannel的附件注册到selector上:

ch.register(selector, interestOps, task);

实现EventExecutor的run方法,定义NIO事件和Executor任务的处理流程

在NioEventLoop的run方法中实现NIO事件和EventExecutor的任务处理逻辑,这个run方法在io.netty.util.concurrent.SingleThreadEventExecutor中定义。在上一章中,我们看到了DefaultEventExecutor中是如何实现这个run方法的,这里我们将要看到这run方法的另一个实现。和SingleThreadEventExecutor中的run方法相比,NioEventLoop的run方法不仅要及时地执行taskQueue中的任务,还要能及时地处理NIO事件,因此它会同时检查selector中的NIO事件和和taskQueue队列,任何一个中有事件需要处理或有任务需要执行,它不会阻塞线程。同时它也保证了在没有NIO事件和任务的情况下线程不会无谓的空转浪费CUP资源。

run主要实现如下,为了更清晰的说明它的主要功能,我对原来的代码进行了一些删减。

for(;;){

try{

//phase1: 同时检查NIO事件和任务

switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {

case SelectStrategy.CONTINUE:

continue;

case SelectStrategy.SELECT:

select(wakenUp.getAndSet(false)); //在taskQueue中没有任务的时候执行select

}

//phase2: 进入处理NIO事件,执行executor任务

try{

//处理NIO事件

processSelectedKeys();

}finally{

//处理taskQueu中的任务

runAllTasks();

}

}catch(Throwable t){

handleLoopException(t);

}

}

run方法有两个阶段构成:

phase1: 检查NIO事件或executor任务,如果有任何的NIO事件或executor任务进入phase2。

这样阶段的主要工作在selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())和select中完成。

selectStrategy.calculateStrategy实现

selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())

这行代码的含义是: 如果hasTasks() == true, 调用以下selector#selectNow, 然后进入phase2。 否则调用select。这里使用了strategy模式,默认的strategy实现是io.netty.channe.DefaultSelectStrategy implements SelectStrategy

@Override

public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {

return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;

}

DefaultSelectStrategy实现了SelectStrategy接口,这接口定义了两个常量:

int SELECT = -1;

int CONTINUE = -2;

运行时selectSuppler参数传入的是selectNowSupplier, 它的实现如下:

private final IntSupplier selectNowSupplier = new IntSupplier() {

@Override

public int get() throws Exception {

return selectNow();

}

};

这里的get方法调用了selectNow, selectNow调用的是Selector#selectNew方法,这个方法的返回值是>=0。

hashTasks的传入的参数是hasTask()的返回值: return !taskQueue.isEmpty();

代码读到这里就会发现,使用默认的的SelectStrategy实现,calculateStrategy在hasTasks()==true时返回值>=0, hasTasks() == false时返回值是SelectStrategy.SELECT,不会返回SelectStrategy.CONTINUE。

select实现

select的执行逻辑是:

1. 计算超select方法的结束时间selectDeadLineNanos

long currentTimeNanos = System.nanoTime();

long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

2. 进入循环,检查超时--超时跳出循环。

long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;

if (timeoutMillis <= 0) {

if (selectCnt == 0) {

selector.selectNow();

selectCnt = 1;

}

break;

}

3. 如果在select执行过程中有executor任务提交或可以当前的wakeUp由false变成true, 跳出循环

if (hasTasks() && wakenUp.compareAndSet(false, true)) {

selector.selectNow();

selectCnt = 1;

break;

}

4. 调用selector#select等待NIO事件。

int selectedKeys = selector.select(timeoutMillis);

selectCnt ++;

5. 如果满足这些条件的任何一个,跳出循环: 有NIO事件、wakeUp的新旧值都是true、taskQueue中有任务、有定时任务到期。

if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {

break;

}

6. 如果线程被中断,跳出循环。

if (Thread.interrupted()) {

break;

}

7. 如果selector.select超时,没有检查到任何NIO事件, 会在下次循环开始时跳出循环。 如果每次超时,跳到第2步继续下一次循环。

long time = System.nanoTime();

if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {

selectCnt = 1;

}

currentTimeNanos = time;

select 最迟会在当前时间>= selectDeadLineNanos时返回,这个时间是最近一个到期的定时任务执行的时间,换言之如果没有任何的NIO事件或executor任务,select会在定时任务到期时返回。如果没有定时任务,delayNanos(currentTimeNanos)返回的值是 TimeUnit.SECONDS.toNanos(1),即1秒。 select会在检查到任何NIO事件或executor任务时返回,为了保证这点,在selector.select(timeoutMillis)前后都会调用hasTasks检查executor任务,为了能在调用executet提交任务时唤醒selector.select,NioEventLoop覆盖了SingleThreadEventExecutor的wake方法:

protected void wakeup(boolean inEventLoop) {

if (!inEventLoop && wakenUp.compareAndSet(false, true)) {

selector.wakeup();

}

}

这个方法会及时的唤醒selector.select, 保证新提交的任务可以得到及时的执行。

phase2: 进入处理NIO事件,执行executor任务

这个阶段是先调用processSelectedKeys()处理NIO事件,然后掉用 runAllTasks()处理所有已经到期的定时任务和已经在排队的任务。这个阶段还实现了NIO事件和executor任务的用时比例管理,这个特性稍后会详细分析。

原文地址:https://www.cnblogs.com/brandonli/p/10100139.html

时间: 2024-08-27 14:54:06

netty源码解解析(4.0)-6 线程模型-IO线程EventLoopGroup和NIO实现(一)的相关文章

netty源码解解析(4.0)-1 核心架构

netty是java开源社区的一个优秀的网络框架.使用netty,我们可以迅速地开发出稳定,高性能,安全的,扩展性良好的服务器应用程序.netty封装简化了在服务器开发领域的一些有挑战性的问题:jdk nio的使用:多线程并发:扩展性.它还提供了多种应用层协议的支持:http/https/websock, protobuf, 自定义协议, 简化了服务器协议的开发. netty是一个基于事件驱动的框架,它把事件分成两种类型:输入事件(inbound)和输出事件(outbound), 整个框架都是围

netty源码解解析(4.0)-14 Channel NIO实现:读取数据

 本章分析Nio Channel的数据读取功能的实现. Channel读取数据需要Channel和ChannelHandler配合使用,netty设计数据读取功能包括三个要素:Channel, EventLoop和ChannelHandler.Channel有个read方法,这个方法不会直接读取数据,它的作用是通知持有当前channel的eventLoop可以从这个这个channel读取数据了,这个方法被调用之后eventLoop会在channel有数据可读的时候从channel读出数据然后把数

netty源码解解析(4.0)-21 ByteBuf的设计原理

????io.netty.buffer包中是netty ByteBuf的实现.ByteBuf是一个二进制缓冲区的抽象接口,它的功能有: 可以随机访问.顺序访问. 支持基本数据类型(byte, short, int, long, float, double)的序列化和反序列化. 不限制底层原始的数据类型,可以是byte[], NIO Buffer, String, IO Stream, 直接内(C语言中可以表示为指向一块内置起始地址的指针void*, 内存的长度), 等等. 为什么需要ByteBu

netty源码解解析(4.0)-8 ChannelPipeline的设计

io.netty.channel.ChannelPipeline 设计原理 上图中,为了更直观地展示事件处理顺序, 故意有规律地放置两种handler的顺序,实际上ChannelInboundHandler和ChanneOutboundHandler的顺序可以是任意,取决于用户调用add方法把handler方在哪里. ChannelPipeline的特性: 1. 它是一个双向链表 2. 每个节点持有一个ChannelHandler实例,这个实例可以是ChannelInboundHandler类型

netty源码解解析(4.0)-16 ChannelHandler概览

本章开始分析ChannelHandler实现代码.ChannelHandler是netty为开发者提供的实现定制业务的主要接口,开发者在使用netty时,最主要的工作就是实现自己的ChannelHandler.ChannelHandler在设计上需要和ChannelPipeline配合共同实现pipeline的事件传递能力,这要求ChannelHandler需要实现一些固定的基本功能.由于这个原因,如果让用户自己完整地实现,会显得比较麻烦.为此netty实现类一系列的类来帮助开发者以简单的方式实

netty源码解解析(4.0)-23 ByteBuf内存管理:分配和释放

ByteBuf内存分配和释放由具体实现负责,抽象类型只定义的内存分配和释放的时机. 内存分配分两个阶段: 第一阶段,初始化时分配内存.第二阶段: 内存不够用时分配新的内存.ByteBuf抽象层没有定义第一阶段的行为,但定义了第二阶段的方法: public abstract ByteBuf capacity(int newCapacity) 这个方法负责分配一个长度为newCapacity的新内存. 内存释放的抽象实现在AbstractReferenceCountedByteBuf中实现,这个类实

netty源码解解析(4.0)-24 ByteBuf基于内存池的内存管理

PooledByteBuf的初始化过程分为两个步骤:创建实例:初始化内存.这两个步骤的代码如下: 507383170 protected PooledByteBuf(Recycler.Handle recyclerHandle, int maxCapacity) { super(maxCapacity); this.recyclerHandle = recyclerHandle; } void init(PoolChunk<T> chunk, long handle, int offset,

JAVA框架底层源码剖析系列Spring,Mybatis,Springboot,Netty源码深度解析

<Spring源码深度解析>从核心实现和企业应用两个方面,由浅入深.由易到难地对Spring源码展开了系统的讲解,包括Spring的设计理念和整体架构.容器的基本实现.默认标签的解析.自定义标签的解析.bean的加载.容器的功能扩展.AOP.数据库连接JDBC.整合MyBatis.事务.SpringMVC.远程服务.Spring消息服务等内容. <Spring源码深度解析>不仅介绍了使用Spring框架开发项目必须掌握的核心概念,还指导读者如何使用Spring框架编写企业级应用,并

netty源码解析(4.0)-29 Future模式的实现

Future模式是一个重要的异步并发模式,在JDK有实现.但JDK实现的Future模式功能比较简单,使用起来比较复杂.Netty在JDK Future基础上,加强了Future的能力,具体体现在: 更加简单的结果返回方式.在JDK中,需要用户自己实现Future对象的执行及返回结果.而在Netty中可以使用Promise简单地调用方法返回结果. 更加灵活的结果处理方式.JDK中只提供了主动得到结果的get方法,要么阻塞,要么轮询.Netty除了支持主动get方法外,还可以使用Listener被