Netty源码分析第2章(NioEventLoop)---->第6节: 执行selector操作

Netty源码分析第二章: NioEventLoop

第六节: 执行select操作

分析完了selector的创建和优化的过程, 这一小节分析select相关操作

跟到跟到NioEventLoop的run方法:

protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    //轮询io事件(1)
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
            }
            cancelledKeys = 0;
            needsToSelectAgain = false;
            //默认是50
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    processSelectedKeys();
                } finally {
                    runAllTasks();
                }
            } else {
                //记录下开始时间
                final long ioStartTime = System.nanoTime();
                try {
                    //处理轮询到的key(2)
                    processSelectedKeys();
                } finally {
                    //计算耗时
                    final long ioTime = System.nanoTime() - ioStartTime;
                    //执行task(3)
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        //代码省略
    }
}

代码比较长, 其实主要分为三部分:

1.轮询io事件

2. 处理轮询到的key

3.执行task

这一小节, 主要剖析第一部分, 轮询io事件

首先switch块中默认会走到SelectStrategy.SELECT中, 执行select(wakenUp.getAndSet(false))方法

参数wakenUp.getAndSet(false)代表当前select操作是未唤醒状态

进入到select(wakenUp.getAndSet(false))方法中:

private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        //当前系统的纳秒数
        long currentTimeNanos = System.nanoTime();
        //截止时间=当前时间+队列第一个任务剩余执行时间
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
        for (;;) {
            //阻塞时间(毫秒)=(截止时间-当前时间+0.5毫秒)
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }
            if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                selector.selectNow();
                selectCnt = 1;
                break;
            }
            //进行阻塞式的select操作
            int selectedKeys = selector.select(timeoutMillis);
            //轮询次数
            selectCnt ++;
            //如果轮询到一个事件(selectedKeys != 0), 或者当前select操作需要唤醒(oldWakenUp),
            //或者在执行select操作时已经被外部线程唤醒(wakenUp.get()),
            //或者任务队列已经有任务(hasTask), 或者定时任务队列中有任务(hasScheduledTasks())
            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                break;
            }
            //省略
            //记录下当前时间
            long time = System.nanoTime();
            //当前时间-开始时间>=超时时间(条件成立, 执行过一次select操作, 条件不成立, 有可能发生空轮询)
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                //代表已经进行了一次阻塞式select操作, 操作次数重置为1
                selectCnt = 1;
            } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                //省略日志代码
                //如果空轮询的次数大于一个阈值(512), 解决空轮询的bug
                rebuildSelector();
                selector = this.selector;
                selector.selectNow();
                selectCnt = 1;
                break;
            }
            currentTimeNanos = time;
        }
        //代码省略
    } catch (CancelledKeyException e) {
        //省略
    }
}

首先通过long currentTimeNanos = System.nanoTime()获取系统的纳秒数

继续往下看:

long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

delayNanos(currentTimeNanos)代表距定时任务中第一个任务剩余多长时间, 这个时间+当前时间代表这次操作不能超过的时间, 因为超过之后定时任务不能严格按照预定时间执行, 其中定时任务队列是已经按照执行时间有小到大排列好的队列, 所以第一个任务则是最近需要执行的任务, selectDeadLineNanos就代表了当前操作不能超过的时间

然后就进入到了无限for循环

for循环中我们关注:

long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L

selectDeadLineNanos - currentTimeNanos+500000L代表截止时间-当前时间+0.5毫秒的调整时间, 除以1000000表示将计算的时间转化为毫秒数

最后算出的时间就是selector操作的阻塞时间, 并赋值到局部变量的timeoutMillis中

后面有个判断if(imeoutMillis<0), 代表当前时间已经超过了最后截止时间+0.5毫秒, selectCnt == 0代表没有进行select操作, 满足这两个条件, 则执行selectNow()之后, 将selectCnt赋值为1之后跳出循环

如果没超过截止时间, 就进行了if(hasTasks() && wakenUp.compareAndSet(false, true))判断

这里我们只需要关注hasTasks()方法就行, 这里是判断当前NioEventLoop所绑定的taskQueue是否有任务, 如果有任务, 则执行selectNow()之后, 将selectCnt赋值为1之后跳出循环(跳出循环之后去执行任务队列中的任务)

hasTasks()方法可以自己跟一下, 非常简单

如果没有满足上述条件, 就会执行int selectedKeys = selector.select(timeoutMillis)进行阻塞式轮询, 并且自增轮询次数, 而后会进行如下判断:

if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
    break;
}

