Netty源码分析第2章(NioEventLoop)---->第7节: 处理IO事件

Netty源码分析第二章: NioEventLoop

第七节:处理IO事件

上一小节我们了解了执行select()操作的相关逻辑, 这一小节我们继续学习select()之后, 轮询到io事件的相关逻辑:

回到NioEventLoop的run()方法:

protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    //轮询io事件(1)
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
            }
            cancelledKeys = 0;
            needsToSelectAgain = false;
            //默认是50
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    processSelectedKeys();
                } finally {
                    runAllTasks();
                }
            } else {
                //记录下开始时间
                final long ioStartTime = System.nanoTime();
                try {
                    //处理轮询到的key(2)
                    processSelectedKeys();
                } finally {
                    //计算耗时
                    final long ioTime = System.nanoTime() - ioStartTime;
                    //执行task(3)
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        //代码省略
    }
}

我们首先看if (ioRatio == 100)这个判断, ioRatio主要是用来控制processSelectedKeys()方法执行时间和任务队列执行时间的比例, 其中ioRatio默认是50, 所以会走到下一步else

首先通过final long ioStartTime = System.nanoTime()记录下开始时间, 再通过processSelectedKeys()方法处理轮询到的key, 我们跟到processSelectedKeys()方法中:

private void processSelectedKeys() {
    if (selectedKeys != null) {
        //flip()方法会直接返回key的数组
        processSelectedKeysOptimized(selectedKeys.flip());
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

我们知道selector通过netty优化之后, 会初始化 selectedKeys这个属性, 所以这个属性不为空就会走到processSelectedKeysOptimized(selectedKeys.flip())方法, 这个方法就是对应优化过的selector进行操作的, 如果是优化的selector, 则会进入processSelectedKeysPlain(selector.selectedKeys())方法

selectedKeys.flip()为selectedKey中绑定的数组, 我们之前小节讲过selectedKeys其实是通过数组存储的, 所以经过select()操作如果监听到事件selectedKeys的数组就会有值

跟进到processSelectedKeysOptimized(selectedKeys.flip())方法中:

private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
    //通过for循环遍历数组
    for (int i = 0;; i ++) {
        //拿到当前的selectionKey
        final SelectionKey k = selectedKeys[i];
        if (k == null) {
            break;
        }
        //将当前引用设置为null
        selectedKeys[i] = null;
        //获取channel(NioSeverSocketChannel)
        final Object a = k.attachment();
        //如果是AbstractNioChannel, 则调用processSelectedKey()方法处理io事件
        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        //代码省略
    }
}

首先通过for循环遍历数组中的每一个key, 获得key之后首先将数组中对应的下标清空, 因为selector不会自动清空, 这与我们使用原生selector时候, 通过遍历selector.selectedKeys()的set的时候, 拿到key之后要执行remove()是一个意思

之后获取注册在key上的channel, 判断channel是不是AbstractNioChannel, 通常情况都是AbstractNioChannel, 所以这里会执行rocessSelectedKey(k, (AbstractNioChannel) a)

跟到rocessSelectedKey(k, (AbstractNioChannel) a)方法中:

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    //获取到channel中的unsafe
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    //如果这个key不是合法的, 说明这个channel可能有问题
    if (!k.isValid()) {
        //代码省略
    }
    try {
        //如果是合法的, 拿到key的io事件
        int readyOps = k.readyOps();
        //链接事件
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            unsafe.finishConnect();
        }
        //写事件
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            ch.unsafe().forceFlush();
        }
        //读事件和接受链接事件
        //如果当前NioEventLoop是work线程的话, 这里就是op_read事件
        //如果是当前NioEventLoop是boss线程的话, 这里就是op_accept事件
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {
                return;
            }
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

我们首先获取和channel绑定的unsafe, 之后拿到channel注册的事件

我们关注if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0)这个判断, 这个判断相信注释上写的很明白, 如果当前NioEventLoop是work线程的话, 这里就是op_read事件, 如果是当前NioEventLoop是boss线程的话, 这里就是op_accept事件

然后会通过channel绑定的unsafe对象执行read()方法用于处理链接或者读写事件

以上就是NioEventLoop对io事件的处理过程, 有关read()方法执行逻辑, 会在以后的章节中详细剖析

原文地址:https://www.cnblogs.com/xiangnan6122/p/10203152.html

时间: 2024-08-02 09:44:22

Netty源码分析第2章(NioEventLoop)---->第7节: 处理IO事件的相关文章

Netty源码分析第4章(pipeline)----&gt;第4节: 传播inbound事件

