Netty中execution包功能详解

  1. 技术点描述

Netty中关于多线程处理的代码很多(netty框架的实现本身就是异步处理机制),此文档仅针对于execution包的功能做详细解说。

以下是整个包的目录结构:

包中的调用关系如下图所示:

  1. 实现方案

  1. 参考源码包

以下是对此包中的源码的分析(请注意后四个类为此包中最重要的类)

  1. ChannelEventRunnableFilter

此接口定义了一个抽象方法:

boolean filter(ChannelEventRunnable event);

如果传入的event是由Executor处理的,则返回true。

  1. ChannelUpstreamEventRunnableFilter

此类实现了ChannelEventRunnableFilter接口,判断传入的ChannelEventRunnable实例是不是属于ChannelDownstreamEventRunnable类

  1. ChannelDownstreamEventRunnableFilter

此类实现了ChannelEventRunnableFilter接口,判断传入的ChannelEventRunnable实例是不是属于ChannelDownstreamEventRunnable类

  1. ChannelEventRunnable

此抽象类实现了Runnable, EstimatableObjectWrapper两个接口

此类中有一个run方法:

public final void run() {

try {

PARENT.set(executor);

doRun();

finally {

PARENT.remove();

}

}

protected abstract void doRun();

其中PARENT是一个ThreadLocal<Executor>类型的变量,仅供内部使用,告诉Executor当前worker获取到一个工作线程。

其中doRum();方法是一个抽象方法,需要根据不同的业务做不同的处理(常常是发布上行数据sendUpstream或者下行数据sendDownstream)。

实现了doRun();方法的类为ChannelUpstreamEventRunnable和ChannelDownstreamEventRunnable

  1. ChannelUpstreamEventRunnable

继承自ChannelEventRunnable抽象类,重写了doRun();方法

  1. ChannelDownstreamEventRunnable

继承自ChannelEventRunnable抽象类,重写了doRun();方法

  1. ChainedExecutor

这是一个特殊的Executor,这个Executor可以将多个Executors和ChannelEventRunnableFilter连接起来使用。

此类中的比较重要的方法:

public ChainedExecutor(ChannelEventRunnableFilter filter, Executor cur, Executor next) {

}

按照给出的ChannelEventRunnableFilter实例filter创建一个ChainedExecutor实例,如果filter是ChannelEventRunnableFilter的实例,则调用当前的Executor实例cur执行,否则,调用next实例执行。

  1. ExecutionHandler

此类的功能就是将一个ChannelEvent转发给Executor执行多线程操作。

当我们自定义的ChannelHandler执行阻塞操作时会花费较长时间,或者当我们的ChannelHandler访问诸如数据库等非CPU业务逻辑处理单元时,我们会用到这个类。执行上述操作时,如果在我们的pipeline中不采用ExecutorHandler,而进程比较多时,就可能会出现当前I/O线程无法执行I/O操作,从而出现一些想不到的异常情况。

在大多数情况下,ExecutorHandler会与OrderedMemoryAwareThreadPoolExecutor耦合在一起使用,这是因为这么做可以保证事件的执行顺序而且在负载较大情况下能够有效防止OutOfMemoryError

下面是调用的关键代码:

设置ExecutionHandler:

static ExecutionHandler EXECUTIONHANDLER = new ExecutionHandler(

new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576));

启动服务时factory设置

TCPSERVER_BOOTSTRAP.setPipelineFactory(new TCPServerPipelineFactory(EXECUTIONHANDLER));

PipelineFactory的设置:

