netty-eventloop

DiscardServer

 1 package io.netty.example.discard;
 2
 3 import io.netty.bootstrap.ServerBootstrap;
 4
 5 import io.netty.channel.ChannelFuture;
 6 import io.netty.channel.ChannelInitializer;
 7 import io.netty.channel.ChannelOption;
 8 import io.netty.channel.EventLoopGroup;
 9 import io.netty.channel.nio.NioEventLoopGroup;
10 import io.netty.channel.socket.SocketChannel;
11 import io.netty.channel.socket.nio.NioServerSocketChannel;
12
13 /**
14  * Discards any incoming data.
15  */
16 public class DiscardServer {
17
18     private int port;
19
20     public DiscardServer(int port) {
21         this.port = port;
22     }
23
24     public void run() throws Exception {
25         EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
26         EventLoopGroup workerGroup = new NioEventLoopGroup();
27         try {
28             ServerBootstrap b = new ServerBootstrap(); // (2)
29             b.group(bossGroup, workerGroup)
30              .channel(NioServerSocketChannel.class) // (3)
31              .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
32                  @Override
33                  public void initChannel(SocketChannel ch) throws Exception {
34                      ch.pipeline().addLast(new DiscardServerHandler());
35                  }
36              })
37              .option(ChannelOption.SO_BACKLOG, 128)          // (5)
38              .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
39
40             // Bind and start to accept incoming connections.
41             ChannelFuture f = b.bind(port).sync(); // (7)
42
43             // Wait until the server socket is closed.
44             // In this example, this does not happen, but you can do that to gracefully
45             // shut down your server.
46             f.channel().closeFuture().sync();
47         } finally {
48             workerGroup.shutdownGracefully();
49             bossGroup.shutdownGracefully();
50         }
51     }
52
53     public static void main(String[] args) throws Exception {
54         int port;
55         if (args.length > 0) {
56             port = Integer.parseInt(args[0]);
57         } else {
58             port = 8080;
59         }
60         new DiscardServer(port).run();
61     }
62 }

构造Bootstrap时,有一个很重要的元素就是eventLoopGroup,eventLoopGroup继承了eventExecutorGroup,eventExecutorGroup可以认为是eventExecutor的容器,可以通过next()获取到eventExecutor。由下面的类图可知eventExecutorGroup继承了executorService,其具有executorService的特性,比如执行提交的任务(通过next()委托给具体的eventExecutor)并获得代表任务执行进度的Future、关闭执行器(shutdown)等。

eventLoopGroup在eventExecutorGroup的基础上提供了register方法,即可以将channel注册在eventLoop上(eventLoopGroup的register内部仍然会调用next(),将channel注册到具体的eventLoop上)。所谓注册就是将eventLoop与channel绑定,对于nio来说,一个eventLoop(NioEventLoop)会绑定多个channel(还会将channel注册在与eventLoop绑定的selector上),对于oio来说,一个eventLoop(ThreadPerChannelEventLoop)只能绑定一个channel。

下面是EventLoop的类图,EventLoop继承了EventLoopGroup,EventLoop额外提供了inEventLoop()方法来判断当前线程是否在EventLoop中。

我们知道ExecutorService屏蔽了任务如何执行的具体机制,但通常Executor都会持有一个线程池来执行任务,并通过一个阻塞队列来缓存待执行的任务,那EventLoopGroup是怎样的呢?

这里以Nio为例,serverSocketChannel在accept后会将socketChannel注册在EventLoop上,并在EventLoop上execute完成注册操作

//childGroup即在ServerBootstrap中配置的childGroup(EventLoopGroup)
childGroup.register(child)

NioEventLoop的execute方法

 1     public void execute(Runnable task) {
 2         if (task == null) {
 3             throw new NullPointerException("task");
 4         }
 5
 6         boolean inEventLoop = inEventLoop();
 7         if (inEventLoop) {
 8             addTask(task);
 9         } else {
10             //startThread()会调用executor的execute方法,这个executor就是一个java.util.concurrent.Executor
11             startThread();
12             addTask(task);
13             if (isShutdown() && removeTask(task)) {
14                 reject();
15             }
16         }
17
18         if (!addTaskWakesUp && wakesUpForTask(task)) {
19             wakeup(inEventLoop);
20         }
21     }