Netty源码分析第四章: pipeline 第四节: 传播inbound事件 有关于inbound事件, 在概述中做过简单的介绍, 就是以自己为基准, 流向自己的事件, 比如最常见的channelRead事件, 就是对方发来数据流的所触发的事件, 己方要对这些数据进行处理, 这一小节, 以激活channelRead为例讲解有关inbound事件的处理流程 在业务代码中, 我们自己的handler往往会通过重写channelRead方法来处理对方发来的数据, 那么对方发来的数据是如何走到chan

Netty源码分析第2章(NioEventLoop)----&gt;第6节: 执行selector操作

Netty源码分析第二章: NioEventLoop 第六节: 执行select操作 分析完了selector的创建和优化的过程, 这一小节分析select相关操作 跟到跟到NioEventLoop的run方法: protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE

Netty源码分析第2章(NioEventLoop)----&gt;第5节: 优化selector

第二章: NioEventLoop 第五节: 优化selector 在剖析selector轮询之前, 我们先讲解一下selector的创建过程, 回顾之前的小节, 在创建NioEventLoop中初始化了唯一绑定的selector: NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecuti

Netty源码分析第4章(pipeline)----&gt;第7节: 前章节内容回顾

Netty源码分析第四章: pipeline 第七节: 前章节内容回顾 我们在第一章和第三章中, 遗留了很多有关事件传输的相关逻辑, 这里带大家一一回顾 首先看两个问题: 1.在客户端接入的时候, NioMessageUnsafe的read方法中pipeline.fireChannelRead(readBuf.get(i))为什么会调用到ServerBootstrap的内部类ServerBootstrapAcceptor中的channelRead()方法 2.客户端handler是什么时候被添加

Netty源码分析第5章(ByteBuf)----&gt;第4节: PooledByteBufAllocator简述

Netty源码分析第五章: ByteBuf 第四节: PooledByteBufAllocator简述 上一小节简单介绍了ByteBufAllocator以及其子类UnPooledByteBufAllocator的缓冲区分类的逻辑, 这一小节开始带大家剖析更为复杂的PooledByteBufAllocator, 我们知道PooledByteBufAllocator是通过自己取一块连续的内存进行ByteBuf的封装, 所以这里更为复杂, 在这一小节简单讲解有关PooledByteBufAlloca

Netty源码分析第5章(ByteBuf)----&gt;第6节: 命中缓存的分配

Netty源码分析第6章: ByteBuf 第六节: 命中缓存的分配 上一小节简单分析了directArena内存分配大概流程, 知道其先命中缓存, 如果命中不到, 则区分配一款连续内存, 这一小节带大家剖析命中缓存的相关逻辑 分析先关逻辑之前, 首先介绍缓存对象的数据结构 回顾上一小节的内容, 我们讲到PoolThreadCache中维护了三个缓存数组(实际上是六个, 这里仅仅以Direct为例, heap类型的逻辑是一样的): tinySubPageDirectCaches, smallSu

Netty源码分析第5章(ByteBuf)----&gt;第1节: AbstractByteBuf

Netty源码分析第五章: ByteBuf 概述: 熟悉Nio的小伙伴应该对jdk底层byteBuffer不会陌生, 也就是字节缓冲区, 主要用于对网络底层io进行读写, 当channel中有数据时, 将channel中的数据读取到字节缓冲区, 当要往对方写数据的时候, 将字节缓冲区的数据写到channel中 但是jdk的byteBuffer是使用起来有诸多不便, 比如只有一个标记位置的指针position, 在进行读写操作时要频繁的通过flip()方法进行指针位置的移动, 极易出错, 并且by

Netty源码分析第5章(ByteBuf)----&gt;第3节: 内存分配器

Netty源码分析第五章: ByteBuf 第三节: 内存分配器 内存分配器, 顾明思议就是分配内存的工具, 在netty中, 内存分配器的顶级抽象是接口ByteBufAllocator, 里面定义了有关内存分配的相关api 抽象类AbstractByteBufAllocator实现了ByteBufAllocator接口, 并且实现了其大部分功能 和AbstractByteBuf一样, AbstractByteBufAllocator也实现了缓冲区分配的骨架逻辑, 剩余的交给其子类 以其中的分配

Netty源码分析第6章(解码器)----&gt;第3节: 行解码器

Netty源码分析第六章: 解码器 第三节: 行解码器 这一小节了解下行解码器LineBasedFrameDecoder, 行解码器的功能是一个字节流, 以\r\n或者直接以\n结尾进行解码, 也就是以换行符为分隔进行解析 同样, 这个解码器也继承了ByteToMessageDecoder 首先看其参数: //数据包的最大长度, 超过该长度会进行丢弃模式 private final int maxLength; //超出最大长度是否要抛出异常 private final boolean fail