public class TCPServerPipelineFactory implements ChannelPipelineFactory {

private final ExecutionHandler EXECUTIONHANDLER;

public TCPServerPipelineFactory(ExecutionHandler executionHandler) {

this.EXECUTIONHANDLER = executionHandler;

}

@Override

public ChannelPipeline getPipeline() throws Exception {

ChannelPipeline pipeline = Channels.pipeline();

//        pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));

//        pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));

pipeline.addLast("decoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));

//     pipeline.addLast("encoder", new LengthFieldPrepender(4, false));

//        pipeline.addLast("GLOBAL_TRAFFIC_SHAPING", new ChannelTrafficShapingHandler(new HashedWheelTimer()));

pipeline.addLast("handler", this.EXECUTIONHANDLER);

pipeline.addLast("handler", new TCPServerHandler());

return pipeline;

}

当服务关闭时,需注意:

public static void tcpServerShutDown(){

TCPSERVER_BOOTSTRAP.releaseExternalResources();

EXECUTIONHANDLER.releaseExternalResources();

LOGGER.info("TCP服务已关闭");

}

针对其中源码分析发现,主要起作用的两个方法如下:

public void handleUpstream(

ChannelHandlerContext context, ChannelEvent e) throws Exception {

if (handleUpstream) {

executor.execute(new ChannelUpstreamEventRunnable(context, e, executor));

else {

context.sendUpstream(e);

}

}

public void handleDownstream(

ChannelHandlerContext ctx, ChannelEvent e) throws Exception {

// check if the read was suspend

if (!handleReadSuspend(ctx, e)) {

if (handleDownstream) {

executor.execute(new ChannelDownstreamEventRunnable(ctx, e, executor));

else {

ctx.sendDownstream(e);

}

}

}

其中在处理上下行数据的时候加入了Executor,采用多线程执行,具体的是采用的哪种Executor,常用哪些Executor,以及各种Executor的区别将在下面分别予以介绍。

  1. MemoryAwareThreadPoolExecutor

继承自ThreadPoolExecutor,具有如下特性:

当队列中有太多任务时,ThreadPoolExecutor会阻塞任务的提交,此时,每个channel和每个Executor的limition都会起作用。

当一个任务(如 Runnable)被提交以后,MemoryAwareThreadPoolExecutor会调用ObjectSizeEstimator.estimateSize(Object)来获取任务的预估大小(按字节计算),以此来计算这个尚未执行的任务的内存开销。

当此未执行的任务的总大小超出了channel或者executor的可执行大小的阀值,为了保证总处理的大小在阀值以下,队列中的任务未执行完以前,任何execute(Runnable)将会阻塞(挂起)。

替代任务大小估算策略:

虽然默认的实现类已经尽了最大的可能去估算位置类型对象的大小,然而,我们仍然希望避免错误的估算任务大小,这样,一个好的办法就是采用一个ObjectSizeEstimator的实现类来替代DefaultObjectSizeEstimator。

应用场景:

1.将MemoryAwareThreadPoolExecutor从ExecutionHandler独立出来

2.提交的任务的类型不是ChannelEventRunnable,或者ChannelEventRunnable中的MessageEvent消息类型不是ChannelBuffer

下面是具体的对ObjectSizeEstimator的实现类的改写(此段代码未经测试)

public class MyRunnable implements Runnable {

private final byte[] data;

public MyRunnable(byte[] data) {

this.data = data;

}

public void run() {

// Process ‘data‘ ..

}

}

public class MyObjectSizeEstimator extends DefaultObjectSizeEstimator {

@Override

public int estimateSize(Object o) {

if (o instanceof MyRunnable) {

return ((MyRunnable) o).data.length + 8;

}

return super.estimateSize(o);

}

}

ThreadPoolExecutor pool = new MemoryAwareThreadPoolExecutor(

16, 65536, 1048576, 30, TimeUnit.SECONDS,

new MyObjectSizeEstimator(),

Executors.defaultThreadFactory());

pool.execute(new MyRunnable(data));

注意:这个Executor不会因为Channel相同而维持ChannelEvent的先后顺序(异步),对于此问题的解决办法,将在后文中给予说明。

下面针对此类中常用的方法进行源码分析:

参数最多的构造方法:

MemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ObjectSizeEstimator objectSizeEstimator, ThreadFactorythreadFactory){

...

}

Parameters:

corePoolSize 活动线程的最大值

maxChannelMemorySize 每个channel中队列中的events最大总大小。0表示无效

maxTotalMemorySize当前channel中队列中的events最大总大小。0表示无效

keepAliveTime 线程不活动后多长时间后自动关闭,默认为30

unit 保持活动的时间单元,默认为TimeUnit.SECONDS

threadFactory 当前线程池的线程工厂,默认为Executors.defaultThreadFactory()

objectSizeEstimator 当前线程池的预估对象大小,默认为new DefaultObjectSizeEstimator()

  1. OrderedMemoryAwareThreadPoolExecutor

此类继承了MemoryAwareThreadPoolExecutor,并对MemoryAwareThreadPoolExecutor中来自同一个Channel的ChannelEvent事件不能按顺序执行做了改进,使其能够按顺序执行。这个类中的几乎所有的功能都是继承自他的父类。

ChannelEvent执行顺序

当采用MemoryAwareThreadPoolExecutor时,事件的处理顺序可能是这样的:

--------------------------------> Timeline -------------------------------->

Thread X: --- Channel A (Event 2) --- Channel A (Event 1) --------------------------->

Thread Y: --- Channel A (Event 3) --- Channel B (Event 2) --- Channel B (Event 3) --->

Thread Z: --- Channel B (Event 1) --- Channel B (Event 4) --- Channel A (Event 4) --->

而采用OrderedMemoryAwareThreadPoolExecutor后的事件处理顺序会变成这样:

-------------------------------------> Timeline ------------------------------------>

Thread X: --- Channel A (Event A1) --. .-- Channel B (Event B2) --- Channel B (Event B3) --->

\ /

X

/ \

Thread Y: --- Channel B (Event B1) --‘ ‘-- Channel A (Event A2) --- Channel A (Event A3) --->

但是,这样也有一个坏处,当同一个Channel中的前一个ChannelEvent未执行完时,那么下一个ChannelEvent就不会执行。

还有一点需要注意:并不是说ChannelA这次调用的是ThreadX,下次再发ChannelEvent时就还是会调用ThreadX,这点无法保证。

换种方式来维持每个通道的事件执行顺序

OrderedMemoryAwareThreadPoolExecutor中,Executor会采用Channel来作为主键,用以维持每个通道(Channel)的执行顺序,然而,我们可以修改为其他的值作为顺序执行的主键。例如:远程Channel的IP地址。

具体实现方式(代码)如下:

public class RemoteAddressBasedOMATPE extends OrderedMemoryAwareThreadPoolExecutor {

... Constructors ...

@Override

protected ConcurrentMap<Object, Executor> newChildExecutorMap() {

// The default implementation returns a special ConcurrentMap that

// uses identity comparison only (see IdentityHashMap).

// Because SocketAddress does not work with identity comparison,

// we need to employ more generic implementation.

return new ConcurrentHashMap<Object, Executor>

}

protected Object getChildExecutorKey(ChannelEvent e) {

// Use the IP of the remote peer as a key.

return ((InetSocketAddress) e.getChannel().getRemoteAddress()).getAddress();

}

// Make public so that you can call from anywhere.

public boolean removeChildExecutor(Object key) {

super.removeChildExecutor(key);

}

}

注意:此处一定要当心子类中executor map的内存泄露问题,当整个主键的生命周期结束(如来自相同IP的所有连接都已经关闭了)后,一定要调用removeChileExecutor(Object)方法。请牢记:调用removeChileExecutor(Object)方法后,此主键可能还会出现(如移除掉那个IP后可能还会出现来自那个IP的新的连接),如果不确定此种情况是否会发生,可以定期从子类的executor map中修改旧有的未使用的或是失效的key。解决方案如下:

RemoteAddressBasedOMATPE executor = ...;

on every 3 seconds:

for (Iterator<Object> i = executor.getChildExecutorKeySet().iterator; i.hasNext();) {

InetAddress ip = (InetAddress) i.next();

if (there is no active connection from ‘ip‘ now &&

there has been no incoming connection from ‘ip‘ for last 10 minutes) {

i.remove();

}

}

如果预期的key的最大值比较小而且比较确定,这样可以使用"弱"map来替代自己管理key的生命周期(如ConcurrentWeakHashMap或者 synchronized WeakHashMap)。

注:"弱"map 指仅当虚拟机内存不足时会对此map清空,正常情况下,不会主动做任何处理

此类能够按顺序执行来自同一个Channel的ChannelEvent源码分析:

此类定义了一个ConcurrentMap<Object, Executor>类型的全局变量childExecutors,它的key存放的是channel。

protected Object getChildExecutorKey(ChannelEvent e) {

return e.getChannel();

}

它的value存放的是Executor。

执行execute操作时会先根据map查找,从而实现顺序执行。

  1. OrderedDownstreamThreadPoolExecutor

前面所研究的均是针对于上行数据(客户端向到服务器段发送数据,数据处理的执行顺序),此类主要是针对下行数据处理,使得下行数据能够按照上来的顺序下发下去。此类继承了OrderedMemoryAwareThreadPoolExecutor类,功能几乎全部与OrderedMemoryAwareThreadPoolExecutor类相同。

  1. Demo实现

  1. SocketServer

static ExecutionHandler EXECUTION_UP_HANDLER = new ExecutionHandler(

new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576),false,true);

static ExecutionHandler EXECUTION_DOWN_HANDLER = new ExecutionHandler(

new OrderedDownstreamThreadPoolExecutor(16),true,false);

/**TCP方式*/

static ChannelFactory TCPCHANNEL_FACTORY = new NioServerSocketChannelFactory(

Executors.newCachedThreadPool(),

Executors.newCachedThreadPool());

/**TCP方式*/

static ServerBootstrap TCPSERVER_BOOTSTRAP = new ServerBootstrap(TCPCHANNEL_FACTORY);

public static void tcpServerStartUp() {

TCPSERVER_BOOTSTRAP.setPipelineFactory(newTCPServerPipelineFactory(EXECUTION_UP_HANDLER,EXECUTION_DOWN_HANDLER));

TCPSERVER_BOOTSTRAP.setOption("child.tcpNoDelay", true);

TCPSERVER_BOOTSTRAP.setOption("child.keepAlive", true);

TCPSERVER_BOOTSTRAP.setOption("reuseAddress", true);

LOGGER.info("SERVER_NAME:"+Constants.SERVER_NAME);

LOGGER.info("TCPSERVER_PORT:"+Constants.TCPSERVER_PORT);

TCPSERVER_BOOTSTRAP.bind(newInetSocketAddress(Constants.SERVER_NAME,Constants.TCPSERVER_PORT));

LOGGER.info("TCP服务已启动....");

}

public static void tcpServerShutDown(){

TCPSERVER_BOOTSTRAP.releaseExternalResources();

EXECUTION_UP_HANDLER.releaseExternalResources();

EXECUTION_DOWN_HANDLER.releaseExternalResources();

LOGGER.info("TCP服务已关闭");

}

  1. TCPPipelineFactory

ChannelPipeline pipeline = Channels.pipeline(

new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4),

new LengthFieldPrepender(4, false),

this.EXECUTION_UP_HANDLER,

this.EXECUTION_DOWN_HANDLER,

new TCPServerHandler());

return pipeline;

