Netty学习之核心组件(EventLoop、EventLoopGroup)

一、EventLoop、EventLoopGroup概述

  由下图所示,NioEventLop是EventLoop的一个具体实现,EventLoop是EventLoopGroup的一个属性,NioEventLoopGroup是EventLoopGroup的具体实现,都是基于ExecutorService进行的线程池管理,因此EventLoop、EventLoopGroup组件的核心作用就是进行Selector的维护以及线程池的维护。

  

  其中EventLoop进行的是Selector的维护,如下图左;EventLoopGroup用于线程组维护,并发控制,任务处理,如下图右。

    

  关于EventLoop以及EventLoopGroup的映射关系为:

  • 一个EventLoopGroup 包含一个或者多个EventLoop;
  • 一个EventLoop 在它的生命周期内只和一个Thread 绑定;
  • 所有由EventLoop 处理的I/O 事件都将在它专有的Thread 上被处理;
  • 一个Channel 在它的生命周期内只注册于一个EventLoop;
  • 一个EventLoop 可能会被分配给一个或多个Channel。

  Channel 为Netty 网络操作抽象类,EventLoop 主要是为Channel 处理 I/O 操作,两者配合参与 I/O 操作。当一个连接到达时,Netty 就会注册一个 Channel,然后从 EventLoopGroup 中分配一个 EventLoop 绑定到这个Channel上,在该Channel的整个生命周期中都是有这个绑定的 EventLoop 来服务的。

二、NioEventLoopGroup实现

  NioEventLoopGroup对象可以理解为一个线程池,内部维护了一组线程,每个线程负责处理多个Channel上的事件,而一个Channel只对应于一个线程,这样可以回避多线程下的数据同步问题。如下代码

     // 服务器端应用程序使用两个NioEventLoopGroup创建两个EventLoop的组,EventLoop这个相当于一个处理线程,是Netty接收请求和处理IO请求的线程。
        // 主线程组, 用于接受客户端的连接,但是不做任何处理,跟老板一样,不做事
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // 从线程组, 当boss接受连接并注册被接受的连接到worker时,处理被接受连接的流量。
        EventLoopGroup workerGroup = new NioEventLoopGroup();

  其职责如下:

  • 作为服务端 Acceptor 线程,负责处理客户端的请求接入。
  • 作为客户端 Connector 线程,负责注册监听连接操作位,用于判断异步连接结果。
  • 作为 IO 线程,监听网络读操作位,负责从 SocketChannel 中读取报文。
  • 作为 IO 线程,负责向 SocketChannel 写入报文发送给对方,如果发生写半包,会自动注册监听写事件,用 于后续继续发送半包数据,直到数据全部发送完成。
  • 作为定时任务线程,可以执行定时任务,例如链路空闲检测和发送心跳消息等。
  • 作为线程执行器可以执行普通的任务线程(Runnable)。

  上面的代码 创建bossGroup及workerGroup时,使用了NioEventLoopGroup的无参构造方法,因此我们查看下NioEventLoopGroup的具体实现:

    /**
     * 1、首先我们看看NioEventLoopGroup的无参构造方法:
     * 作用:线程数为0
     */
    public NioEventLoopGroup() {
        this(0);
    }

    /**
     * 2、继续调用构造函数。
     * 作用:指定线程为0,且Executor为null
     */
    public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }

    /**
     * 3、继续调用构造函数
     * 作用:此构造方法它会指定selector的辅助类 "SelectorProvider.provider()"
     */
    public NioEventLoopGroup(int nThreads, Executor executor) {
        this(nThreads, executor, SelectorProvider.provider());
    }

    /**
     * 4、继续调用构造函数
     * 作用:初始化了一个默认的选择策略工厂,用于生成select策略
     */
    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }

    /**
     * 5、继续调用构造函数
     * 作用:指定拒绝策略:RejectedExecutionHandlers.reject()
     */
    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }

  经过上面一系列的构造方法调用,此时参数值对应如下:

  • nThreads: 0
  • executor: null
  • selectorProvider: SelectorProvider.provider()
  • selectStrategyFactory: DefaultSelectStrategyFactory.INSTANCE
  • 以及指定了拒绝策略: RejectedExecutionHandlers.reject()

  继续分析剩余代码:

    /**
     * 6、从这里开始 调用父类 MultithreadEventLoopGroup 的构造函数
     * 作用: 就是当指定的线程数为0时,使用默认的线程数DEFAULT_EVENT_LOOP_THREADS,
     *      而DEFAULT_EVENT_LOOP_THREAD是在静态代码块中就被执行。
     */
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

    /**
     * 6.1 我们看下静态代码块
     * 作用:到这一步得出关键的一点:`如果初始化NioEventLoopGroup未指定线程数,默认是CPU核心数*2`。
     */
    private static final int DEFAULT_EVENT_LOOP_THREADS;

    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2))
    }

    /**
     * 7、继续调用父类 MultithreadEventLoopGroup 构造函数
     * 作用:指定了一个EventExecutor的选择工厂DefaultEventExecutorChooserFactory,
     *      此工厂主要是用于选择下一个可用的EventExecutor
     */
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }

    /**
     * 8、继续调用父类 MultithreadEventLoopGroup 构造函数 这里就是核心代码 删除部分非核心代码
     *    作用单独分析
     */
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {

        //1、
        //executor校验非空, 如果为空就创建ThreadPerTaskExecutor, 该类实现了 Executor接口
        // 这个executor 是用来执行线程池中的所有的线程,也就是所有的NioEventLoop,其实从
        //NioEventLoop构造器中也可以知道,NioEventLoop构造器中都传入了executor这个参数。
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        //2、
        //这里的children数组, 其实就是线程池的核心实现,线程池中就是通过指定的线程数组来实现线程池;
        //数组中每个元素其实就是一个EventLoop,EventLoop是EventExecutor的子接口。
        children = new EventExecutor[nThreads];

        //for循环实例化children数组,NioEventLoop对象
        for (int i = 0; i < nThreads; i++) {
            boolean success = false;

            //3、
            //newChild(executor, args) 函数在NioEventLoopGroup类中实现了,
            // 实质就是就是存入了一个 NIOEventLoop类实例
            children[i] = newChild(executor, args);
            success = true;
        }

        //4、实例化线程工厂执行器选择器: 根据children获取选择器
        chooser = chooserFactory.newChooser(children);

        //5、为每个EventLoop线程添加 线程终止监听器
        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        //6、将children 添加到对应的set集合中去重, 表示只可读。
        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }
}

