Netty3 源码分析 - ChannelPipeline

ChannelPipeline的作用就是组织一系列的ChannelHandlers 为某一个Channel服务,处理各种事件。实现了拦截过滤器模式的高级形式(an
advanced form of the Intercepting Filter pattern),进而有效控制如何处理一个事件以及ChannelHandlers之间如何交互。类型结构图为:

流水线的创建:对于每个新的通道,必须为其创建和添置一个Pipeline,一旦设置,他们之间的耦合就是永久的,这个通道不同添置另一个流水线,不能解耦当前的Pipeline。创建Pipeline的推荐方式是利用Channels中的辅助方法,而不是调用某个实现的构造器。

import static org.jboss.netty.channel.Channels.*;

ChannelPipeline pipeline = pipeline(); // same with Channels.pipeline()

Pipeline中事件流动:下图展示了事件被各个ChannelHandlers 处理的典型方式. 一个ChannelEvent可以被ChannelUpstreamHandle或ChannelDownstreamHandler
处理而后通过调用 ChannelHandlerContext.sendUpstream(ChannelEvent) 或者 ChannelHandlerContext.sendDownstream(ChannelEvent) 把事件转发到最近的一个Handler。根据事件的方向事件有不同的解释
(参见ChannelEvent )。

上行和下行事件分为被相应方向的Handler所处理。一个upstream handler通常处理由图中底部所示IO线程产生的带内数据( inbound
data)。带内数据通常是真正的输入流从远端主机读到的数据,即 InputStream.read(byte[]).。如果一个上行事件到达了upstream handler的最顶端,那么就会默默的丢弃。

与之类似,一个downstream handler 产生和传输带外数据(outbound
traffic)比如说写请求,如果一个下行事件到达了了最低端,就会被与这个Channel相关的IO线程所处理(就是之前说道的worker thread)。这里的IO线程通常执行真正的输出流操作,即OutputStream.write(byte[])。

看下面这个pipeline:

ChannelPipeline p = Channels.pipeline();

p.addLast("1", new UpstreamHandlerA());

p.addLast("2", new UpstreamHandlerB());

p.addLast("3", new DownstreamHandlerA());

p.addLast("4", new DownstreamHandlerB());

p.addLast("5", new UpstreamHandlerX());

要注意的是,站在不同的方向Handler顺序的解读是不一样的,而且要看这个Handler的职责范围。比如对于upstream event经过的Handler依次是1,2,5,对于downstream event经过的Handler顺序是4,3.

构建pipeline:我们可能会在程序中需要多个Handlers来处理IO请求如read,write,close等。比如说典型的服务器需要每个通道都有以下handlers,而且会取决于协议和业务罗杰的复杂度。

Protocol Decoder - translates binary data (e.g. ChannelBuffer) into a Java object.

Protocol Encoder - translates a Java object into binary data.

ExecutionHandler - applies a thread model.

Business Logic Handler - performs the actual business logic (e.g. database access).

代码形式像这样:

ChannelPipeline pipeline = Channels.pipeline();

pipeline.addLast("decoder", new MyProtocolDecoder());

pipeline.addLast("encoder", new MyProtocolEncoder());

pipeline.addLast("executor", new ExecutionHandler(

new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576)));

pipeline.addLast("handler", new MyBusinessLogicHandler());

线程安全:因为ChannelPipeline 是线程安全的,所以可以任何时刻可以将一个ChannelHandler加入或移除。比如,当出现敏感信息的时候加入SslHandler,交易完成后移除它。

缺陷(Pitfall):由于现在默认ChannelPipeline的内部实行细节,下面的代码不会如期工作,如果这里的FirstHandler 也是该pipeline中最后一个Handler的话。(不知道现在的netty5怎么做的呢?)

public class FirstHandler extends SimpleChannelUpstreamHandler {

@Override

public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {

// Remove this handler from the pipeline,

ctx.getPipeline().remove(this);

// And let SecondHandler handle the current event.

ctx.getPipeline().addLast("2nd", new SecondHandler());

ctx.sendUpstream(e);

}

}

正确的做法是在移除前确保至少还有一个Handler(因为Handler context中的pre,next缘故?),或者先增加再移除。

详细的源码注释看我的github

时间: 2025-01-01 08:42:32

Netty3 源码分析 - ChannelPipeline的相关文章

Netty3 源码分析 - ChannelUpstreamHandler

