本章内容包括:
1)ChannelHandler和ChannelPipeline的APIs
2)检测内存泄漏
3)异常处理
在之前的一个章节中,我们学习了ByteBuf,Netty的数据容器,在这个章节中,我们将讲解Netty的数据流和对应的处理组件,然后我们将我们已经学过的所有组件整合在一起
你已经知道多个ChannelHandler可以被链式的放入ChannelPipeline来将所有的处理逻辑组织在一起,我们将学习包涵这些有关类的很多用户案例和他们之间的对应关系------ChannelHandlerContext
了解这些组件之间是如何交互的对构建多模块可重复利用的Netty项目是至关重要的
6.1 The ChannelHandler family
准备详细地学习ChannelHandler之前,我们将花一些时间理解一下这个Netty组件模型的基础知识
6.1.1 The Channel lifecycle
接口Channel定义了一些简单但是有用的状态模型,它与ChannelInboundHandler的API紧密关联,下表6.1展示了Channel的四种状态
状态 | 描述 |
ChannelUnregistered | Channel已经创建,但是还没有注册到EcentLoop上 |
ChannelRegistered | Channel已经注册到EventLoop |
ChannelActive | Channel已经激活了(已经连接到远程端),现在它已经准备好接受和发送信息 |
ChannelInactive | Channel没有连接到远程端 |
Channel的一个正常的生命周期是如图6.1展示的,一旦一个状态发生了改变,相对应的事件将会产生,然后会转向到ChannelPipeline中的ChannelHandler中,然后Channel会对事件进行处理
6.1.2 The ChannelHandler lifecycle
接口ChannelHandler定义了操作的生命周期,如下表6.2展示,这些操作将会在ChannelHandler加入或者移除ChannelPipeline的时候被调用,每一个方法接收一个ChannelHandlerContext的入参
类型 | 描述 |
handlerAdded | 当一个ChannelHandler被载入ChannelPipeline的时候触发 |
handlerRemoved | 当一个ChannelHandler从ChannelPipeline中移除的时候触发 |
exceptionCaught | 在处理过程中ChannelPipeline发生了异常 |
Netty定义了ChannelHandler两个比较重要的子接口
1)ChannelInboundHandler------处理输入的数据且处理一切状态的改变
2)ChannelOutboundHandler--------处理输出数据,运行拦截一切的操作
在下一个小节中,我们将进行详细的讲解
6.1.3 Interface ChannelInboundHandler
表6.3展示了在整个ChannelInboundHandler的生命周期里的所有方法,当接收到数据或者相关联的Channel状态改变的时候会调用这些方法,与我们之前提及的一样,这些方法与Channel的生命周期是一一映射对应的
类型 | 描述 |
channelRegistered | 当一个Channel被注册到EventLoop上的时候并且能够处理IO的时候调用执行 |
channelUnregistered | 当一个Channel从EventLoop中注销的时候且不能再处理I/O的时候调用执行 |
channelActive | 当一个Channel被激活是调用执行 |
channelInactive | 当一个Channel已经处于非激活的状态且不再连接到远程端的时候被调用执行 |
channelReadComplete | 当一个Channel的读操作已经准备好的时候被调用执行 |
channelRead | 当数据已经从Channel读取的时候执行 |
channelWritabilityChanged | 当一个Channel的可写的状态发生改变的时候执行,用户可以保证写的操作不要太快,这样可以防止OOM,写的太快容易发生OOM,如果当发现Channel变得再次可写之后重新恢复写入的操作,Channel中的isWritable方法可以监控该channel的可写状态,可写状态的阀门直接通过Channel.config().setWriterHighWaterMark()和Channel.config().setWriteLowWaterMark()配置 |
userEventTriggered | 当ChannelInboundHandler的fireUserEventTriggered被调用的时候执行,因为一个POJO对象传输通过了ChannelPipeline |
当一个ChannelInboundHandler具体实现重写了channelRead方法的时候,那么它需要负责显式的去释放池对象的ByteBuf实例相关联的内存空间,Netty提供了一个特别的方法来达到这个目的,ReferenceCountUtil.release(),如下面的代码清单所示:
Netty用WARN级别记录没有释放的资源,这样可以很方便的找出代码中有问题的实例,但是这样管控资源显得有些麻烦,一个简单的备选方案是使用SimpleChannelInboundHandler,下面的代码清单向你展示了这种改变
因为SimpleChannelInboundHandler这个对象可以自动的释放资源,那么你不能够获取任何信息的引用供以后使用,这将会是无效不合法的
小节6.1.6提供了引用处理的详细讲解
6.1.4 Interface
ChannelOutboundHandler
输出操作和数据处理是由ChannelOutboundHandler管控的,它的方法将由Channel,ChannelPipeline,ChannelHandlerContext调用执行
一个ChannelOutboundHandler强大有用的功能之一是可以推迟一个操作和事件的执行,它允许复杂的事件来请求处理,如果一个写数据到远程端的过程被挂起,你可以推迟flush的操作然后在之后的时间重新进行断点续传
ChannelOutboundHandler自己的一些本地的方法如6.4所示
类型 | 描述 |
bind(ChannelHandlerContext,
SocketAddress,ChannelPromise) |
绑定到本地的地址的channel的请求被执行 |
connect(ChannelHandlerContext,
SocketAddress,SocketAddress,ChannelPromise) |
连接到远程端的channel的请求被执行 |
disconnect(ChannelHandlerContext,
ChannelPromise) |
当从远程端停止连接的时候执行 |
close(ChannelHandlerContext,ChannelPromise) | 请求关闭channel的时候执行 |
deregister(ChannelHandlerContext,
ChannelPromise) |
请求当channel从EventLoop上注销的时候执行 |
read(ChannelHandlerContext) | 请求从channel中读取更多信息的时候执行 |
flush(ChannelHandlerContext) | 当从channel刷入队列信息到远程端的时候执行 |
write(ChannelHandlerContext,Object,
ChannelPromise) |
当从channel中写数据到远程端的时候执行 |
CHANNELPROMISE VS. CHANNELFUTURE ChannelOutboundHandler的很多方法用ChannelPromise作为参数来通知操作的完成,ChannelPromise作为ChannelFuture的子类来定义了一些写的方法,例如setSuccess和setFailure方法,这样可以使ChannelFuture看起来不变
接下来,我们看看一些类,这些类可以简化ChannelHandler的写入操作
6.1.5 ChannelHandler adapters
你可以使用ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter作为你写你自己的ChannelHandler的起点,这些适配的类分别为ChannelInboundHandler和ChannelOutboundHandler类提供了一些基本的实现,通过继承抽象类ChannelHandlerAdapter,我们可以获取它的父接口ChannelHandler一些常用的方法的具体实现,继承图如图6.2所示:
ChannelHandlerAdapter还提供一个特殊的方法isSharable的方法,如果该接口的具体实现以“Sharable”注解的话,那么将会返回true,这就表示这个实现可以被加入到多个不同的ChannelPipeline中去
ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter提供的方法体与ChannelHandlerContext中的方法是对等的,因此都会将事件转发至管道中的下一个ChannelHandler中
在你自己的处理器中使用这些适配类,可以很方便的扩展他们并且可以自定义重写父类方法
6.1.6 Resource management
当你通过调用ChannelInboundHandler.channelRead()或者ChannelOutboundHandler.write()这些方法来操作数据的时候,你都需要保证没有资源泄漏,应该还记得上一个章节讲解的知识,Netty使用引用计数来处理池化的ByteBuf,所以在你已经完成对ByteBuf使用的时候,修改引用计数的值是很重要的
为了帮助你诊断这些潜在的问题,Netty提供了一个叫做ResourceLeakDetector的类,这个类将会对你的引用中1%的buffer分配进行取样来分析你的内存泄漏,当然这样做的内存损耗也是很小的,可以接受的
如果诊断出游内存损耗的话,那么记录的日记信息应该和下面打印的信息有些类似
目前Netty定义了泄漏级别,在表6.5中展示
等级 | 描述 |
DISABLED | 禁止使用内存检测,只有经过全面的测试才能使用 |
SIMPLE | 使用默认的1%的测试样例来记录任何的使用泄漏的情况,使用这种默认的级别对于大多数的案例来说是一个不错的选择 |
ADVANCED | 报告发现泄漏的情况且找出该信息所在的位置,依旧使用默认的测试样例比例 |
PARANOID | 与ADVANCED一样,除了它将所有的buffer作为测试样例,这对性能有很大的负担,这应该用在测试debug阶段 |
泄漏检查等级可以在下面的java系统的变量中设置,设置的值就是上面表的任意属性
如果设置好相关的JVM参数后重新启动你的应用的话,你将会检测到你应用最近buffer内存泄漏的地方,一下是一个通过单元测试后经典的泄漏情景
当你实现ChannelInboundHandler.channelRead() 和ChannelOutboundHandler.write()的时候,你如何使用这个诊断工具来防止你内存泄漏,让我们举一个案例,当你使用channelRead来消费输入的信息的时候,通过调用ChannelHandlerContext类的fireChannelRead使其信息不通过下一个ChannelInboundHandler方法,代码清单向你展示了如何释放信息
CONSUMING
INBOUND MESSAGES THE EASY WAY 因为消费一个输入信息然后释放这个信息所占用的内存是一个很常见的任务,Netty提供了一个实现ChannelInboundHandler的特殊类被叫做SimpleChannelInboundHandler,这个可以自动释放一个信息一旦这个信息被channelRead0消费
释放资源重要但是同样通知ChannelPromise也同样重要,否则会出现一种场景,当该信息已经被释放了,但是ChannelFutureListener却并没有被通知到
总的来说,如果一个信息被消费然后被丢弃并没有通过ChannelPipeline中的下一个ChannelOutboundHandler的时候,用户应该调用ReferenceCountUtil.release(),如果一个信息到达真正的传输层的时候,当它被写入的时候或者channel关闭的时候,它能够自动释放
6.2
Interface ChannelPipeline
如果你将ChannelPipeline想象成ChannelHandler链式的形态用来与通过Channel的输入输出的数据交互的话,那么你将很容易的轻易理解将这些ChannelHandler联合在一起组成了应用程序数据业务逻辑的核心块
每一个新的Channel被创建的时候都会被分配给一个新的ChannelPipeline,这种关系绑定是稳定的,不易变更的,被绑定的channel不能再与其他的ChannelPipeline绑定也不能与当前的ChannelPipeline解除绑定,这对于Netty的来说,在这个组件的生命周期里都是一个固定的操作,是Netty自动完成的,不需要开发者对这部分的动作进行任何的处理
TIPS:ChannelHandlerContext
一个ChannelHandlerContext使ChannelHandler与它对应的ChannelInboundHandler和相关的ChannelHandler做交互,一个处理器可以通知ChannelPipeline中的下一个ChannelHandler,甚至可以动态地修改它所属的ChannelPipeline
ChannelHandlerContext提供了非常丰富的API来处理事件,或者进行I/O操作,章节6.3将对ChannelHandlerContext进行更加详细的讲解
根据事件的来源,一个事件要么被ChannelInboundHandler处理要么被ChannelOutboundHandler处理,随后通过调用一个ChannelHandlerContext的具体实现,被转发到下一个同样类型的处理器处理
图6.3向我们展示了一个包含输入输出的ChannelHandler的经典的ChannelPipeline,并说明了我们之前说过的那个模型,channelPipeline就是一个一系列ChannelHandler组成的,channelPipeline为通过它自己的事件传播也提供了一些方法,如果一个输入事件被触发了,那么它将从头到尾的通过channelPipeline,在图6.3所示,一个输出的I/O操作将从channelPipeline的右边末端开始一直处理到左端结束
TIPS:channelPipeline的相对说
如果从事件在ChannelPipeline传播的方向的视角来区分事件的话,开始端取决于事件是输入还是输出,但是Netty默认使用输入到ChannelPipeline作为开始端,输出entry作为结束端
当我们已经把输入输出的处理器混合的通过ChannelPipeline的add方法加入到ChannelPipeline中去后,每一个channelHandler的序号(也就是它的位置顺序)就是我们将其加入ChannelPipeline时候的定义的顺序
我们再来从左往右的数一下图6.3中的处理器,第一个输入的处理器记为1,左边第一个输出的处理器记为5
管道里传输事件的时候,需要确认管道中下一个ChannelHandler是否匹配事件传播的方向,如果不匹配,那么将会跳过这个ChannelHandler,匹配下一个Handler,直到找到一个匹配方向的handler(当然一个handler可以同时实现ChannelInboundHandler和ChannelOutboundHandler两个接口)
6.2.1 Modifying
a ChannelPipeline
一个ChannelHandler可以实时的通过移除,增加或者替换其他的channelHandler来修改一个ChannelPipeline的布局,这是ChannelHandler众多出众的能力中的一种,所以我们需要仔细研究一下它是如何工作的,相关的方法在表6.6中展示:
下面的代码清单向你展示了如何使用这些方法
待会你会看见这种重新组织ChannelHandler的能力可以使它自己能够很轻易地实现极其灵活的业务逻辑处理
TIPS:ChannelHandler的执行和阻塞
正常情况下,每一个在ChannelPipeline中的ChannelHandler都是通过它的EventLoop线程来处理通过它的事件的,这是非常重要的,因为它不会阻塞当前线程,但是对于整体的I/O处理来说,这会有些负面的影响
有时候,对于一些比较陈旧的代码而言,使用阻塞的API也许更为的适合,在这种情况下,1ChannelPipeline通过add方法来接收一个EventExcutorGroup,如果一个事件通过定制的EventExcutorGroup的话,它将会包含在EventExcutorGroup中的一个EventExcutor处理,并且会从Channel它自己的EventLoop中移除,在这种场景下,Netty提供了一个叫做DefaultEventExcutorGroup的实现
除了这些操作,还有一些方法获取ChannelHandler要不通过类型要么通过名字,表6.7列出了这些方法
名字 | 描述 |
get | 根据类型或者名称返回ChannelHandler |
context | 返回绑定在ChannelHandler上的ChannelHandlerContext |
names | 返回ChannelPipeline中的所有ChannelHandler名字 |
6.2.2 Firing events
ChannelPipeline的API额外提供了一些方法来调用输入输出数据的操作,表6.8列出了输入操作的一些常用方法,它用来负责通知ChannelInboundHandler发生在ChannelPipeline中的事件
对于输出数据方面,处理数据会引起一些底层的操作,表6.9向你展示了输出数据ChannelPipeline的一些API:
总而言之
1)一个ChannelPipeline持有关联一个Channel的所有ChannelHandler
2)一个ChannelPipeline可以根据需要动态地增加和移除ChannelHandler
3)ChannelPipeline有很丰富的API可以对输入输出事件作出响应