[编织消息框架][netty源码分析]4 eventLoop 实现类NioEventLoop职责与实现

NioEventLoop 是jdk nio多路处理实现同修复jdk nio的bug

1.NioEventLoop继承SingleThreadEventLoop 重用单线程处理

2.NioEventLoop是组成 pool EventLoopGroup 基本单元

总之好多边界判断跟业务经验之类的代码,非常烦碎

重要属性

public final class NioEventLoop extends SingleThreadEventLoop {
    //绑定 selector
    Selector selector;
    //优化过的Set集合
    private SelectedSelectionKeySet selectedKeys;
    //引用全局 SelectorProvider
    private final SelectorProvider provider;
    ///////////////////////////////////////////
    //为true时执行selector.wakeup()
    private final AtomicBoolean wakenUp = new AtomicBoolean();
    //io任务占时比率
    private volatile int ioRatio = 50;
    //记录selectionKey撤销次数
    private int cancelledKeys;
    //处理selector.selectNow() 标志
    private boolean needsToSelectAgain;
}

替换Selector selectedKeySet字段与重构Selector

优化selectedKeySet集合用的是double cache技术,这种技术在图形渲染处理比较多

    //netty 用到反射加 AccessController技术替换掉 Selector selectedKeySet 字段
    private Selector openSelector() {
        final Selector selector = provider.openSelector();
        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

        Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    return Class.forName(
                            "sun.nio.ch.SelectorImpl",
                            false,
                            PlatformDependent.getSystemClassLoader());
                } catch (Throwable cause) {
                    return cause;
                }
            }
        });

        final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
        Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                //用到反射技术更改 SelectorImpl 字段
                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
                selectedKeysField.setAccessible(true);
                publicSelectedKeysField.setAccessible(true);

                selectedKeysField.set(selector, selectedKeySet);
                publicSelectedKeysField.set(selector, selectedKeySet);
                return null;
            }
        });

        return selector;
    }

//重新构建Selector
    private void rebuildSelector0() {
        final Selector oldSelector = selector;
        final Selector newSelector;

        if (oldSelector == null) {
            return;
        }

       newSelector = openSelector();

        //迁移处理
        int nChannels = 0;
        for (SelectionKey key: oldSelector.keys()) {
            Object a = key.attachment();
            try {
                //过滤key是否合法 已处理
                if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                    continue;
                }
                int interestOps = key.interestOps();
                key.cancel();
                SelectionKey newKey = key.channel().register(newSelector, interestOps, a);
                if (a instanceof AbstractNioChannel) {
                    // channel重新绑定SelectionKey
                    ((AbstractNioChannel) a).selectionKey = newKey;
                }
                nChannels ++;
            } catch (Exception e) {
                //出错处理 netty认为 socket已关闭
                if (a instanceof AbstractNioChannel) {
                    AbstractNioChannel ch = (AbstractNioChannel) a;
                    ch.unsafe().close(ch.unsafe().voidPromise());
                } else {
                    @SuppressWarnings("unchecked")
                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                    invokeChannelUnregistered(task, key, e);
                }
            }
        }
        selector = newSelector;
        oldSelector.close();
     }

double cache 实现

final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {

        private SelectionKey[] keysA;
        private int keysASize;
        private SelectionKey[] keysB;
        private int keysBSize;
        private boolean isA = true;

        SelectedSelectionKeySet() {
            keysA = new SelectionKey[1024];
            keysB = keysA.clone();
        }

        @Override
        public boolean add(SelectionKey o) {
            if (o == null) {
                return false;
            }
            //是A开关即处理A
            if (isA) {
                int size = keysASize;
                keysA[size ++] = o;
                keysASize = size;
                //双倍扩展容量
                if (size == keysA.length) {
                    doubleCapacityA();
                }
            } else {
                int size = keysBSize;
                keysB[size ++] = o;
                keysBSize = size;
                if (size == keysB.length) {
                    doubleCapacityB();
                }
            }

            return true;
        }

        private void doubleCapacityA() {
            SelectionKey[] newKeysA = new SelectionKey[keysA.length << 1];
            System.arraycopy(keysA, 0, newKeysA, 0, keysASize);
            keysA = newKeysA;
        }

        private void doubleCapacityB() {
            SelectionKey[] newKeysB = new SelectionKey[keysB.length << 1];
            System.arraycopy(keysB, 0, newKeysB, 0, keysBSize);
            keysB = newKeysB;
        }
        //获取keys并切换
        SelectionKey[] flip() {
            if (isA) {
                isA = false;
                keysA[keysASize] = null;
                keysBSize = 0;
                return keysA;
            } else {
                isA = true;
                keysB[keysBSize] = null;
                keysASize = 0;
                return keysB;
            }
        }

        @Override
        public int size() {
            return isA?keysASize : keysBSize;
        }
    }

重载Selector select 逻辑,修复jdk 会产生的 bug