Netty3 源码分析 - ChannelUpstreamHandler ChannelUpstreamHandler处理上行的通道事件,并且在流水线中传送事件.这个接口最常用的场景是拦截IO工作现场产生的事件,传输消息或者执行相关的业务逻辑.在大部分情况下,我们是使用SimpleChannelUpstreamHandler 来实现一个具体的upstream handler,因为它为每个事件类型提供了单个的处理方法.大多数情况下ChannelUpstreamHandler 是向上游发送事件,虽然

Netty3 源码分析 - NIO server绑定过程分析

一个框架封装的越好,越利于我们快速的coding,但是却掩盖了很多的细节和原理,但是源码能够揭示一切.服务器端代码在指定好ChannelFactory,设定好选项,而后Bootstrap.bind操作就会开启server,接受对端的连接.所以有必要对这后面的过程分析清楚,下图是关键流程.先是构建一个默认的Pipeline,为我们接下来要创建的监听通道服务,这个Pipeline里面会加入一个Binder的上行事件处理器:接下来创建了至关中的NioServerSocketChannel,在构造的过程

Netty3 源码分析 - ClientBootstrap

Bootstrap是通道初始化辅助类 提供了初始化通道或子通道所需要的数据结构,那么ClientBootstrap就是客户端的,而且会执行连接操作. 配置通道,就是把相应的键值对选项传递给底层: ClientBootstrap b = ...; // Options for a new channel b.setOption("remoteAddress", new InetSocketAddress("example.com", 8080)); b.setOpti

Netty3 源码分析 - 套接字绑定实现原理

前面关注的地方都是Netty采用的流水线处理方式的组织方式,ChannelHandler如何管理,通道状态,通道事件等这些上层的架构设计,那么Netty中如何实现诸如套接字绑定,连接,关闭等这些底层的操作呢?不能只顾着套用API写程序,却对细节不求甚解.这里大致追踪下OIO模式下Channel中套接字绑定的实现,(NIO以后分析)其实逻辑都是一样的,只是在线程模型的地方时不同的. 大致过程如下(详细的源码注释,看我的github): 1.我们在通过Bootstrap启动客户端或者服务端的时候会提

Netty3 源码分析 - ChannelHandlerContext

ChannelHandlerContext存在的意义是可以让其管理的Handler与Pipeline或其他handlers进行交互,ChannelHandler的理解在前面说过. 发送事件:可以调用 sendUpstream(ChannelEvent) 或sendDownstream(ChannelEvent)将一个事件传递给这个Pipeline中与其最近的那个Handler. 修改pipeline:调用 getPipeline()可以得到这个Handler所属的ChannelPipeline对

Netty3 源码分析 - Channel

何为通道(Channel)?代表的是一个网络套接字的连接点(nexus). 一个通道抽象的内容包括: 1)当前通道状态,是否打开,是否绑定等: 2)通道的配置参数信息,如套接字缓冲区大小: 3)通道支持的IO操作: 4)处理和这个Channel相关的IO事件和请求的ChannelPipeline. 在Netty中所有的IO操作都是异步的,即执行一个IO调用的时候不会阻塞到操作完成,而后立即返回一个ChannelFuture对象,这个ChannelFuture对象会在某个时候通知我们IO操作执行结

Netty3 源码分析 - ChannelFuture

ChannelFuture抽象的是Channel中异步IO操作的结果.在Netty中,所有的IO操作是异步的,意味着任何IO调用会立刻返回,而不是等到操作真正的执行完成.相反,会返回一个ChannelFuture 对象,在IO完成之后通过其得到结果状态.ChannelFuture 要么完成要么未完成,当IO操作开始执行会创建一个新的future对象,初始状态时uncompleted (不是成功,失败,也不是取消)因为IO操作还木有完成,一旦IO操作完成(成功,失败,或者被取消).这个CHanne

Netty3 源码分析 - ChannelStateEvent

ChannelStateEvent是ChannelEvent的子接口,可以代表改变的通知或者是请求,取决于它是上行还是下行事件. UpstreamChannelStateEvent是默认的上行通道状态事件实现类.结合ChannelState中的表很容易看懂源码. DownstreamChannelStateEvent是默认的下行通道状态事件实现类.不同的是需要指定一个Future对象,当相应的动作完成后得到通知.源码很容易理解: public class DownstreamChannelSta

Netty3 源码分析 - ChannelHandler

每个通道关联一个Pipeline,在流水线中拦截处理各种事件的对象就是ChannelHandler,它处理ChannelEvent而后进行传递. 接口ChannelHandler没有提供任何方法,有两个子接口分别用来规范处理上行和下行的通道事件. ChannelHandler是随ChannelHandlerContext对象提供的,handler通过这个context对象参与这个Pipeline的交互管理,通过它所属的context对象,一个handler可以传递上行或下行事件,动态改变流水线,