EventLoop的execute最终会调用EventLoop中executor的execute,这个executor是哪儿来的呢?

 1 public class NioEventLoopGroup extends MultithreadEventLoopGroup {
 2     //......
 3
 4     //threadFactory会被包装为一个executor,
 5     //该executor的execute方法就是threadFactory.newThread(command).start();
 6     //threadFactory的newThread一般都会新建thread,因为thread实例只能start一次。
 7     public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
 8       this(nThreads, threadFactory, SelectorProvider.provider());
 9     }
10
11     //这里传入的executor就是EventLoop中的executor,
12     //nThreads表示EventExecutor(EventLoop)的个数,这些eventExecutor会持有同一个executor,
13     //Group会持有这些eventExecutor,即children,但children还未启动。
14     //children = new EventExecutor[nThreads];
15     //迭代children[i] = newChild(executor, args);
16     public NioEventLoopGroup(int nThreads, Executor executor) {
17       this(nThreads, executor, SelectorProvider.provider());
18     }
19
20     //......
21 }

由上分析可知,netty中EventLoop的核心在于executor,executor提供了真正的执行能力(即线程),下面就是NioEventLoop在executor中的执行的任务(这里的run并不是Runnable的run),可以看出,所谓EventLoop就是一个循环器,该循环器不停的在处理channel的IO事件。

 1 protected void run() {
 2   for (;;) {
 3     try {
 4       switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
 5         case SelectStrategy.CONTINUE:
 6           continue;
 7         case SelectStrategy.SELECT:
 8           select(wakenUp.getAndSet(false));
 9           if (wakenUp.get()) {
10               selector.wakeup();
11           }
12         default:
13           // fallthrough
14       }
15
16       cancelledKeys = 0;
17       needsToSelectAgain = false;
18       final int ioRatio = this.ioRatio;
19       if (ioRatio == 100) {
20         try {
21           processSelectedKeys();
22         } finally {
23           // Ensure we always run tasks.
24           runAllTasks();
25         }
26       } else {
27         final long ioStartTime = System.nanoTime();
28         try {
29             processSelectedKeys();
30         } finally {
31             // Ensure we always run tasks.
32             final long ioTime = System.nanoTime() - ioStartTime;
33             runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
34         }
35       }
36     } catch (Throwable t) {
37       handleLoopException(t);
38     }
39     // Always handle shutdown even if the loop processing threw an exception.
40     try {
41       if (isShuttingDown()) {
42         closeAll();
43         if (confirmShutdown()) {
44           return;
45         }
46       }
47     } catch (Throwable t) {
48       handleLoopException(t);
49     }
50   }
51 }

时间: 2024-07-31 19:19:30

netty-eventloop的相关文章

从Netty EventLoop实现上可以学到什么

本文主要讨论Netty NioEventLoop原理及实践,关于Netty NioEventLoop,首先要知道NioEventLoop是什么,为什么它会是Netty核心Reactor处理器,实现原理是什么,进而再讨论Netty对其的实现及使用上我们可以学到哪些. EventLoop是一个Reactor模型的事件处理器,一个EventLoop对应一个线程,其内部会维护一个selector和taskQueue,负责处理客户端请求和内部任务,内部任务如ServerSocketChannel注册.Se

Java网络编程和NIO详解9:基于NIO的网络编程框架Netty

Java网络编程和NIO详解9:基于NIO的网络编程框架Netty 转自https://sylvanassun.github.io/2017/11/30/2017-11-30-netty_introduction/ netty是基于NIO实现的异步事件驱动的网络编程框架,学完NIO以后,应该看看netty的实现,netty框架涉及的内容特别多,这里只介绍netty的基本使用和实现原理,更多扩展的内容将在以后推出. 本系列文章首发于我的个人博客:https://h2pl.github.io/ 欢迎

聊聊高并发(二)结合实例说说线程封闭和背后的设计思想

高并发问题抛去架构层面的问题,落实到代码层面就是多线程的问题.多线程的问题主要是线程安全的问题(其他还有活跃性问题,性能问题等). 那什么是线程安全?下面这个定义来自<Java并发编程实战>,这本书强烈推荐,是几个Java语言的作者合写的,都是并发编程方面的大神. 线程安全指的是:当多个线程访问某个类时,这个类始终都能表现出正确的行为. 正确指的是"所见即所知",程序执行的结果和你所预想的结果一致. 理解线程安全的概念很重要,所谓线程安全问题,就是处理对象状态的问题.如果要