/**
 * 8.3.1 我们再来看下 newChild(executor, args) 里的方法
 * 我们可以看到 返回的就是一个 NioEventLoop
 */
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

  总结以上:

1. NioEventLoopGroup初始化时未指定线程数,那么会使用默认线程数,即 `线程数 = CPU核心数 * 2`;
2. 每个NioEventLoopGroup对象内部都有一组可执行的`NioEventLoop数组`,其大小是 nThreads, 这样就构成了一个线程池, `一个NIOEventLoop可以理解成就是一个线程`。
3. 所有的NIOEventLoop线程是使用相同的 executor、SelectorProvider、SelectStrategyFactory、RejectedExecutionHandler以及是属于某一个
    NIOEventLoopGroup的。这一点从 newChild(executor, args); 方法就可以看出:newChild()的实现是在NIOEventLoopGroup中实现的。
4. 当有IO事件来时,需要从线程池中选择一个线程出来执行,这时候的NioEventLoop选择策略是由GenericEventExecutorChooser实现的,并调用该类的next()方法。
5. 每个NioEventLoopGroup对象都有一个NioEventLoop选择器与之对应,其会根据NioEventLoop的个数,动态选择chooser(如果是2的幂次方,则按位运算,否则使用普通的轮询)

  综上所述,得出NioEventLoopGroup主要功能就是为了创建一定数量的NioEventLoop,而真正的重点就在NioEventLoop中,它是整个netty线程执行的关键。

原文地址:https://www.cnblogs.com/jing99/p/12515157.html

时间: 2024-10-08 05:50:14

