Netty源码分析之NioEventLoop执行流程

NioEventLoop启动触发条件:

1.服务端绑定本地端口

2.新连接接入通过chooser绑定一个NioEventLoop

服务端绑定本地端口

  绑定本地端口,使用下面方法;

ChannelFuture future = bootstrap.bind(host, port).sync();

  最终会调用doBind0()方法:

private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {
    channel.eventLoop().execute(new Runnable() {
        public void run() {
            if(regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }

        }
    });
}

  这个时候就会调用channel对应NioEventLoop的execute方法,会判断是否在当前的eventloop对应的thread中,如果在,直接向任务队列中添加绑定端口的任务,如果不在,首先要start当前eventLoop对应的thread,再将任务放到任务队列中。这里的excute(task)方法,并不是让线程直接执行它,而是将它放到线程的任务队列中,等待线程去执行它。

public void execute(Runnable task) {
    if(task == null) {
        throw new NullPointerException("task");
    } else {
        boolean inEventLoop = this.inEventLoop();
        if(inEventLoop) {
            this.addTask(task);
        } else {
            this.startThread();
            this.addTask(task);
            if(this.isShutdown() && this.removeTask(task)) {
                reject();
            }
        }

        if(!this.addTaskWakesUp && this.wakesUpForTask(task)) {
            this.wakeup(inEventLoop);
        }

    }
}

NioEventLoop线程执行逻辑

  NioEventLoop对应线程的run方法,run()方法里面是一个死循环,主要的逻辑是首先采用select检查是否有IO事件,如果有IO事件,就采用processSelectedKey()对IO事件进行处理,最后调用runAllTasks()处理任务队列中的任务。