selectedKeys != 0代表已经有轮询到的事件, oldWakenUp代表当前select操作是否需要唤醒, wakenUp.get()说明已经被外部线程唤醒, hasTasks()代表任务队列是否有任务, hasScheduledTasks()代表定时任务队列是否任务, 满足条件之一, 就跳出循环

long time = System.nanoTime()记录了当前的时间, 之后有个判断:

if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos)这里的意思是当前时间-阻塞时间>方法开始执行的时间, 这里说明已经完整的执行完成了一个阻塞的select()操作, 将selectCnt设置成1

如果此条件不成立, 说明没有完整执行select()操作, 可能触发了一次空轮询, 根据前一个selectCnt++这步我们知道, 每触发一次空轮询selectCnt都会自增

之后会进入第二个判断SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD

其中SELECTOR_AUTO_REBUILD_THRESHOLD默认是512, 这个判断意思就是空轮询的次数如果超过512次, 则会认为是发生了epoll bug, 这样会通过

rebuildSelector()方法重新构建selector, 然后将重新构建的selector赋值到局部变量selector, 执行一次selectNow(), 将selectCnt初始化1, 跳出循环

rebuildSelector()方法中, 看netty是如何解决epoll bug的:

public void rebuildSelector() {
    //是否是由其他线程发起的
    if (!inEventLoop()) {
        //如果是其他线程发起的, 将rebuildSelector()封装成任务队列, 由NioEventLoop进行调用
        execute(new Runnable() {
            @Override
            public void run() {
                rebuildSelector();
            }
        });
        return;
    }
    final Selector oldSelector = selector;
    final Selector newSelector;
    if (oldSelector == null) {
        return;
    }
    try {
        //重新创建一个select
        newSelector = openSelector();
    } catch (Exception e) {
        logger.warn("Failed to create a new Selector.", e);
        return;
    }
    int nChannels = 0;
    for (;;) {
        try {
            //拿到旧select中所有的key
            for (SelectionKey key: oldSelector.keys()) {
                Object a = key.attachment();
                try {
                    Object a = key.attachment();
                    //代码省略

                    //获取key注册的事件
                    int interestOps = key.interestOps();
                    //将key注册的事件取消
                    key.cancel();
                    //注册到重新创建的新的selector中
                    SelectionKey newKey = key.channel().register(newSelector, interestOps, a);
                    //如果channel是NioChannel
                    if (a instanceof AbstractNioChannel) {
                        //重新赋值
                        ((AbstractNioChannel) a).selectionKey = newKey;
                    }
                    nChannels ++;
                } catch (Exception e) {
                    //代码省略
                }
            }
        } catch (ConcurrentModificationException e) {
            continue;
        }
        break;
    }
    selector = newSelector;
    //代码省略
}

首先会判断是不是当前NioEventLoop线程执行的, 如果不是, 则将构建方法封装成task由当前NioEventLoop执行

final Selector oldSelector = selector拿到旧的selector

然后通过newSelector = openSelector()创建新的selector

通过for循环遍历所有注册在selector中的key

Object a = key.attachment()是获取channel, 第一章讲过, 在注册时, 将自身作为属性绑定在key上

for循环体中, 通过int interestOps = key.interestOps()获取其注册的事件

key.cancel()将注册的事件进行取消

SelectionKey newKey = key.channel().register(newSelector, interestOps, a)将channel以及注册的事件注册在新的selector中

if (a instanceof AbstractNioChannel)判断是不是NioChannel

如果是NioChannel, 则通过((AbstractNioChannel) a).selectionKey = newKey将自身的属性selectionKey赋值为新返回的key

selector = newSelector将自身NioEventLoop属性selector赋值为新创建的newSelector

至此, 就是netty解决epoll bug的步骤, 其实就是创建一个新的selector, 将旧selector中注册的channel和事件重新注册到新的selector中, 然后将自身selector属性替换成新创建的selector

原文地址:https://www.cnblogs.com/xiangnan6122/p/10203139.html

时间: 2024-11-10 14:01:26

Netty源码分析第2章(NioEventLoop)---->第6节: 执行selector操作的相关文章

Netty源码分析第2章(NioEventLoop)----&gt;第5节: 优化selector

第二章: NioEventLoop 第五节: 优化selector 在剖析selector轮询之前, 我们先讲解一下selector的创建过程, 回顾之前的小节, 在创建NioEventLoop中初始化了唯一绑定的selector: NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecuti

Netty源码分析第2章(NioEventLoop)----&gt;第7节: 处理IO事件

