inBound事件的传播
- 何为inBound事件以及ChannelInboundHandler
- ChannelRead事件的传播
ChannelRead是典型的inbound事件,以他为例了解inbound事件的传播 - SimpleInBoundHandler处理器
何为inBound事件以及ChannelInboundHandler
ChannelHandler的继承关系
- ChannelInboundHandlerAdapter,ChannelOutboundHandlerAdapter.
用户代码中常见.平时自定义channelHandler时都会继承与他们 - ChannelHandler,所有处理器的抽象
- ChannelHandlerAdapter
ChannelHandler的默认实现 - ChannelInboundHandler,ChannelOutboundHandler
在ChannelHandler的基础上,自定义功能
ChannelHandler定义了哪些功能
123456789101112131415161718 |
public interface { void handlerAdded(ChannelHandlerContext ctx) throws Exception; //handler被pipeline删除时的回调 void handlerRemoved(ChannelHandlerContext ctx) throws Exception; //出现异常时的回调 void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception; //一个注解,是否可以被多个pipeline添加 @Inherited @Documented @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @interface Sharable { // no value }} |
ChannelInboundHandler在ChannelHandler的基础上扩展了什么
12345678910111213141516171819202122232425 |
public interface ChannelInboundHandler extends { //回调,Handler注册到nioEventLoop的selector上时 void channelRegistered(ChannelHandlerContext ctx) throws Exception; void channelUnregistered(ChannelHandlerContext ctx) throws Exception; //channel的激活或失效时的回调 void channelActive(ChannelHandlerContext ctx) throws Exception; void channelInactive(ChannelHandlerContext ctx) throws Exception; //channel读到数据或接收到连接时的回调 //对于服务端而言是连接,对于客户端channel则是bytebuf的数据 void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception; void channelReadComplete(ChannelHandlerContext ctx) throws Exception; //trigger一些用户自定义的事件 void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception; //可写状态发生了改变 void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception; //出现异常时的回调 @Override @SuppressWarnings("deprecation") void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;} |
这次由ChannelRead为例,看一下inbound事件是如何传播的.
ChannelRead事件的传播
这里创建3个自定义的InboundHandler测试ChannelRead事件
12345678910111213141516171819202122232425262728 |
public class InBoundHandlerA extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("InBoundHandlerA: " + msg); ctx.fireChannelRead(msg); }}public class InBoundHandlerB extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("InBoundHandlerB: " + msg);//打印 ctx.fireChannelRead(msg);//继续传播 } @Override public void channelActive(ChannelHandlerContext ctx) { //channel被激活时取到pipeline,激活channelRead事件 ctx.channel().pipeline().fireChannelRead("hello world"); }}public class InBoundHandlerC extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("InBoundHandlerC: " + msg); ctx.fireChannelRead(msg); }} |
服务端启动代码添加childHandler的部分为
12345678 |
.childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ch.pipeline().addLast(new InBoundHandlerA()); ch.pipeline().addLast(new InBoundHandlerB()); ch.pipeline().addLast(new InBoundHandlerC()); }}); |
此时启动后在控制台执行telnet 127.0.0.1 8888
,它的后台输出结果为
123 |
InBoundHandlerA: hello worldInBoundHandlerB: hello worldInBoundHandlerC: hello world |
如果把添加childHandler的部分改为下面的顺序
123 |
ch.pipeline().addLast(new InBoundHandlerA());ch.pipeline().addLast(new InBoundHandlerC());ch.pipeline().addLast(new InBoundHandlerB()); |
那么输出是
123 |
InBoundHandlerA: hello worldInBoundHandlerC: hello worldInBoundHandlerB: hello world |
可以推测出,inbound和添加顺序相关.在InboundHandlerB
的channelActive()
中打个断点.看一下它的fireChannelRead("hello world")
的逻辑.会看到它会调用:
12345678910111213141516171819202122 |
@Overridepublic final ChannelPipeline fireChannelRead(Object msg) { //head节点 AbstractChannelHandlerContext.invokeChannelRead(head, msg); return this;}--- static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) {//true //此时的next为head next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } } |
也就是说,当遇到channelRead事件时它会从调用head的invokeChannelRead.继续看
123456789101112131415 |
private void invokeChannelRead(Object msg) { if (invokeHandler()) { //调用head的channelRead() ((ChannelInboundHandler) handler()).channelRead(this, msg); } else { fireChannelRead(msg); }}--- @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //也是调用head的fireChannelRead // 这里head会把fireChannelRead原封不动的进行传播 ctx.fireChannelRead(msg);} |
这个fireChannelRead是io.netty.channel.AbstractChannelHandlerContext#fireChannelRead
.它会调用findContextInbound()
来获取下一个inboundHandler
1234567891011121314 |
@Override public ChannelHandlerContext fireChannelRead(final Object msg) { //获取下一个inboundHandler invokeChannelRead(findContextInbound(), msg); return this; }--- private AbstractChannelHandlerContext findContextInbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; } while (!ctx.inbound);//寻找下一个inbound节点 return ctx; } |
获取到下一个inboundHandler,也就是InboundHandlerA后它会调用io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead
,这个和之前head调用的是完全相同的方法,他回去调用当前inboundHandler,也就是InboundHandlerA的readChannel方法,它就是我们在用户代码中定义的部分.
12345678 |
public class InBoundHandlerA extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("InBoundHandlerA: " + msg); ctx.fireChannelRead(msg);//继续往下传播 }} |
然后InBoundHandlerA
的channelRead()
又会调用下一个InBoundHandlerB
的channelRead()
,它又会接着调用InBoundHandlerC
的channelRead()
此时回顾一下InBoundHandlerB
中的逻辑,会发现.pipeline中inBound的传播方式为:
12 |
ctx.fireChannelRead(msg); //从当前节点继续往下传播ctx.channel().pipeline().fireChannelRead("hello world");//从head节点开始往下传播 |
当传播到最后一个节点(C)时,它会传播到最后的Tail节点,调用它的channelRead()
.之前说过tial是用来做一些收尾工作,
1234567891011121314151617 |
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //事件一直传播到tail, //tail会认为没有被处理,这个方法会打印一个logger.debug.来提醒开发者 onUnhandledInboundMessage(msg);}--- protected void onUnhandledInboundMessage(Object msg) { try { logger.debug( "Discarded inbound message {} that reached at the tail of the pipeline. " + "Please check your pipeline configuration.", msg); } finally { //如果时bytebuf就进行释放 ReferenceCountUtil.release(msg); }} |
SimpleInBoundHandler处理器
以下面的自定义Handler为例,看一下它的使用场景
123456789101112131415161718192021 |
public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //如果没有把这里的ByteBuf传播到tail,那么tail节点就不会帮你释放这段ByteBuf //通常这种情况下你需要手动进行释放, //而SimpleChannelInboundHandler会帮你自动释放他们 } @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf password) throws Exception { if (paas(password)) { ctx.pipeline().remove(this); } else { ctx.close(); } } private boolean paas(ByteBuf password) { return false; }} |
看一下SimpleChannelInboundHandler
的channelRead()
12345678910111213141516171819202122 |
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { boolean release = true; try { //如果是inbound if (acceptInboundMessage(msg)) { //转为bytebuf I imsg = (I) msg; //抽象方法 //继承SimpleChannelInboundHandler时你可以做一些你想干的,并且不用考虑释放 channelRead0(ctx, imsg); } else { release = false; ctx.fireChannelRead(msg); } } finally { if (autoRelease && release) { //自动释放 ReferenceCountUtil.release(msg); } }} |
也就是说使用SimpleChannelInboundHandler
时不需要管channelRead()
而是通过channelRead0()
来做原本在channelRead()
中的逻辑.因为SimpleChannelInboundHandler的channelRead()
中定义了从执行channelRead0()
直到释放的过程,所以当channelRead0()
被执行后它会自动帮你去释放
outBound事件的传播
- 何为outBound事件以及ChannelOutBoundHandler
- write()事件的传播
典型的outBound事件
何为outBound事件以及ChannelOutBoundHandler
ChannelOutboundHandler在ChannelHandler的基础上扩展了什么
1234567891011121314151617181920212223 |
public interface ChannelOutboundHandler extends { //端口绑定,服务端启动时的那个 void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception; void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception; void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception; void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception; void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception; void read(ChannelHandlerContext ctx) throws Exception; void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception; void flush(ChannelHandlerContext ctx) throws Exception;} |
相对于InBounder,outBound更像是用户主动发起的操作.而InBounder更类似于事件触发
write()事件的传播
这里创建3个自定义的OutboundHandler测试write事件
12345678910111213141516171819202122232425262728293031323334 |
public class OutBoundHandlerA extends ChannelOutboundHandlerAdapter { |