private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;

            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            //通过delayNanos计算出 select结束时间
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
            for (;;) {
                //计算出超时并转换成毫秒,再加上延时固定0.5毫秒
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) {
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }

                //如果有非IO任务,优先等侍selector操作
                if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }
                //阻塞当前线程
                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;
                //有IO,非IO,计划任务,wakenUp状态认为已完成 select 处理
                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    break;
                }
                //如果当前线程中断,netty认为关闭了服务,退出处理
                if (Thread.interrupted()) {
                    selectCnt = 1;
                    break;
                }

                //相当于下面等价,意思是当前时间大于或等于 (selectDeadLineNanos + 0.5毫秒) selectCnt 重置
                //currentTimeNanos + (System.nanoTime() -  selectDeadLineNanos - 500000L )   >= currentTimeNanos
                //System.nanoTime() -  selectDeadLineNanos - 500000L >= 0
                //System.nanoTime() >= selectDeadLineNanos + 500000L
                long time = System.nanoTime();
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    selectCnt = 1;
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {

                    // selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD 默认值512,重构selector
                    rebuildSelector();
                    selector = this.selector;
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }
                //刷新当前时间
                currentTimeNanos = time;
            }

    }

分发io与非io任务逻辑实现

//这部分做了代码整理
    @Override
    protected void run() {
        for (;;) {
            try {
                //检查是否有非IO任务同WAKEUP_TASK任务
                if(!hasTasks()){
                    continue;
                }
                //有任务就触发重写的 select
                select(wakenUp.getAndSet(false));
                if (wakenUp.get()) {
                    selector.wakeup();
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;//默认值50

               try {
                    final long ioStartTime = System.nanoTime();
                    //processSelectedKeys();
                    //一般会selectedKeys不会为null做了优化处理
                    if (selectedKeys != null) {
                        processSelectedKeysOptimized(selectedKeys.flip());
                    } else {
                        processSelectedKeysPlain(selector.selectedKeys());
                    }
                } finally {
                    //当ioRatio等于100时,百分百执行非IO全部任务
                    if (ioRatio == 100) {
                        runAllTasks();
                    }else{
                        final long ioTime = System.nanoTime() - ioStartTime;
                        //计算时非IO任务超时时间,公式 = 100 - ioRatio 算出非IO比率再跟IO相比 执行过的IO时间 * (非IO:IO)
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
             } catch (Throwable t) {
                //防止过多失败
                Thread.sleep(1000);
            }

            //处理完任务判断是否结束
             try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                  Thread.sleep(1000);
            }
        }
    }
    private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
        for (int i = 0;; i ++) {
            final SelectionKey k = selectedKeys[i];
            if (k == null) {
                break;
            }
            //依赖外部逻辑清理
            selectedKeys[i] = null;
            final Object a = k.attachment();

            //处理SelectedKey
            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }
            //这里用到比较奇怪的处理,应该是个补丁来的。。。
             //从资料来源上说:当触发needsToSelectAgain时 channel全是关闭,所以忽略selectedKeys剩余的key,然后再重获取获取selectedKeys
            // null out entries in the array to allow to have it GC‘ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            if (needsToSelectAgain) {
                for (;;) {
                    i++;
                    if (selectedKeys[i] == null) {
                        break;
                    }
                    selectedKeys[i] = null;
                }

                selectAgain();
                selectedKeys = this.selectedKeys.flip();
                i = -1;
            }
        }
    }
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                return;
            }
            //这里忽略情况是 在执行 registerd deregistration 时不能关闭,至于前后顺序无需要太多关心,读者可以进去看看
            //每个人出现情况不一样,再加上eventLoop不可能为null的,这段代码明显没有经过测试
            // Only close ch if ch is still registerd to this EventLoop. ch could have deregistered from the event loop
            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
            // still healthy and should not be closed.
            // See https://github.com/netty/netty/issues/5125
            if (eventLoop != this || eventLoop == null) {
                return;
            }
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
            // 如果出现OP_CONNECT 状态必须先完成Connect 才能触发 read or wirte 操作
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                //清除SelectionKey.OP_CONNECT状态
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

            //ByteBuffer 发送出去
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                 ch.unsafe().forceFlush();
            }

            //netty将OP_READ,OP_ACCEPT 状态统一执行read操作,那netty如何区分 read accept的呢,后面才分析
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

    //处理任务,失败策略执行注销处理
    private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
        try {
            task.channelReady(k.channel(), k);
            if (!k.isValid()) {
                 task.channelUnregistered(k.channel(), null);
            }
        } catch (Exception e) {
            k.cancel();
            task.channelUnregistered(k.channel(), null);
        }
    }

总结:

1.防cpu假死,超过一定时间重建Selector迁移SelectionKey

2.用反射技术替换Selector selectedKeySet字段,Set集合用到double cache技术

3.优先处理io任务,剩下时间处理非IO任务,通过ioRatio占比分配执行时间