  1. ClientPipelineFactory

ChannelPipeline pipeline = pipeline();

pipeline.addLast("decoder", new LengthFieldBasedFrameDecoder(

Integer.MAX_VALUE, 0, 4, 0, 4));

pipeline.addLast("encoder", new LengthFieldPrepender(4, false));

pipeline.addLast("handler", new ClientHandler());

return pipeline;

时间: 2024-08-11 05:46:16

Netty中execution包功能详解的相关文章

CentOS中rpm包管理详解

环境说明:Win7+Vmware11+Centos6.6,使用CentOS6.6安装光盘 RPM全名是"RedHat Package Manager",RPM最大优点是将要安装的软件先编译过,并且打包成为RPM机制的安装包,通过包装好的软件里头默认的数据库记录这个软件要安装的时候必须具备的依赖属性软件,当安装在你的Linux主机是,RPM会先依照软件里头的数据查询Linux主机的依赖属性软件是否满足,若满足则予以安装,若不满足则不予以安装. 在执行rpm包管理之前,我们首先将CentO

【PHP发展史】PHP5.2 到 PHP5.6 中新增的功能详解

截至目前(2014.2), PHP 的最新稳定版本是 PHP5.5, 但有差不多一半的用户仍在使用已经不在维护的 PHP5.2, 其余的一半用户在使用 PHP5.3. 因为 PHP 那“集百家之长”的蛋疼语法,加上社区氛围不好,很多人对新版本,新特征并无兴趣. 本文将会介绍自 PHP5.2 起,直至 PHP5.6 中增加的新特征. PHP5.2 以前:autoload, PDO 和 MySQLi, 类型约束 PHP5.2:JSON 支持 PHP5.3:弃用的功能,匿名函数,新增魔术方法,命名空间

【转】PCB中3D相关功能详解

如果PCB Layout工程师能够在设计过程中,使用设计工具直观地看到自己设计板子的实际情况,将能够有效的帮助他们的工作.尤其现在PCB板的设计越来越复杂,密度越来越高,如果能够洞察多层板内部则可以帮助工程师避免很多不易察觉的错误.特别对于电子产品的机电一体化设计,Altium Designer对于STEP格式的3D模型的支持及导入导出,极大地方便了ECAD-MCAD之间的无缝协作. Altium Designer 凭借其突出的 3D 设计能力,提供当今公认一流的三维 PCB 设计平台.PCB

Lua中的模块(module)和包(package)详解1

这篇文章主要介绍了Lua中的模块(module)和包(package)详解,本文讲解了require函数.写一个模块.package.loaded.module函数等内容,需要的朋友可以参考下 前言 从Lua5.1版本开始,就对模块和包添加了新的支持,可是使用require和module来定义和使用模块和包.require用于使用模块,module用于创建模块.简单的说,一个模块就是一个程序库,可以通过require来加载.然后便得到了一个全局变量,表示一个table.这个table就像是一个命

Lua中的模块(module)和包(package)详解

这篇文章主要介绍了Lua中的模块(module)和包(package)详解,本文讲解了require函数.写一个模块.package.loaded.module函数等内容,需要的朋友可以参考下 前言 从Lua5.1版本开始,就对模块和包添加了新的支持,可是使用require和module来定义和使用模块和包.require用于使用模块,module用于创建模块.简单的说,一个模块就是一个程序库,可以通过require来加载.然后便得到了一个全局变量,表示一个table.这个table就像是一个命

在ASP.NET 5应用程序中的跨域请求功能详解

在ASP.NET 5应用程序中的跨域请求功能详解 浏览器安全阻止了一个网页中向另外一个域提交请求,这个限制叫做同域策咯(same-origin policy),这组织了一个恶意网站从另外一个网站读取敏感数据,但是一些特殊情况下,你需要允许另外一个站点跨域请求你的网站. 跨域资源共享(CORS:Cross Origin Resources Sharing)是一个W3C标准,它允许服务器放宽对同域策咯的限制,使用CORS,服务器可以明确的允许一些跨域的请求,并且拒绝其它的请求.CORS要比JSONP

jdk5.0 新增的 Concurrent包主要功能详解

我们都知道,在JDK1.5之前,Java中要进行业务并发时,通常需要有程序员独立完成代码实现,当然也有一些开源的框架提供了这些功能,但是这 些依然没有JDK自带的功能使用起来方便.而当针对高质量Java多线程并发程序设计时,为防止死蹦等现象的出现,比如使用java之前的wait(). notify()和synchronized等,每每需要考虑性能.死锁.公平性.资源管理以及如何避免线程安全性方面带来的危害等诸多因素,往往会采用 一些较为复杂的安全策略,加重了程序员的开发负担.万幸的是,在JDK1

Dynamics CRM2013 1:N关系 sub-grid中的“添加现有项”和“添加新建项”功能详解

CRM2013中sub-grid的样式和2011中有了较大的变化,2013和2011界面对比如下 在2011的时候按钮是在ribbon区,1:N的父子关系实体直接点击添加新纪录就可以,但2013就不行了点加号首先会有个下拉框把现有的子实体数据列出来,你可以选择现有的也可以新建 既然你的关系实体是1:N的父子实体,那子的存在肯定是依赖于与父实体的,所以这个地方就压根不存在关联现有实体一旦关联就会报错,所以纯碎新建的话这边的步骤就繁琐了,同时也会给用户带来迷惑 所以这个地方这种情况下完全没必要添加现

使用【百度云推送】第三方SDK实现推送功能详解

之前介绍过如何使用shareSDK实现新浪微博分享功能,今天介绍如何使用百度云推送SDK实现Android手机后台推送功能. 运行效果如下 第一步,如果使用百度的SDK,当然要先成为百度的开发者啦,这个就不详述了.成为开发者之后,我们要建立一个应用,如下图所示 第二步,创建好应用之后,我们点击开方者服务管理,进入工程管理页面,然后点击左侧云推送,进入云推送功能页面,具体如下图 进入云推送详细页面之后,我们点击推送设置,设置好我们的应用的包名,然后点击快速实例,将系统给我们产生的示例代码下载下来