protected void run() {
    while(true) {
        boolean oldWakenUp = this.wakenUp.getAndSet(false);

        try {
            if(this.hasTasks()) {
                this.selectNow();
            } else {
                this.select(oldWakenUp);
                if(this.wakenUp.get()) {
                    this.selector.wakeup();
                }
            }

            this.cancelledKeys = 0;
            this.needsToSelectAgain = false;
            int t = this.ioRatio;
            if(t == 100) {
                this.processSelectedKeys();
                this.runAllTasks();
            } else {
                long e = System.nanoTime();
                this.processSelectedKeys();
                long ioTime = System.nanoTime() - e;
                this.runAllTasks(ioTime * (long)(100 - t) / (long)t);
            }

            if(this.isShuttingDown()) {
                this.closeAll();
                if(this.confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable var8) {
            logger.warn("Unexpected exception in the selector loop.", var8);

            try {
                Thread.sleep(1000L);
            } catch (InterruptedException var7) {
                ;
            }
        }
    }
}

  这段代码中的ioRadio是控制执行IO事件和执行任务队列中的任务的一个事件比,默认是50,代表执行IO事件处理和执行任务队列的任务事件比是1:1。

1)使用select检测IO事件

  通过Selector的select()方法可以选择已经准备就绪的通道 (这些通道包含你感兴趣的的事件)。比如你对读就绪的通道感兴趣,那么select()方法就会返回读事件已经就绪的那些通道。Java中的Selector几个重载的select()方法:

  • int select():阻塞到至少有一个通道在你注册的事件上就绪了。
  • int select(long timeout):和select()一样,但最长阻塞时间为timeout毫秒。
  • int selectNow():非阻塞,只要有通道就绪就立刻返回。

  select()方法返回的int值表示有多少通道已经就绪,是自上次调用select()方法后有多少通道变成就绪状态。之前在select()调用时进入就绪的通道不会在本次调用中被记入,而在前一次select()调用进入就绪但现在已经不在处于就绪的通道也不会被记入。例如:首次调用select()方法,如果有一个通道变成就绪状态,返回了1,若再次调用select()方法,如果另一个通道就绪了,它会再次返回1。如果对第一个就绪的channel没有做任何操作,现在就有两个就绪的通道,但在每次select()方法调用之间,只有一个通道就绪了。

  一旦调用select()方法,并且返回值不为0时,则可以通过调用Selector的selectedKeys()方法来访问已选择键集合 。如下:

Set selectedKeys=selector.selectedKeys();

  Netty中首先判断任务队列是否为空,如果为空的话,就采用select(ltimeout)有超时设置的阻塞方法,如果不为空的话,就调用非阻塞的selectNow()方法,因为即使没有IO事件处理,也可以对任务队列中的任务进行处理。Netty中NioEventLoop的select和selectNow方法其实底层还是依靠selector的select方法。

void selectNow() throws IOException {
    try {
        this.selector.selectNow();
    } finally {
        if(this.wakenUp.get()) {
            this.selector.wakeup();
        }

    }

}

private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;

    try {
        int e = 0;
        long currentTimeNanos = System.nanoTime();
        long selectDeadLineNanos = currentTimeNanos + this.delayNanos(currentTimeNanos);

        while(true) {
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if(timeoutMillis <= 0L) {
                if(e == 0) {
                    selector.selectNow();
                    e = 1;
                }
                break;
            }

            int selectedKeys = selector.select(timeoutMillis);
            ++e;
            if(selectedKeys != 0 || oldWakenUp || this.wakenUp.get() || this.hasTasks() || this.hasScheduledTasks()) {
                break;
            }

            if(Thread.interrupted()) {
                if(logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely because Thread.currentThread().interrupt() was called. Use NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                }

                e = 1;
                break;
            }

            long time = System.nanoTime();
            if(time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                e = 1;
            } else if(SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && e >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding selector.", Integer.valueOf(e));
                this.rebuildSelector();
                selector = this.selector;
                selector.selectNow();
                e = 1;
                break;
            }

            currentTimeNanos = time;
        }

        if(e > 3 && logger.isDebugEnabled()) {
            logger.debug("Selector.select() returned prematurely {} times in a row.", Integer.valueOf(e - 1));
        }
    } catch (CancelledKeyException var13) {
        if(logger.isDebugEnabled()) {
            logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", var13);
        }
    }

}

  可以看到调用selectNow方法是直接调用java nio的select.selectNow方法,而Netty的select方法中有一个参数oldWakeUp记录当前操作是否是唤醒状态(不太清楚这个唤醒状态的作用),每次进行select操作之前,会将其标志位false,表示要进行select操作,而且是未唤醒状态。

  Netty中的select方法首先是根据当前时间时间去计算截止时间,这里使用到了超时队列(超时队列的作用也不太清楚),然后根据截止时间去计算超时时间,如果超时时间小于0,就执行selectNow操作,并退出此次select操作,否则执行带有超时时间的select方法,如果返回的selectKey不等于0,也就是有channel在select上注册了,或者该select操作被唤醒了(?),或者任务队列中有了任务,定时任务队列中有了任务,都会break出来。

  接下来的代码逻辑是避免JDK空轮询的,当JDK发生了空轮训,select会直接返回,这时并没有IO事件到达,也没有超过超时时间,这样会导致线程进入死循环,CPU利用率飙升至100%,JDK到现在也并没有解决这个问题。

  而Netty是通过记录空轮询的次数,如果这个次数达到了一个上限,上限默认是512,那么就新建一个selector,将注册在老selector上的channel注册到新的selector上,并且关闭老的selector,将新的selector替代老的selector。Netty通过rebuildSelector方法重建selector。

public void rebuildSelector() {
    if(!this.inEventLoop()) {
        this.execute(new Runnable() {
            public void run() {
                NioEventLoop.this.rebuildSelector();
            }
        });
    } else {
        Selector oldSelector = this.selector;
        if(oldSelector != null) {
            Selector newSelector;
            try {
                newSelector = this.openSelector();
            } catch (Exception var9) {
                logger.warn("Failed to create a new Selector.", var9);
                return;
            }

            int nChannels = 0;

            label69:
            while(true) {
                try {
                    Iterator t = oldSelector.keys().iterator();

                    while(true) {
                        if(!t.hasNext()) {
                            break label69;
                        }

                        SelectionKey key = (SelectionKey)t.next();
                        Object a = key.attachment();

                        try {
                            if(key.isValid() && key.channel().keyFor(newSelector) == null) {
                                int e = key.interestOps();
                                key.cancel();
                                SelectionKey var14 = key.channel().register(newSelector, e, a);
                                if(a instanceof AbstractNioChannel) {
                                    ((AbstractNioChannel)a).selectionKey = var14;
                                }

                                ++nChannels;
                            }
                        } catch (Exception var11) {
                            logger.warn("Failed to re-register a Channel to the new Selector.", var11);
                            if(a instanceof AbstractNioChannel) {
                                AbstractNioChannel var13 = (AbstractNioChannel)a;
                                var13.unsafe().close(var13.unsafe().voidPromise());
                            } else {
                                NioTask task = (NioTask)a;
                                invokeChannelUnregistered(task, key, var11);
                            }
                        }
                    }
                } catch (ConcurrentModificationException var12) {
                    ;
                }
            }

            this.selector = newSelector;

            try {
                oldSelector.close();
            } catch (Throwable var10) {
                if(logger.isWarnEnabled()) {
                    logger.warn("Failed to close the old Selector.", var10);
                }
            }

            logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
        }
    }
}

