上一篇 介绍了事件监听、责任链模型、socket接口和IO模型、线程模型等基本概念,以及Netty的整体结构,这篇就来说下Netty三大核心模块之一:事件监听和处理。
前面提到,Netty是一个NIO框架,它将IO通道的建立、可读、可写等状态变化,抽象成事件,以责任链的方式进行传递,可以在处理链上插入自定义的Handler,对感兴趣的事件进行监听和处理。
通过介绍,你会了解到:
- 事件监听和处理模型
- 事件监听:EventLoop
- 事件处理:ChannelPipeline和ChannelHandler
- 使用Netty实现Websocket协议
文章末尾有福利 ~
事件监听和处理模型
进行网络编程时,一般的编写过程是这样的:
- 创建服务端Socket,监听某个端口;
- 当有客户端连接时,会创建一个新的客户端Socket,监听数据的可读、可写状态,每一个连接请求都会创建一个客户端Socket;
- 读取和写入数据都会调用Socket提供的接口,接口列表在上一篇提到过;
传统的模型,每个客户端Socket会创建一个单独的线程监听socket事件,一方面系统可创建的线程数有限,限制了并发数,一方面线程过多,线程切换频繁,导致性能严重下降。
随着操作系统IO模型的发展,可以采用多路复用IO,一个线程监听多个Socket,另外,服务端处理客户端连接,与客户端Socket的监听,可以在不同的线程进行处理。
Netty就是采用多路复用IO进行事件监听,另外,使用不同的线程分别处理客户端的连接、数据读写。
整个处理结构如下图,简单说明下:
- Boss EventLoopGroup主要处理客户端的connect事件,包含多个EventLoop,每个EventLoop一个线程;
- Worker EventLoopGroup主要处理客户端Socket的数据read、write事件,包含多个EventLoop,每个EventLoop一个线程;
- 无论是Boos还是Worker,事件的处理都是通过Channel Pipleline组织的,它是责任链模式的实现,包含一个或多个Handler;
- 侦听一个端口,只会绑定到Boss EventLoopGroup中的一个Eventloop;
- Worker EventLoopGroup中的一个Eventloop,可以监听多个客户端Socket;
EventLoop
一个EventLoop其实和一个特定的线程绑定, 并且在其生命周期内, 绑定的线程都不会再改。
EventLoop肩负着两种任务:
- 第一个是作为 IO 线程, 执行与 Channel 相关的 IO 操作, 包括 调用select等待就绪的IO事件、读写数据与数据的处理等;
- 第二个任务是作为任务队列, 执行 taskQueue 中的任务, 例如用户调用eventLoop.schedule提交的定时任务也是这个线程执行的;
第一个任务比较好理解,主要解释下第二个:从socket数据到数据处理,再到写入响应数据,Netty都在一个线程中处理,主要是为了线程安全考虑,减少竞争和线程切换,通过任务队列的方式,可以在用户线程提交处理逻辑,在Eventloop中执行。
整个EventLoop干的事情就是select -> processIO -> runAllTask,processIO处理IO事件相关的逻辑,runAllTask处理任务队列中的任务,如果执行的任务过多,会影响IO事件的处理,所以会限制任务处理的时间,整个处理过程如下图:
EventLoop的run代码如下:
1234567891011121314151617181920212223242526272829303132333435363738394041424344 |
protected void run() { for (; ; ) { oldWakenUp = wakenUp.getAndSet(false); try { if (hasTasks()) { //如果有任务,快速返回 selectNow(); } else { select(); //如果没任务,等待事件返回 if (wakenUp.get()) { selector.wakeup(); } } cancelledKeys = 0; final long ioStartTime = System.nanoTime(); needsToSelectAgain = false; //处理IO事件 if (selectedKeys != null) { processSelectedKeysOptimized(selectedKeys.flip()); } else { processSelectedKeysPlain(selector.selectedKeys()); } //计算IO处理时间 final long ioTime = System.nanoTime() - ioStartTime; final int ioRatio = this.ioRatio; //默认为50 //处理提交的任务 runAllTasks(ioTime * (100 - ioRatio) / ioRatio); if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { break; } } } catch (Throwable t) { try { Thread.sleep(1000); } catch (InterruptedException e) { } } } } |
ChannelPipeline和ChannelHandler
ChannelPipeline是一个接口,其有一个默认的实现类DefaultChannelPipeline,内部有两个属性:head和tail,
这两者都实现了ChannelHandler接口,对应处理链的头和尾。
1234567891011 |
protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head;} |
每个Channel创建时,会创建一个ChannelPipeline对象,来处理channel的各种事件,可以在运行时动态进行动态修改其中的 ChannelHandler。
ChannelHandler承载业务处理逻辑的地方,我们接触最多的类,可以自定义Handler,加入处理链中,实现自定义逻辑。
ChannelHandler 可分为两大类:ChannelInboundHandler 和 ChannelOutboundHandle,这两接口分别对应入站和出站消息的处理,对应数据读取和数据写入。它提供了接口方法供我们实现,处理各种事件。
12345678910 |
public interface ChannelInboundHandler extends ChannelHandler { void channelRegistered(ChannelHandlerContext ctx) throws Exception; void channelUnregistered(ChannelHandlerContext ctx) throws Exception; void channelActive(ChannelHandlerContext ctx) throws Exception; void channelInactive(ChannelHandlerContext ctx) throws Exception; void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception; void channelReadComplete(ChannelHandlerContext ctx) throws Exception; void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception; void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;} |
自定义Handler时,一般继承ChannelInboundHandlerAdapter或 ChannelOutboundHandlerAdapter。
需要注意的是,不建议在 ChannelHandler 中直接实现耗时或阻塞的操作,因为这可能会阻塞 Netty 工作线程,导致 Netty 无法及时响应 IO 处理。
使用Netty实现Websocket协议
Websocket协议
不是本篇的重点,简单说明下:
- 是一种长连接协议,大部分浏览器都支持,通过websocket,服务端可以主动发消息给客户端;
- Websocket协议,在握手阶段使用HTTP协议,握手完成之后,走Websocket自己的协议;
- Websocket是一种二进制协议;
初始化
Netty提供了ChannelInitializer类方便我们初始化,创建WebSocketServerInitializer类,继承ChannelInitializer类,用于添加ChannelHandler:
123456789101112131415 |
public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> { @Resource private CustomTextFrameHandler customTextFrameHandler; @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("codec-http", new HttpServerCodec()); pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); pipeline.addLast("websocket-protocal-handler",new WebSocketServerProtocolHandler()); pipeline.addLast("custome-handler", customTextFrameHandler); }} |
分析下这几个Handler,都是Netty默认提供的:
- HttpServerCodec:用于解析Http请求,主要在握手阶段进行处理;
- HttpObjectAggregator:用于合并Http请求头和请求体,主要在握手阶段进行处理;
- WebSocketServerProtocolHandler:处理Websocket协议;
- CustomTextFrameHandler:自定义的Handler,用于添加自己的业务逻辑。
是不是很方便,经过WebSocketServerProtocolHandler处理后,读取出来的就是文本数据了,不用自己处理数据合包、拆包问题。
CustomTextFrameHandler
自定义的Handler,进行业务处理:
12345678910111213 |
public class CustomTextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { @Override protected void channelRead0(final ChannelHandlerContext ctx, TextWebSocketFrame frame) throws Exception { final String content = frame.text(); System.out.println("接收到数据:"+content); // 回复数据 TextWebSocketFrame respFrame = new TextWebSocketFrame("我收到了你的数据"); if (ctx.channel().isWritable()) { ChannelFuture future = ctx.writeAndFlush(respFrame); } }} |
福利说明
最后,说下福利:小爱音箱F码。
准备了2份,主要为了感谢「微信公众号」和「掘金社区」的朋友,每一份包括1个小爱音箱F码和1个小爱音箱 mini F码。
小米手机F码源自于英文单词”Friend”,是小米公司提供给小米核心用户及为小米做出贡献的网友的优先购买权,如果您有小米F码的话无需等待即可直接利用小米F码购买相关产品!
简单来说,F码就是不用抢了,可以直接购买 ~
抽奖截止时间
4月9号中午12点
抽奖规则
掘金社区
- 需要关注我的掘金账号才有效,个人主页;
- 使用微信抽奖助手随机抽取for掘金社区;
微信公众号
- 需要关注我的微信公众号才有效;
- 使用微信抽奖助手随机抽取for微信公众号;
原文地址:https://www.cnblogs.com/qqtalk/p/8726896.html