Netty源码学习(二)NioEventLoopGroup

0. NioEventLoopGroup简介

NioEventLoopGroup可以理解为一个线程池,内部维护了一组线程,每个线程负责处理多个Channel上的事件,而一个Channel只对应于一个线程,这样可以回避多线程下的数据同步问题。

1. NioEventLoopGroup类图

2. 构造方法

new NioEventLoopGroup()方法会调用到MultithreadEventLoopGroup的构造方法:

   private static final int DEFAULT_EVENT_LOOP_THREADS;

    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));//默认值为系统core数的两倍

        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }

    /**
     * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
     */
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);//如果采用无参的构造函数,传入的nThreads变量为0,此时线程数会被设置为系统core数*2
    }

然后会调用MultithreadEventExecutorGroup的构造方法:

    /**
     * Create a new instance.
     *
     * @param nThreads          the number of threads that will be used by this instance.
     * @param executor          the Executor to use, or {@code null} if the default should be used.
     * @param chooserFactory    the {@link EventExecutorChooserFactory} to use.
     * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
     */
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        children = new EventExecutor[nThreads];//设定线程池大小

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                children[i] = newChild(executor, args);//新建nThreads个子线程
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {//如果新建子线程的过程中出错,则关闭所有子线程
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }

        chooser = chooserFactory.newChooser(children);//设定新任务的分配策略

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {//注册一些回调函数用于清理工作
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

其中比较重要的地方有两处:调用子类实现的newChild方法设置子线程,以及设置新任务的分配策略

先看一下NioEventLoopGroup中实现的newChild方法:

    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }

很简单的新建一个NioEventLoop对象并返回,我们下一节会介绍NioEventLoop

而chooserFactory.newChooser最终会跳转到DefaultEventExecutorChooserFactory里:

    @SuppressWarnings("unchecked")
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

虽然分配策略都是round-robin,但是在子线程的数量为2的幂时,可以用位运算来加速,效率很高。

Netty为了追求效率确实不择手段。

时间: 2024-10-10 23:41:16

Netty源码学习(二)NioEventLoopGroup的相关文章

Netty源码学习——EventLoopGroup原理:NioEventLoopGroup分析

类结构图: 不了解Executor接口原理的可以查看concurrent包中的api介绍,这里只介绍Netty中EventExecutorGroup的主要功能! 从类的结构图中可以看到EventExecutorGroup是直接继承ScheduledExecutorService这个接口的,为了说明白Group的原理这里顺便提一下ScheduledExecutorService的用途! java.util.concurrent.ScheduledExecutorService An Executo

netty 源码分析二

以服务端启动,接收客户端连接整个过程为例分析, 简略分为 五个过程: 1.NioServerSocketChannel 管道生成, 2.NioServerSocketChannel 管道完成初始化, 3.NioServerSocketChannel注册至Selector选择器, 4.NioServerSocketChannel管道绑定到指定端口,启动服务 5.NioServerSocketChannel接受客户端的连接,进行相应IO操作 Ps:netty内部过程远比这复杂,简略记录下方便以后回忆

Netty源码学习——Included transports(传输方式)

Transport API的核心: Channel接口 类图表示Channel含有Pipeline和Config接口,pipeline上一节有所介绍. Channel是线程安全的,这表示在多线环境下操作同一个Channel,不会有数据问题. final Channel channel = null; final ByteBuf buf = Unpooled.copiedBuffer("your data", CharsetUtil.UTF_8); // #1 Runnable writ

Netty源码学习——ChannelPipeline模型分析

参考Netty API io.netty.channel.ChannelPipeline A list of ChannelHandlers which handles or intercepts inbound events and outbount operations of aChannel.ChannelPipeline implements an advanced form of theIntercepting Filter pattern to give a user full co

Dubbo源码学习(二)

@Adaptive注解 在上一篇ExtensionLoader的博客中记录了,有两种扩展点,一种是普通的扩展实现,另一种就是自适应的扩展点,即@Adaptive注解的实现类. @Documented @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.TYPE, ElementType.METHOD}) public @interface Adaptive { String[] value() default {}; } @Adapt

python 协程库gevent学习--gevent源码学习(二)

在进行gevent源码学习一分析之后,我还对两个比较核心的问题抱有疑问: 1. gevent.Greenlet.join()以及他的list版本joinall()的原理和使用. 2. 关于在使用monkey_patchall()之后隐式切换的问题. 下面我将继续通过分析源码及其行为来加以理解和掌握. 1. 关于gevent.Greenlet.join()(以下简称join)先来看一个例子: import gevent def xixihaha(msg): print(msg) gevent.sl

[spring源码学习]二、IOC源码——配置文件读取

一.环境准备 对于学习源码来讲,拿到一大堆的代码,脑袋里肯定是嗡嗡的,所以从代码实例进行跟踪调试未尝不是一种好的办法,此处,我们准备了一个小例子: package com.zjl; public class Person { private String name; public String getName() { return name; } public void setName(String name) { this.name = name; } public void sayHello

NETTY源码学习-DELIMITERBASEDFRAMEDECODER

看DelimiterBasedFrameDecoder的API,有举例: 接收到的ChannelBuffer如下: +--------------+ | ABC\nDEF\r\n | +--------------+ 经过DelimiterBasedFrameDecoder(Delimiters.lineDelimiter())之后,得到: +-----+-----+ | ABC | DEF | +-----+-----+ 而不是 +----------+ | ABC\nDEF | 为什么 ?

Bottle 框架源码学习 二

上一篇简单分析了route的基本用法 本篇分析一下run函数的运行原理 def run(app=None, server='wsgiref', host='127.0.0.1', port=8080,         interval=1, reloader=False, quiet=False, plugins=None,         debug=None, **kargs):          if NORUN: return     if reloader and not os.env

Nmap 源码学习二 整体架构

目录功能: docs :相关文档 libdnet-stripped :开源网络接口库 liblinear:开源大型线性分类库 liblua:开源Lua脚本语言库 libnetutil:基本的网络函数 libpcap:开源抓包库 libpcre:开源正则表达式库 macosx:xcode项目文件 mswin32:vs项目文件 nbase:Nmap封装的基础使用函数库 ncat:netcat网络工具,由Nmap实现 ndiff:比较Nmap扫描结果的实用命令 nmap-update:负责Nmap更新