4.在分发IO任务时做了大量的优化处理,如线程中断,读写IO、链路建立处理优先级,Selector 重建情况等

5.逻辑有时看起来好怪,再加上解决问题是修修补补的没经过优化代码,甚至作者没有经过测试就合并了,这是开源框架的通病

时间: 2024-10-23 17:24:47

[编织消息框架][netty源码分析]4 eventLoop 实现类NioEventLoop职责与实现的相关文章

[编织消息框架][netty源码分析]9 Promise 实现类DefaultPromise职责与实现

netty Future是基于jdk Future扩展,以监听完成任务触发执行Promise是对Future修改任务数据DefaultPromise是重要的模板类,其它不同类型实现基本是一层简单的包装,如DefaultChannelPromise主要是分析await是如何等侍结果的 public interface Future<V> extends java.util.concurrent.Future<V> { Future<V> addListener(Gener

[编织消息框架][netty源码分析]5 EventLoopGroup 实现类NioEventLoopGroup职责与实现

分析NioEventLoopGroup最主有两个疑问 1.next work如何分配NioEventLoop 2.boss group 与child group 是如何协作运行的 从EventLoopGroup接口约定通过register方法从channel或promise转换成ChannelFuture对象 next方法就是用来分配NioEventLoop public interface EventLoopGroup extends EventExecutorGroup { @Overrid

[编织消息框架][netty源码分析]7 Unsafe 实现类NioSocketChannelUnsafe职责与实现

Unsafe 是channel的内部接口, 负责跟socket底层打交道.从书写跟命名上看是不公开给开发者使用的,直到最后实现NioSocketChannelUnsafe也没有公开出去 public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> { interface Unsafe { RecvByteBufAllocator.Handle recvBufAllocHand

[编织消息框架][netty源码分析]11 UnpooledHeapByteBuf 与 ByteBufAllocator

每种ByteBuf都有相应的分配器ByteBufAllocator,类似工厂模式.我们先学习UnpooledHeapByteBuf与其对应的分配器UnpooledByteBufAllocator 如何知道alloc分配器那是个? 可以从官方下载的TimeServer 例子来学习,本项目已有源码可在 TestChannelHandler.class里断点追踪 从图可以看出netty 4.1.8默认的ByteBufAllocator是PooledByteBufAllocator,可以参过启动参数-D

[编织消息框架][netty源码分析]14 PoolChunk 的 PoolSubpage

final class PoolSubpage<T> implements PoolSubpageMetric { //该page分配的chunk final PoolChunk<T> chunk; //内存使用记录 private final long[] bitmap; //该page是否已释放 boolean doNotDestroy; //该page在chunk中的id,通过区段计算偏移 private final int memoryMapIdx; //该page在chu

6. Netty源码分析之EventLoop与EventLoopGroup

一.NioEventLoop与NioEventLoopGroup的关系 二.NioEventLoop 1. 设计原理 1. 负责IO读写 2. 执行task.通过调用NioEventLoop的execute(Runnable task)方法实现.我们知道,为了防止资源竞争和并发操作,我们经常会判断当前操作线程是否为EventLoop线程,如果不是,则将操作封装成task放进NioEventLoop的执行队列中,这样就实现了局部无锁化. 3. 定时任务.通过调用NioEventLoop的sched

Netty源码分析第4章(pipeline)----&gt;第4节: 传播inbound事件

Netty源码分析第四章: pipeline 第四节: 传播inbound事件 有关于inbound事件, 在概述中做过简单的介绍, 就是以自己为基准, 流向自己的事件, 比如最常见的channelRead事件, 就是对方发来数据流的所触发的事件, 己方要对这些数据进行处理, 这一小节, 以激活channelRead为例讲解有关inbound事件的处理流程 在业务代码中, 我们自己的handler往往会通过重写channelRead方法来处理对方发来的数据, 那么对方发来的数据是如何走到chan

Netty源码分析第6章(解码器)----&gt;第1节: ByteToMessageDecoder

Netty源码分析第六章: 解码器 概述: 在我们上一个章节遗留过一个问题, 就是如果Server在读取客户端的数据的时候, 如果一次读取不完整, 就触发channelRead事件, 那么Netty是如何处理这类问题的, 在这一章中, 会对此做详细剖析 之前的章节我们学习过pipeline, 事件在pipeline中传递, handler可以将事件截取并对其处理, 而之后剖析的编解码器, 其实就是一个handler, 截取byteBuf中的字节, 然后组建成业务需要的数据进行继续传播 编码器,

Netty源码分析第7章(编码器和写数据)----&gt;第3节: 写buffer队列

Netty源码分析七章: 编码器和写数据 第三节: 写buffer队列 之前的小结我们介绍过, writeAndFlush方法其实最终会调用write和flush方法 write方法最终会传递到head节点, 调用HeadContext的write方法: public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, prom