在TCP长连接模式下,我们需要及时释放那些未授权的TCP链接,让系统运行得更稳健一些。
首先是connect上来的TCP报文需要设置一个存活期,通过在pipleline上设置超时处理器ReadTimeoutHandler
ch.pipeline().addLast(new ReadTimeoutHandler(120));
使得一个TCP在120秒内没有收到数据就断掉。
这样做的目的是让连接者必须发TCP报文才能维持连接。
下一步在业务层对ChannelHandlerContext进行鉴权。与HTTP不同,TCP长连接的会话ChannelHandlerContext是一直存活的。只要TCP不断,每次进入处理器的ctx都是同一个对象,也是在同一个Handler对象中处理消息。基于这个考虑,可以在第一次收到TCP消息的时候启动一个定时器,等待类似鉴权的TCP消息。如果定时器超时的时候鉴权都没成功,则主动关闭这个TCP连接:ctx.close();。
在这个鉴权过程中Handler对象有四个状态:
- 初始态(瞬态)init
- 等待鉴权 waiting
- 鉴权成功 success
- 鉴权失败(瞬态) failed
在收到第一个消息之前是“1.初始态”,当收到第一个消息后进入“2.等待鉴权”,当鉴权成功的时候转为“3.鉴权成功”,当定时器超时后状态不是“3”的时候就进入“4.鉴权失败”。
由于收到第一个消息之前,Handler对象并没有实例化,而在实例化之后马上进入了状态“2”,所以状态“1”是瞬态;
由于鉴权失败之后关闭了ctx,之后Handler对象将不会再进入,所以状态“4”也可以理解为瞬态。
现在整理一下实现过程:
- 在Handler中设置一个域表示状态status,初始化为“init”;
- 收到第一个消息的时候,即read() 或者 read0()回调被触发的时候,status初始化为"waiting",在ChannelHandlerContext上启动一个5秒定时器(第一个消息,指status.equals("init"))
ctx.executor().schedule(new ConnectionTerminator(ctx), 5, TimeUnit.SECONDS);
- 其中ConnectionTerminator对象是自定义的一个线程回调,这个回调唯一的作用是是当状态不为success的时候,关掉连接
private class ConnectionTerminator implements Runnable{ ChannelHandlerContext ctx; public ConnectionTerminator(ChannelHandlerContext ctx) { // TODO Auto-generated constructor stub this.ctx = ctx; } @Override public void run() { // TODO Auto-generated method stub if (!status.equals("success")) { ctx.close(); status="failed"; } } }
- 在Login消息返回成功之后,把status置为"success"即可。
完整的例子如下:
第一步:加载读超时
ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("readTimeout", new ReadTimeoutHandler(120));
第二步:处理器入口
/** * 以Protocol buf作为消息体的Handler * @author yuantao * */ public class ProtoBufHandler extends SimpleChannelInboundHandler<Protocol> { private String status = "init"; @Override protected void channelRead0(ChannelHandlerContext ctx, Protocol msg) throws Exception { // TODO Auto-generated method stub // 放在Netty循环中跑线程 if (status.equals("init")) { ctx.executor().schedule(new ConnectionTerminator(ctx), 5, TimeUnit.SECONDS); status = "waiting"; } if (msg.hasLogin()) { int result = new MessageProcessor(msg, ctx).processLogin(msg.getLogin()); if (result == 0) { status = "success"; } } else { ctx.executor().execute(new MessageProcessor(msg, ctx)); } } private class ConnectionTerminator implements Runnable{ ChannelHandlerContext ctx; public ConnectionTerminator(ChannelHandlerContext ctx) { // TODO Auto-generated constructor stub this.ctx = ctx; } @Override public void run() { // TODO Auto-generated method stub if (!status.equals("success")) { ctx.close(); status = "failed"; } } } }
服务器运行在这种机制下的时候,需要客户端在120秒内发握手消息来维持长连接,比如30秒发一次握手。
时间: 2024-10-12 04:42:57