Netty源码分析第二章: NioEventLoop 第七节:处理IO事件 上一小节我们了解了执行select()操作的相关逻辑, 这一小节我们继续学习select()之后, 轮询到io事件的相关逻辑: 回到NioEventLoop的run()方法: protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case Sele

Netty源码分析第4章(pipeline)----&gt;第7节: 前章节内容回顾

Netty源码分析第四章: pipeline 第七节: 前章节内容回顾 我们在第一章和第三章中, 遗留了很多有关事件传输的相关逻辑, 这里带大家一一回顾 首先看两个问题: 1.在客户端接入的时候, NioMessageUnsafe的read方法中pipeline.fireChannelRead(readBuf.get(i))为什么会调用到ServerBootstrap的内部类ServerBootstrapAcceptor中的channelRead()方法 2.客户端handler是什么时候被添加

Netty源码分析第4章(pipeline)----&gt;第4节: 传播inbound事件

Netty源码分析第四章: pipeline 第四节: 传播inbound事件 有关于inbound事件, 在概述中做过简单的介绍, 就是以自己为基准, 流向自己的事件, 比如最常见的channelRead事件, 就是对方发来数据流的所触发的事件, 己方要对这些数据进行处理, 这一小节, 以激活channelRead为例讲解有关inbound事件的处理流程 在业务代码中, 我们自己的handler往往会通过重写channelRead方法来处理对方发来的数据, 那么对方发来的数据是如何走到chan

Netty源码分析第5章(ByteBuf)----&gt;第4节: PooledByteBufAllocator简述

Netty源码分析第五章: ByteBuf 第四节: PooledByteBufAllocator简述 上一小节简单介绍了ByteBufAllocator以及其子类UnPooledByteBufAllocator的缓冲区分类的逻辑, 这一小节开始带大家剖析更为复杂的PooledByteBufAllocator, 我们知道PooledByteBufAllocator是通过自己取一块连续的内存进行ByteBuf的封装, 所以这里更为复杂, 在这一小节简单讲解有关PooledByteBufAlloca

Netty源码分析第5章(ByteBuf)----&gt;第6节: 命中缓存的分配

Netty源码分析第6章: ByteBuf 第六节: 命中缓存的分配 上一小节简单分析了directArena内存分配大概流程, 知道其先命中缓存, 如果命中不到, 则区分配一款连续内存, 这一小节带大家剖析命中缓存的相关逻辑 分析先关逻辑之前, 首先介绍缓存对象的数据结构 回顾上一小节的内容, 我们讲到PoolThreadCache中维护了三个缓存数组(实际上是六个, 这里仅仅以Direct为例, heap类型的逻辑是一样的): tinySubPageDirectCaches, smallSu

Netty源码分析第5章(ByteBuf)----&gt;第1节: AbstractByteBuf

Netty源码分析第五章: ByteBuf 概述: 熟悉Nio的小伙伴应该对jdk底层byteBuffer不会陌生, 也就是字节缓冲区, 主要用于对网络底层io进行读写, 当channel中有数据时, 将channel中的数据读取到字节缓冲区, 当要往对方写数据的时候, 将字节缓冲区的数据写到channel中 但是jdk的byteBuffer是使用起来有诸多不便, 比如只有一个标记位置的指针position, 在进行读写操作时要频繁的通过flip()方法进行指针位置的移动, 极易出错, 并且by

Netty源码分析第5章(ByteBuf)----&gt;第3节: 内存分配器

Netty源码分析第五章: ByteBuf 第三节: 内存分配器 内存分配器, 顾明思议就是分配内存的工具, 在netty中, 内存分配器的顶级抽象是接口ByteBufAllocator, 里面定义了有关内存分配的相关api 抽象类AbstractByteBufAllocator实现了ByteBufAllocator接口, 并且实现了其大部分功能 和AbstractByteBuf一样, AbstractByteBufAllocator也实现了缓冲区分配的骨架逻辑, 剩余的交给其子类 以其中的分配

Netty源码分析第6章(解码器)----&gt;第3节: 行解码器

Netty源码分析第六章: 解码器 第三节: 行解码器 这一小节了解下行解码器LineBasedFrameDecoder, 行解码器的功能是一个字节流, 以\r\n或者直接以\n结尾进行解码, 也就是以换行符为分隔进行解析 同样, 这个解码器也继承了ByteToMessageDecoder 首先看其参数: //数据包的最大长度, 超过该长度会进行丢弃模式 private final int maxLength; //超出最大长度是否要抛出异常 private final boolean fail