Netty学习之核心组件(EventLoop、EventLoopGroup)的相关文章

Netty学习之服务器端创建

一.服务器端开发时序图 图片来源:Netty权威指南(第2版) 二.Netty服务器端开发步骤 使用Netty进行服务器端开发主要有以下几个步骤: 1.创建ServerBootstrap实例 ServerBootstrap b=new ServerBootstrap(); ServerBootstrap是Netty服务器端的启动辅助类,提供了一系列的方法用于设置服务器端启动相关的参数. 2.设置并绑定Reactor线程池 EventLoopGroup bossGruop=new NioEvent

Netty实战七之EventLoop和线程模型

简单地说,线程模型指定了操作系统.编程语言.框架或者应用程序的上下文中的线程管理的关键方面.Netty的线程模型强大但又易用,并且和Netty的一贯宗旨一样,旨在简化你的应用程序代码,同时最大限度地提高性能和可维护性. 1.线程模型概述 线程模型确定了代码的执行方式,由于我们总是必须规避并发执行可能会带来的副作用,所以理解所采用的并发模型(也有单线程的线程模型)的影响很重要. 因为具有多核心或多个CPU的计算机现在已经司空见惯,大多数的现代应用程序都利用了复杂的多线程处理技术以有效地利用系统资源

Netty学习篇--整合springboot

经过前面的netty学习,大概了解了netty各个组件的概念和作用,开始自己瞎鼓捣netty和我们常用的项目的整合(很简单的整合) 项目准备 工具:IDEA2017 jar包导入:maven 项目框架:springboot+netty 项目操作 右键创建一个maven项目,项目名称: hetangyuese-netty-03(项目已上传github) 项目完整结构 ? maven导包 <!-- netty start --> <dependency> <groupId>

Netty学习——基于netty实现简单的客户端聊天小程序

Netty学习——基于netty实现简单的客户端聊天小程序 效果图,聊天程序展示 (TCP编程实现) 后端代码: package com.dawa.netty.chatexample; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEven

Netty学习——通过websocket编程实现基于长连接的双攻的通信

Netty学习(一)基于长连接的双攻的通信,通过websocket编程实现 效果图,客户端和服务器端建立起长连接,客户端发送请求,服务器端响应 但是目前缺少心跳,如果两个建立起来的连接,一个断网之后,另外一个是感知不到对方已经断掉的.以后使用心跳技术来进行连接检测 须知: 状态码101,代表 协议转换,从HTTP协议升级为WebSocket协议 HTTP协议,一般访问的时候:是 Http://localhost:8080/ws WebSocket协议,访问的时候,需要是:ws://localho

Netty学习——Netty和Protobuf的整合(一)

Netty学习——Netty和Protobuf的整合 Protobuf作为序列化的工具,将序列化后的数据,通过Netty来进行在网络上的传输 1.将proto文件里的java包的位置修改一下,然后再执行一下protoc 异常捕获:启动服务器端正常,在启动客户端的时候,发送消息,报错 警告: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the l

netty 学习资料

最近在做一个网页遥控器的项目,用到了netty,但还未发现比较系统完整的netty博客教程,所以打算自己写一个netty学习教程,会每天更新一篇,欢迎交流. 先给大家提供一些资料: 1. 比较简短易懂的有实例的系列教程,涉及到了netty关键特性:但个人觉得比较速成,不系统,不深入 http://www.coderli.com/netty-course-hello-world http://www.coderli.com/netty-two-concepts http://www.coderli

netty学习资源收集

Netty学习笔记 Netty+In+Action中文版.pdf

(二)Netty学习笔记之服务端启动

本文将不会对netty中每个点分类讲解,而是一个服务端启动的代码走读,在这个过程中再去了解和学习,这也是博主自己的学习历程.下面开始正文~~~~ 众所周知,在写netty服务端应用的时候一般会有这样的启动代码: (代码一) 1 EventLoopGroup bossGroup = new NioEventLoopGroup(1); 2 EventLoopGroup workerGroup = new NioEventLoopGroup(); 3 try { 4 ServerBootstrap b