6. Netty源码分析之EventLoop与EventLoopGroup

一、NioEventLoop与NioEventLoopGroup的关系

二、NioEventLoop

1. 设计原理

1. 负责IO读写

2. 执行task。通过调用NioEventLoop的execute(Runnable task)方法实现。我们知道,为了防止资源竞争和并发操作,我们经常会判断当前操作线程是否为EventLoop线程,如果不是,则将操作封装成task放进NioEventLoop的执行队列中,这样就实现了局部无锁化。

3. 定时任务。通过调用NioEventLoop的schedule(Runnable command, long delay, TimeUnit unit)方法实现。

2. 继承关系类图

3. 成员变量及Selector初始化

  这里可以看到,NioEventLoop持有一个Selector引用,负责去轮询准备就绪的Channel。

Selector selector;//多路复用器
private SelectedSelectionKeySet selectedKeys;
private final SelectorProvider provider;//selector生产者

private Selector openSelector() {
    final Selector selector;
    try {
        // 1. 打开Selector
        selector = provider.openSelector();
    } catch (IOException e) {
        throw new ChannelException("failed to open a new selector", e);
    }

    // 2. 是否打开SelectionKey优化,默认关闭,直接返回Selector
    if (DISABLE_KEYSET_OPTIMIZATION) {
        return selector;
    }

    // 3. 用自己的SelectedSelectionKeySet代替Java自带的selectedKeys
    try {
        SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

        Class<?> selectorImplClass =
                Class.forName("sun.nio.ch.SelectorImpl", false, ClassLoader.getSystemClassLoader());

        // Ensure the current selector implementation is what we can instrument.
        if (!selectorImplClass.isAssignableFrom(selector.getClass())) {
            return selector;
        }

        Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
        Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

        selectedKeysField.setAccessible(true);
        publicSelectedKeysField.setAccessible(true);

        selectedKeysField.set(selector, selectedKeySet);
        publicSelectedKeysField.set(selector, selectedKeySet);

        selectedKeys = selectedKeySet;
        logger.trace("Instrumented an optimized java.util.Set into: {}", selector);
    } catch (Throwable t) {
        selectedKeys = null;
        logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, t);
    }

    return selector;
}

4. run方法

  NioEventLoop中最重要的方法,无限轮询准备好的Channel并处理。

  首先将wakenUp还原为false,并保存之前的状态。通过hasTash()判断当前消息队列中是否有待处理消息,如果有则调用selectNow()方法立即进行一次select操作,看是否有准备就绪的Channel。

protected void run() {
    for (;;) {
        oldWakenUp = wakenUp.getAndSet(false);
        try {
            if (hasTasks()) {
                selectNow();
            } else {
                select();

                if (wakenUp.get()) {
                    selector.wakeup();
                }
            }...

  selectorNow()方法会立即触发selector选择操作,如果由准备就绪的Channel,则返回就绪的Channel集合,否则返回0。操作完成之后,再次判断用户是否调用了Selector的wakenUp()方法,如果调用,则执行selector.wakenUp()操作。

    void selectNow() throws IOException {
        try {
            selector.selectNow();
        } finally {
            // restore wakup state if needed
            if (wakenUp.get()) {
                selector.wakeup();
            }
        }
    }

  如果消息队列中没有待处理消息,则执行select方法,由selector轮询,看是否由准备就绪的Channel。

private void select() throws IOException {
    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
        // 当前时间 + 定时任务延时时间 = 定时任务触发事件
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
        for (;;) {
            // 超时时间 = 定时任务触发时间 - 当前时间 + 0.5毫秒调整值   --> 转化为毫秒
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            // 如果需要立即执行(已超时),则轮询
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }

            // 阻塞timeout时间,等待定时任务,执行select
            int selectedKeys = selector.select(timeoutMillis);
            selectCnt ++;

            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks()) {
                // 有新事件待处理 || 用户调用了wakenUp()唤醒多路复用器 || 消息队列中有新任务
                break;
            }

            // 如果本次是空轮询,有可能出发了JDK的epoll bug,它会导致selector空轮询,使IO线程一直处于100%状态
            // 对空轮询进行判断,如果在一定周期内连续发生了N次空轮询,说明触发了bug
            // 需要重建selector,将原selector上的Channel注册到新的selector上,并将老selector关闭
            if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                // The selector returned prematurely many times in a row.
                // Rebuild the selector to work around the problem.
                logger.warn(
                        "Selector.select() returned prematurely {} times in a row; rebuilding selector.",
                        selectCnt);

                rebuildSelector();
                selector = this.selector;

                // Select again to populate selectedKeys.
                selector.selectNow();
                selectCnt = 1;
                break;
            }

            currentTimeNanos = System.nanoTime();
        }

        if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
            if (logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1);
            }
        }
    } catch (CancelledKeyException e) {
        if (logger.isDebugEnabled()) {
            logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e);
        }
        // Harmless exception - log anyway
    }
}

  如果轮询到了处于就绪状态的SocketChannel,则需要处理IO事件。

  处理完IO事件后,NioEventLoop还需要处理非IO的task和定时任务。为了保证IO事件和task都有足够的CPU事件执行,这里用IO事件占比计算非IO事件的执行事件(默认50%),在执行task时,如果超过这个时间,则直接返回,队列中的任务留到以后执行(这里Netty为了提升性能,每执行60次判断一次超时时间)。