2)processSelectedKey()

netty中selectedKey的优化

  通过调用Selector的selectedKeys()方法来访问已选择键集合,此时返回的是HashSet。但是netty是通过反射的方式,将HashSet替换成数组pssSelectedKeysOptimized去处理IO事件。


private Selector openSelector() {
    AbstractSelector selector;
    try {
        selector = this.provider.openSelector();
    } catch (IOException var7) {
        throw new ChannelException("failed to open a new selector", var7);
    }

    if(DISABLE_KEYSET_OPTIMIZATION) {
        return selector;
    } else {
        try {
            SelectedSelectionKeySet t = new SelectedSelectionKeySet();
            Class selectorImplClass = Class.forName("sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader());
            if(!selectorImplClass.isAssignableFrom(selector.getClass())) {
                return selector;
            }

            Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
            Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
            selectedKeysField.setAccessible(true);
            publicSelectedKeysField.setAccessible(true);
            selectedKeysField.set(selector, t);
            publicSelectedKeysField.set(selector, t);
            this.selectedKeys = t;
            logger.trace("Instrumented an optimized java.util.Set into: {}", selector);
        } catch (Throwable var6) {
            this.selectedKeys = null;
            logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, var6);
        }

        return selector;
    }
}

  首先会调用JDK的openSelector方法返回创建的selector,然后会判断是否要对keySet进行优化,通过判断DISABLE_KEYSET_OPTIMIZATION,是否要对keyset进行优化,默认是要对keyset进行优化的。这里的SelectedSelectionKeySet是优化过后的keyset,底层是通过两个数组加上两个数组的大小进行实现的,这样可以使得add操作达到o(1)的时间复杂度(但是是HashSet的add操作时间复杂度不也是o(1))嘛,

processSelectedKey调用processSelectedKeysOptimized

  该方法的流程就是遍历数组中所有的selectedKey,一旦遍历完,就将该引用指向为空。获取每一个selectorKey对应的channel,然后通过调用processSelectedKey去处理该channel上感兴趣的事件。

private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
    int i = 0;
    //遍历SelectedKsys
    while(true) {
        SelectionKey k = selectedKeys[i];
        if(k == null) {
            return;
        }

        selectedKeys[i] = null;     //获取selectKey对应的channel
        Object a = k.attachment();
        if(a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel)((AbstractNioChannel)a));
        } else {
            NioTask task = (NioTask)a;
            processSelectedKey(k, (NioTask)task);
        }

        if(this.needsToSelectAgain) {
            while(selectedKeys[i] != null) {
                selectedKeys[i] = null;
                ++i;
            }

            this.selectAgain();
            selectedKeys = this.selectedKeys.flip();
            i = -1;
        }

        ++i;
    }
}

  这里处理selector上面的IO事件,底层其实都是通过channel的unsafe类进行操作的,这里read和accept事件对应的都是channel的read方法。

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if(!k.isValid()) {
        unsafe.close(unsafe.voidPromise());
    } else {
        try {
            int ignored = k.readyOps();       //如果是read或者accept事件就对channel进行读操作
            if((ignored & 17) != 0 || ignored == 0) {
                unsafe.read();
                if(!ch.isOpen()) {
                    return;
                }
            }
       //write事件
            if((ignored & 4) != 0) {
                ch.unsafe().forceFlush();
            }
       //connect事件
            if((ignored & 8) != 0) {
                int ops = k.interestOps();
                ops &= -9;
                k.interestOps(ops);
                unsafe.finishConnect();
            }
        } catch (CancelledKeyException var5) {
            unsafe.close(unsafe.voidPromise());
        }

    }
}