[编织消息框架][netty源码分析]4 eventLoop 实现类NioEventLoop职责与实现

NioEventLoop 是jdk nio多路处理实现同修复jdk nio的bug 1.NioEventLoop继承SingleThreadEventLoop 重用单线程处理 2.NioEventLoop是组成 pool EventLoopGroup 基本单元 总之好多边界判断跟业务经验之类的代码,非常烦碎 重要属性 public final class NioEventLoop extends SingleThreadEventLoop { //绑定 selector Selector sel

Netty in Action (十七) 第七章节 EventLoop和线程模型

本章节包括: 1)线程模型总览 2)Event Loop概念和具体实现 3)任务调度 4)实现细节 简单地陈述一下,对于一个操作系统,编程语言,框架,或者应用来说,线程模型对其都是至关重要的一部分,在什么时间如何创建一个线程都会对你的代码执行有很重要的影响,所以对于开发人员而言,懂得在各种线程模型里面权衡利弊就是一个很重要的事情,是直接使用线程模型本身还是通过一些框架或者语言提供的线程框架对于开发者而言都是需要选择的 在这个章节,我们将会详细地讲解Netty的线程模型,这个模型是很强大的,且易于

Netty实战七之EventLoop和线程模型

简单地说,线程模型指定了操作系统.编程语言.框架或者应用程序的上下文中的线程管理的关键方面.Netty的线程模型强大但又易用,并且和Netty的一贯宗旨一样,旨在简化你的应用程序代码,同时最大限度地提高性能和可维护性. 1.线程模型概述 线程模型确定了代码的执行方式,由于我们总是必须规避并发执行可能会带来的副作用,所以理解所采用的并发模型(也有单线程的线程模型)的影响很重要. 因为具有多核心或多个CPU的计算机现在已经司空见惯,大多数的现代应用程序都利用了复杂的多线程处理技术以有效地利用系统资源

005——Netty之EventLoop与EventLoopGroup

Netty解决的事情 Netty主要解决两个相应关注领域.(1)异步和事件驱动的实现.(2)一组设计模式,将应用逻辑与网络层解耦. EventLoop接口 用于处理连接的生命周期中所发生的事件. 一个EventLoopGroup包含一个或者多个EventLoop 一个EventLoop在它的生命周期内只和一个Thread绑定 所有由EventLoop处理的I/O事件都将在它专有的Thread上被处理 一个Channel在它的生命周期内只注册于一个EventLoop 一个EventLoop可能会被

6. Netty源码分析之EventLoop与EventLoopGroup

一.NioEventLoop与NioEventLoopGroup的关系 二.NioEventLoop 1. 设计原理 1. 负责IO读写 2. 执行task.通过调用NioEventLoop的execute(Runnable task)方法实现.我们知道,为了防止资源竞争和并发操作,我们经常会判断当前操作线程是否为EventLoop线程,如果不是,则将操作封装成task放进NioEventLoop的执行队列中,这样就实现了局部无锁化. 3. 定时任务.通过调用NioEventLoop的sched

Netty学习之核心组件(EventLoop、EventLoopGroup)

一.EventLoop.EventLoopGroup概述 由下图所示,NioEventLop是EventLoop的一个具体实现,EventLoop是EventLoopGroup的一个属性,NioEventLoopGroup是EventLoopGroup的具体实现,都是基于ExecutorService进行的线程池管理,因此EventLoop.EventLoopGroup组件的核心作用就是进行Selector的维护以及线程池的维护. 其中EventLoop进行的是Selector的维护,如下图左:

Netty线程设计4——EventLoop

EventLoop的注释是这样的:用来处理被注册的Channel的I/O操作,根据具体实现来处理一个或多个Channel的I/O操作. EventLoop本身没有提供有用的接口,它主要是整合了EventExecutor和EventLoopGroup接口(这样的话,EventLoop就具备了任务执行和通道注册的能力). EventLoop的实现类有EmbeddedEventLoop和SingleThreadEventLoop,SingleThreadEventLoop中包含用于处理任务的线程而Em