final long ioStartTime = System.nanoTime();
needsToSelectAgain = false;
// 根据是否优化selectionKey执行不同的处理方法(都是处理IO事件)
if (selectedKeys != null) {
    processSelectedKeysOptimized(selectedKeys.flip());
} else {
    processSelectedKeysPlain(selector.selectedKeys());
}
final long ioTime = System.nanoTime() - ioStartTime;

final int ioRatio = this.ioRatio;
// 根据IO执行时间得到非IO执行时间,然后取执行task
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
protected boolean runAllTasks(long timeoutNanos) {
    // 从定时任务队列中将task放到tskQueue中(这里拿的是正在过期的任务)
    fetchFromDelayedQueue();
    // 从taskQueue中拿task
    Runnable task = pollTask();
    if (task == null) {
        return false;
    }

    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
        try {
            task.run();
        } catch (Throwable t) {
            logger.warn("A task raised an exception.", t);
        }

        runTasks ++;

        // 为了提升性能,每执行60次,判断一次超时时间,如果执行时间已超出系统给定时间,则退出
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }

        // 循环处理task,如果没task了则退出
        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }

    this.lastExecutionTime = lastExecutionTime;
    return true;
}

  最后,判断系统是否处于停机状态,如果是,则调用closeAll方法释放资源,令NioEventLoop退出循环,关闭线程。

    if (isShuttingDown()) {
        closeAll();
        if (confirmShutdown()) {
            break;
        }
    }

  closeAll()方法遍历获取所有Channel,调用它的Unsafe.close()方法关闭所有链路,释放资源(具体close()方法可以看前面的Unsafe源码分析,close()方法最终调用的还是javaChannel的close()方法)。

private void closeAll() {
    selectAgain();
    Set<SelectionKey> keys = selector.keys();
    Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
    for (SelectionKey k: keys) {
        Object a = k.attachment();
        if (a instanceof AbstractNioChannel) {
            channels.add((AbstractNioChannel) a);
        } else {
            k.cancel();
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            invokeChannelUnregistered(task, k, null);
        }
    }

    for (AbstractNioChannel ch: channels) {
        ch.unsafe().close(ch.unsafe().voidPromise());
    }
}

三、NioEventLoopGroup

1. 构造方法(NioEventLoop创建)

  我们先来看EventLoopGroup的构造方法,这里通过构造方法,创建了指定线程数的NioEventLoop。

    public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }

    public NioEventLoopGroup(int nThreads, Executor executor) {
        this(nThreads, executor, SelectorProvider.provider());
    }

    public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        super(nThreads, executor, selectorProvider);
    }

    // DEFAULT_EVENT_LOOP_THREADS = CPU个数*2
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    // EventLoop数组
    children = new EventExecutor[nThreads];
    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0]);
    }

2. NioEventLoop的分配

  当一个新的Channel连接时,NioEventLoopGroup需要拿出一个NioEventLoop让Channel绑定,这个Channel之后的IO操作都在这个NioEventLoop上操作。

    public EventExecutor next() {
        return children[Math.abs(childIndex.getAndIncrement() % children.length)];
    }

原文地址:https://www.cnblogs.com/lovezmc/p/11547912.html

时间: 2024-10-08 05:50:15

6. Netty源码分析之EventLoop与EventLoopGroup的相关文章

[编织消息框架][netty源码分析]4 eventLoop 实现类NioEventLoop职责与实现

