-
技术点描述
Netty中关于多线程处理的代码很多(netty框架的实现本身就是异步处理机制),此文档仅针对于execution包的功能做详细解说。
以下是整个包的目录结构:
包中的调用关系如下图所示:
-
实现方案
-
参考源码包
以下是对此包中的源码的分析(请注意后四个类为此包中最重要的类)
-
ChannelEventRunnableFilter
此接口定义了一个抽象方法:
boolean filter(ChannelEventRunnable event);
如果传入的event是由Executor处理的,则返回true。
-
ChannelUpstreamEventRunnableFilter
此类实现了ChannelEventRunnableFilter接口,判断传入的ChannelEventRunnable实例是不是属于ChannelDownstreamEventRunnable类
-
ChannelDownstreamEventRunnableFilter
此类实现了ChannelEventRunnableFilter接口,判断传入的ChannelEventRunnable实例是不是属于ChannelDownstreamEventRunnable类
-
ChannelEventRunnable
此抽象类实现了Runnable, EstimatableObjectWrapper两个接口
此类中有一个run方法:
public final void run() {
try {
PARENT.set(executor);
doRun();
} finally {
PARENT.remove();
}
}
protected abstract void doRun();
其中PARENT是一个ThreadLocal<Executor>类型的变量,仅供内部使用,告诉Executor当前worker获取到一个工作线程。
其中doRum();方法是一个抽象方法,需要根据不同的业务做不同的处理(常常是发布上行数据sendUpstream或者下行数据sendDownstream)。
实现了doRun();方法的类为ChannelUpstreamEventRunnable和ChannelDownstreamEventRunnable
-
ChannelUpstreamEventRunnable
继承自ChannelEventRunnable抽象类,重写了doRun();方法
-
ChannelDownstreamEventRunnable
继承自ChannelEventRunnable抽象类,重写了doRun();方法
-
ChainedExecutor
这是一个特殊的Executor,这个Executor可以将多个Executors和ChannelEventRunnableFilter连接起来使用。
此类中的比较重要的方法:
public ChainedExecutor(ChannelEventRunnableFilter filter, Executor cur, Executor next) {
…
}
按照给出的ChannelEventRunnableFilter实例filter创建一个ChainedExecutor实例,如果filter是ChannelEventRunnableFilter的实例,则调用当前的Executor实例cur执行,否则,调用next实例执行。
-
ExecutionHandler
此类的功能就是将一个ChannelEvent转发给Executor执行多线程操作。
当我们自定义的ChannelHandler执行阻塞操作时会花费较长时间,或者当我们的ChannelHandler访问诸如数据库等非CPU业务逻辑处理单元时,我们会用到这个类。执行上述操作时,如果在我们的pipeline中不采用ExecutorHandler,而进程比较多时,就可能会出现当前I/O线程无法执行I/O操作,从而出现一些想不到的异常情况。
在大多数情况下,ExecutorHandler会与OrderedMemoryAwareThreadPoolExecutor耦合在一起使用,这是因为这么做可以保证事件的执行顺序而且在负载较大情况下能够有效防止OutOfMemoryError
下面是调用的关键代码:
设置ExecutionHandler:
static ExecutionHandler EXECUTIONHANDLER = new ExecutionHandler(
new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576));
启动服务时factory设置
TCPSERVER_BOOTSTRAP.setPipelineFactory(new TCPServerPipelineFactory(EXECUTIONHANDLER));
PipelineFactory的设置:
public class TCPServerPipelineFactory implements ChannelPipelineFactory {
private final ExecutionHandler EXECUTIONHANDLER;
public TCPServerPipelineFactory(ExecutionHandler executionHandler) {
this.EXECUTIONHANDLER = executionHandler;
}
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
// pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
// pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast("decoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
// pipeline.addLast("encoder", new LengthFieldPrepender(4, false));
// pipeline.addLast("GLOBAL_TRAFFIC_SHAPING", new ChannelTrafficShapingHandler(new HashedWheelTimer()));
pipeline.addLast("handler", this.EXECUTIONHANDLER);
pipeline.addLast("handler", new TCPServerHandler());
return pipeline;
}
当服务关闭时,需注意:
public static void tcpServerShutDown(){
TCPSERVER_BOOTSTRAP.releaseExternalResources();
EXECUTIONHANDLER.releaseExternalResources();
LOGGER.info("TCP服务已关闭");
}
针对其中源码分析发现,主要起作用的两个方法如下:
public void handleUpstream(
ChannelHandlerContext context, ChannelEvent e) throws Exception {
if (handleUpstream) {
executor.execute(new ChannelUpstreamEventRunnable(context, e, executor));
} else {
context.sendUpstream(e);
}
}
public void handleDownstream(
ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
// check if the read was suspend
if (!handleReadSuspend(ctx, e)) {
if (handleDownstream) {
executor.execute(new ChannelDownstreamEventRunnable(ctx, e, executor));
} else {
ctx.sendDownstream(e);
}
}
}
其中在处理上下行数据的时候加入了Executor,采用多线程执行,具体的是采用的哪种Executor,常用哪些Executor,以及各种Executor的区别将在下面分别予以介绍。
-
MemoryAwareThreadPoolExecutor
继承自ThreadPoolExecutor,具有如下特性:
当队列中有太多任务时,ThreadPoolExecutor会阻塞任务的提交,此时,每个channel和每个Executor的limition都会起作用。
当一个任务(如 Runnable)被提交以后,MemoryAwareThreadPoolExecutor会调用ObjectSizeEstimator.estimateSize(Object)来获取任务的预估大小(按字节计算),以此来计算这个尚未执行的任务的内存开销。
当此未执行的任务的总大小超出了channel或者executor的可执行大小的阀值,为了保证总处理的大小在阀值以下,队列中的任务未执行完以前,任何execute(Runnable)将会阻塞(挂起)。
替代任务大小估算策略:
虽然默认的实现类已经尽了最大的可能去估算位置类型对象的大小,然而,我们仍然希望避免错误的估算任务大小,这样,一个好的办法就是采用一个ObjectSizeEstimator的实现类来替代DefaultObjectSizeEstimator。
应用场景:
1.将MemoryAwareThreadPoolExecutor从ExecutionHandler独立出来
2.提交的任务的类型不是ChannelEventRunnable,或者ChannelEventRunnable中的MessageEvent消息类型不是ChannelBuffer
下面是具体的对ObjectSizeEstimator的实现类的改写(此段代码未经测试)
public class MyRunnable implements Runnable {
private final byte[] data;
public MyRunnable(byte[] data) {
this.data = data;
}
public void run() {
// Process ‘data‘ ..
}
}
public class MyObjectSizeEstimator extends DefaultObjectSizeEstimator {
@Override
public int estimateSize(Object o) {
if (o instanceof MyRunnable) {
return ((MyRunnable) o).data.length + 8;
}
return super.estimateSize(o);
}
}
ThreadPoolExecutor pool = new MemoryAwareThreadPoolExecutor(
16, 65536, 1048576, 30, TimeUnit.SECONDS,
new MyObjectSizeEstimator(),
Executors.defaultThreadFactory());
pool.execute(new MyRunnable(data));
注意:这个Executor不会因为Channel相同而维持ChannelEvent的先后顺序(异步),对于此问题的解决办法,将在后文中给予说明。
下面针对此类中常用的方法进行源码分析:
参数最多的构造方法:
MemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ObjectSizeEstimator objectSizeEstimator, ThreadFactorythreadFactory){
...
}
Parameters:
corePoolSize 活动线程的最大值
maxChannelMemorySize 每个channel中队列中的events最大总大小。0表示无效
maxTotalMemorySize当前channel中队列中的events最大总大小。0表示无效
keepAliveTime 线程不活动后多长时间后自动关闭,默认为30
unit 保持活动的时间单元,默认为TimeUnit.SECONDS
threadFactory 当前线程池的线程工厂,默认为Executors.defaultThreadFactory()
objectSizeEstimator 当前线程池的预估对象大小,默认为new DefaultObjectSizeEstimator()
-
OrderedMemoryAwareThreadPoolExecutor
此类继承了MemoryAwareThreadPoolExecutor,并对MemoryAwareThreadPoolExecutor中来自同一个Channel的ChannelEvent事件不能按顺序执行做了改进,使其能够按顺序执行。这个类中的几乎所有的功能都是继承自他的父类。
ChannelEvent执行顺序
当采用MemoryAwareThreadPoolExecutor时,事件的处理顺序可能是这样的:
--------------------------------> Timeline -------------------------------->
Thread X: --- Channel A (Event 2) --- Channel A (Event 1) --------------------------->
Thread Y: --- Channel A (Event 3) --- Channel B (Event 2) --- Channel B (Event 3) --->
Thread Z: --- Channel B (Event 1) --- Channel B (Event 4) --- Channel A (Event 4) --->
而采用OrderedMemoryAwareThreadPoolExecutor后的事件处理顺序会变成这样:
-------------------------------------> Timeline ------------------------------------>
Thread X: --- Channel A (Event A1) --. .-- Channel B (Event B2) --- Channel B (Event B3) --->
\ /
X
/ \
Thread Y: --- Channel B (Event B1) --‘ ‘-- Channel A (Event A2) --- Channel A (Event A3) --->
但是,这样也有一个坏处,当同一个Channel中的前一个ChannelEvent未执行完时,那么下一个ChannelEvent就不会执行。
还有一点需要注意:并不是说ChannelA这次调用的是ThreadX,下次再发ChannelEvent时就还是会调用ThreadX,这点无法保证。
换种方式来维持每个通道的事件执行顺序
OrderedMemoryAwareThreadPoolExecutor中,Executor会采用Channel来作为主键,用以维持每个通道(Channel)的执行顺序,然而,我们可以修改为其他的值作为顺序执行的主键。例如:远程Channel的IP地址。
具体实现方式(代码)如下:
public class RemoteAddressBasedOMATPE extends OrderedMemoryAwareThreadPoolExecutor {
... Constructors ...
@Override
protected ConcurrentMap<Object, Executor> newChildExecutorMap() {
// The default implementation returns a special ConcurrentMap that
// uses identity comparison only (see IdentityHashMap).
// Because SocketAddress does not work with identity comparison,
// we need to employ more generic implementation.
return new ConcurrentHashMap<Object, Executor>
}
protected Object getChildExecutorKey(ChannelEvent e) {
// Use the IP of the remote peer as a key.
return ((InetSocketAddress) e.getChannel().getRemoteAddress()).getAddress();
}
// Make public so that you can call from anywhere.
public boolean removeChildExecutor(Object key) {
super.removeChildExecutor(key);
}
}
注意:此处一定要当心子类中executor map的内存泄露问题,当整个主键的生命周期结束(如来自相同IP的所有连接都已经关闭了)后,一定要调用removeChileExecutor(Object)方法。请牢记:调用removeChileExecutor(Object)方法后,此主键可能还会出现(如移除掉那个IP后可能还会出现来自那个IP的新的连接),如果不确定此种情况是否会发生,可以定期从子类的executor map中修改旧有的未使用的或是失效的key。解决方案如下:
RemoteAddressBasedOMATPE executor = ...;
on every 3 seconds:
for (Iterator<Object> i = executor.getChildExecutorKeySet().iterator; i.hasNext();) {
InetAddress ip = (InetAddress) i.next();
if (there is no active connection from ‘ip‘ now &&
there has been no incoming connection from ‘ip‘ for last 10 minutes) {
i.remove();
}
}
如果预期的key的最大值比较小而且比较确定,这样可以使用"弱"map来替代自己管理key的生命周期(如ConcurrentWeakHashMap或者 synchronized WeakHashMap)。
注:"弱"map 指仅当虚拟机内存不足时会对此map清空,正常情况下,不会主动做任何处理
此类能够按顺序执行来自同一个Channel的ChannelEvent源码分析:
此类定义了一个ConcurrentMap<Object, Executor>类型的全局变量childExecutors,它的key存放的是channel。
protected Object getChildExecutorKey(ChannelEvent e) {
return e.getChannel();
}
它的value存放的是Executor。
执行execute操作时会先根据map查找,从而实现顺序执行。
-
OrderedDownstreamThreadPoolExecutor
前面所研究的均是针对于上行数据(客户端向到服务器段发送数据,数据处理的执行顺序),此类主要是针对下行数据处理,使得下行数据能够按照上来的顺序下发下去。此类继承了OrderedMemoryAwareThreadPoolExecutor类,功能几乎全部与OrderedMemoryAwareThreadPoolExecutor类相同。
-
Demo实现
-
SocketServer
static ExecutionHandler EXECUTION_UP_HANDLER = new ExecutionHandler(
new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576),false,true);
static ExecutionHandler EXECUTION_DOWN_HANDLER = new ExecutionHandler(
new OrderedDownstreamThreadPoolExecutor(16),true,false);
/**TCP方式*/
static ChannelFactory TCPCHANNEL_FACTORY = new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool());
/**TCP方式*/
static ServerBootstrap TCPSERVER_BOOTSTRAP = new ServerBootstrap(TCPCHANNEL_FACTORY);
public static void tcpServerStartUp() {
TCPSERVER_BOOTSTRAP.setPipelineFactory(newTCPServerPipelineFactory(EXECUTION_UP_HANDLER,EXECUTION_DOWN_HANDLER));
TCPSERVER_BOOTSTRAP.setOption("child.tcpNoDelay", true);
TCPSERVER_BOOTSTRAP.setOption("child.keepAlive", true);
TCPSERVER_BOOTSTRAP.setOption("reuseAddress", true);
LOGGER.info("SERVER_NAME:"+Constants.SERVER_NAME);
LOGGER.info("TCPSERVER_PORT:"+Constants.TCPSERVER_PORT);
TCPSERVER_BOOTSTRAP.bind(newInetSocketAddress(Constants.SERVER_NAME,Constants.TCPSERVER_PORT));
LOGGER.info("TCP服务已启动....");
}
public static void tcpServerShutDown(){
TCPSERVER_BOOTSTRAP.releaseExternalResources();
EXECUTION_UP_HANDLER.releaseExternalResources();
EXECUTION_DOWN_HANDLER.releaseExternalResources();
LOGGER.info("TCP服务已关闭");
}
-
TCPPipelineFactory
ChannelPipeline pipeline = Channels.pipeline(
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4),
new LengthFieldPrepender(4, false),
this.EXECUTION_UP_HANDLER,
this.EXECUTION_DOWN_HANDLER,
new TCPServerHandler());
return pipeline;
-
ClientPipelineFactory
ChannelPipeline pipeline = pipeline();
pipeline.addLast("decoder", new LengthFieldBasedFrameDecoder(
Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast("encoder", new LengthFieldPrepender(4, false));
pipeline.addLast("handler", new ClientHandler());
return pipeline;