3)使用runAllTasks()执行任务队列中的事件

  

PriorityQueue(优先级队列),也是非线程安全的队列。

private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
    if(task == null) {
        throw new NullPointerException("task");
    } else {
        if(this.inEventLoop()) {
            this.delayedTaskQueue.add(task);
        } else {
            this.execute(new Runnable() {
                public void run() {
                    SingleThreadEventExecutor.this.delayedTaskQueue.add(task);
                }
            });
        }

        return task;
    }
}

  runAllTask首先从定时任务队列中拉取定时任务,将需要执行的定时任务加入到普通任务队列中,并计算截止时间,然后循环的从普通任务队列中拉取任务,并执行任务,这里判断是否到达超时时间,是每相隔64个任务,就判断是否到达最大任务执行时间。为啥要每隔64个任务判断是否超时呢?因为nanoTime也是比较费时的。

protected boolean runAllTasks(long timeoutNanos) {
    this.fetchFromDelayedQueue();
    Runnable task = this.pollTask();
    if(task == null) {
        return false;
    } else {
        long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
        long runTasks = 0L;

        long lastExecutionTime;
        while(true) {
            try {
                task.run();
            } catch (Throwable var11) {
                logger.warn("A task raised an exception.", var11);
            }

            ++runTasks;
            if((runTasks & 63L) == 0L) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if(lastExecutionTime >= deadline) {
                    break;
                }
            }

            task = this.pollTask();
            if(task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        this.lastExecutionTime = lastExecutionTime;
        return true;
    }
}

  从定时队列中拉取任务,这里拉取的任务是拉取截止时间不超过nanoTime的任务,将任务从定时任务队列中删除,将任务加入到普通任务队列中。这个while循环执行完成之后,所有需要执行的定时任务全部都加入到普通任务队列中。

private void fetchFromDelayedQueue() {
    long nanoTime = 0L;

    while (true) {
        ScheduledFutureTask delayedTask = (ScheduledFutureTask) this.delayedTaskQueue.peek();
        if (delayedTask == null) {
            break;
        }

        if (nanoTime == 0L) {
            nanoTime = ScheduledFutureTask.nanoTime();
        }

        if (delayedTask.deadlineNanos() > nanoTime) {
            break;
        }

        this.delayedTaskQueue.remove();
        this.taskQueue.add(delayedTask);
    }
}

   定时任务队列是一个优先级队列,队列按照优先级进行排序,这里的优先级是每个任务的截止时间,队列是按照截止时间的早晚对任务进行排序的。

public int compareTo(Delayed o) {
    if(this == o) {
        return 0;
    } else {
        ScheduledFutureTask that = (ScheduledFutureTask)o;
        long d = this.deadlineNanos() - that.deadlineNanos();
        if(d < 0L) {
            return -1;
        } else if(d > 0L) {
            return 1;
        } else if(this.id < that.id) {
            return -1;
        } else if(this.id == that.id) {
            throw new Error();
        } else {
            return 1;
        }
    }
}

总结:

1.默认情况下,NioEventLoopGroup会创建2*cpu个数的线程池,在调用NioEventLoop.execute(task)的时候,如果当前的NioEventLoop没有创建自己的线程,就会创建线程。

2.Netty如何解决JDK空轮训bug?通过计算空轮训操作的个数,这里的空轮训的判断是既没有IO事件的到达,也没有达到超时时间,如果空轮训的个数超过阈值(512),就会新建一个selector,将旧selector的selectorKey注册到新的selector上,将旧的selector关闭,用新的selector替代旧的selector。

3.Netty在所有外部线程调用NioEventLoop的操作时,如果通过InEventLoop判断是否在NioEventLoop所属的线程,如果不在通过startThread启动NioEventLoop的线程,并且将任务添加到NioEventLoop的任务队列中,所有NioEventLoop对应一个线程,其中的操作只会被一个线程所执行,实现了异步串行无锁化。

原文地址:https://www.cnblogs.com/xiaobaituyun/p/10801336.html

时间: 2024-11-08 20:56:40

Netty源码分析之NioEventLoop执行流程的相关文章

Nginx源码分析—HTTP框架执行流程

HTTP框架动态执行中的大概流程:先与客户端建立TCP连接,接收HTTP请求行.头部并解析出他们的意义,再根据nginx.conf配置文件找到一些HTTP模块,使其一次合作者处理这个请求. 为了精确地控制超时,还需要把读写事件放置到定时器中. 通过事件模块提东的ngx_handle_read_event方法和ngx_handle_write_event方法,可以把相应的事件添加到epoll中,我们可以起到在满足事件触发条件时,ngxin进程会调用ngx_event_t事件的handler回调方法

Netty源码分析第2章(NioEventLoop)----&gt;第6节: 执行selector操作

Netty源码分析第二章: NioEventLoop 第六节: 执行select操作 分析完了selector的创建和优化的过程, 这一小节分析select相关操作 跟到跟到NioEventLoop的run方法: protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE

Netty源码分析第3章(客户端接入流程)----&gt;第4节: NioSocketChannel注册到selector

Netty源码分析第三章: 客户端接入流程 第四节: NioSocketChannel注册到selector 我们回到最初的NioMessageUnsafe的read()方法: public void read() { //必须是NioEventLoop方法调用的, 不能通过外部线程调用 assert eventLoop().inEventLoop(); //服务端channel的config final ChannelConfig config = config(); //服务端channel

Netty源码分析第2章(NioEventLoop)----&gt;第7节: 处理IO事件

Netty源码分析第二章: NioEventLoop 第七节:处理IO事件 上一小节我们了解了执行select()操作的相关逻辑, 这一小节我们继续学习select()之后, 轮询到io事件的相关逻辑: 回到NioEventLoop的run()方法: protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case Sele

Netty源码分析第3章(客户端接入流程)----&gt;第5节: 监听读事件

Netty源码分析第三章: 客户端接入流程 第五节: 监听读事件 我们回到AbstractUnsafe的register0()方法: private void register0(ChannelPromise promise) { try { //省略代码 //做实际的注册 doRegister(); neverRegistered = false; registered = true; //触发事件 pipeline.invokeHandlerAddedIfNeeded(); safeSetS

Netty源码分析第3章(客户端接入流程)----&gt;第3节: NioSocketChannel的创建

Netty源码分析第三章: 客户端接入流程 第三节: NioSocketChannel的创建 回到上一小结的read()方法: public void read() { //必须是NioEventLoop方法调用的, 不能通过外部线程调用 assert eventLoop().inEventLoop(); //服务端channel的config final ChannelConfig config = config(); //服务端channel的pipeline final ChannelPi

netty 源码分析二

以服务端启动,接收客户端连接整个过程为例分析, 简略分为 五个过程: 1.NioServerSocketChannel 管道生成, 2.NioServerSocketChannel 管道完成初始化, 3.NioServerSocketChannel注册至Selector选择器, 4.NioServerSocketChannel管道绑定到指定端口,启动服务 5.NioServerSocketChannel接受客户端的连接,进行相应IO操作 Ps:netty内部过程远比这复杂,简略记录下方便以后回忆

Netty源码分析第4章(pipeline)----&gt;第7节: 前章节内容回顾

Netty源码分析第四章: pipeline 第七节: 前章节内容回顾 我们在第一章和第三章中, 遗留了很多有关事件传输的相关逻辑, 这里带大家一一回顾 首先看两个问题: 1.在客户端接入的时候, NioMessageUnsafe的read方法中pipeline.fireChannelRead(readBuf.get(i))为什么会调用到ServerBootstrap的内部类ServerBootstrapAcceptor中的channelRead()方法 2.客户端handler是什么时候被添加

Netty源码分析-- 处理客户端接入请求(八)

这一节我们来一起看下,一个客户端接入进来是什么情况.首先我们根据之前的分析,先启动服务端,然后打一个断点. 这个断点打在哪里呢?就是NioEventLoop上的select方法上. 然后我们启动一个客户端. 然后我们debug看到,selectedKey的数量 = 1,说明有accept或者读写等事件发生. 接下就会进 processSelectedKeys() 我们上一节讲到,这里的attach就是NioServerSocketChannel, 我们进入 processSelectedKey(