NioEventLoop 是jdk nio多路处理实现同修复jdk nio的bug 1.NioEventLoop继承SingleThreadEventLoop 重用单线程处理 2.NioEventLoop是组成 pool EventLoopGroup 基本单元 总之好多边界判断跟业务经验之类的代码,非常烦碎 重要属性 public final class NioEventLoop extends SingleThreadEventLoop { //绑定 selector Selector sel

netty 源码分析二

以服务端启动,接收客户端连接整个过程为例分析, 简略分为 五个过程: 1.NioServerSocketChannel 管道生成, 2.NioServerSocketChannel 管道完成初始化, 3.NioServerSocketChannel注册至Selector选择器, 4.NioServerSocketChannel管道绑定到指定端口,启动服务 5.NioServerSocketChannel接受客户端的连接,进行相应IO操作 Ps:netty内部过程远比这复杂,简略记录下方便以后回忆

netty源码分析之揭开reactor线程的面纱(二)

如果你对netty的reactor线程不了解,建议先看下上一篇文章netty源码分析之揭开reactor线程的面纱(一),这里再把reactor中的三个步骤的图贴一下 reactor线程 我们已经了解到netty reactor线程的第一步是轮询出注册在selector上面的IO事件(select),那么接下来就要处理这些IO事件(process selected keys),本篇文章我们将一起来探讨netty处理IO事件的细节 我们进入到reactor线程的 run 方法,找到处理IO事件的代

Netty源码分析第3章(客户端接入流程)----&gt;第5节: 监听读事件

Netty源码分析第三章: 客户端接入流程 第五节: 监听读事件 我们回到AbstractUnsafe的register0()方法: private void register0(ChannelPromise promise) { try { //省略代码 //做实际的注册 doRegister(); neverRegistered = false; registered = true; //触发事件 pipeline.invokeHandlerAddedIfNeeded(); safeSetS

Netty源码分析第3章(客户端接入流程)----&gt;第4节: NioSocketChannel注册到selector

Netty源码分析第三章: 客户端接入流程 第四节: NioSocketChannel注册到selector 我们回到最初的NioMessageUnsafe的read()方法: public void read() { //必须是NioEventLoop方法调用的, 不能通过外部线程调用 assert eventLoop().inEventLoop(); //服务端channel的config final ChannelConfig config = config(); //服务端channel

Netty源码分析第3章(客户端接入流程)----&gt;第3节: NioSocketChannel的创建

Netty源码分析第三章: 客户端接入流程 第三节: NioSocketChannel的创建 回到上一小结的read()方法: public void read() { //必须是NioEventLoop方法调用的, 不能通过外部线程调用 assert eventLoop().inEventLoop(); //服务端channel的config final ChannelConfig config = config(); //服务端channel的pipeline final ChannelPi

Netty源码分析第4章(pipeline)----&gt;第4节: 传播inbound事件

Netty源码分析第四章: pipeline 第四节: 传播inbound事件 有关于inbound事件, 在概述中做过简单的介绍, 就是以自己为基准, 流向自己的事件, 比如最常见的channelRead事件, 就是对方发来数据流的所触发的事件, 己方要对这些数据进行处理, 这一小节, 以激活channelRead为例讲解有关inbound事件的处理流程 在业务代码中, 我们自己的handler往往会通过重写channelRead方法来处理对方发来的数据, 那么对方发来的数据是如何走到chan

Netty源码分析第4章(pipeline)----&gt;第7节: 前章节内容回顾

Netty源码分析第四章: pipeline 第七节: 前章节内容回顾 我们在第一章和第三章中, 遗留了很多有关事件传输的相关逻辑, 这里带大家一一回顾 首先看两个问题: 1.在客户端接入的时候, NioMessageUnsafe的read方法中pipeline.fireChannelRead(readBuf.get(i))为什么会调用到ServerBootstrap的内部类ServerBootstrapAcceptor中的channelRead()方法 2.客户端handler是什么时候被添加

4. Netty源码分析之Unsafe

Unsafe类实际上是Channel接口的辅助类,实际的IO操作都是由Unsafe接口完成的. 一.Unsafe继承关系图 二.AbstractUnsafe源码分析 1. register方法 register方法主要用于将当前Unsafe对应的Channel注册到EventLoop的多路复用器上,然后调用DefaultChannelPipeline的fireChannelRegisted方法,如果Channel被激活,则调用fireChannelActive方法. public final v