Netty事件监听和处理(下)

上一篇 介绍了事件监听、责任链模型、socket接口和IO模型、线程模型等基本概念,以及Netty的整体结构,这篇就来说下Netty三大核心模块之一:事件监听和处理。

前面提到,Netty是一个NIO框架,它将IO通道的建立、可读、可写等状态变化,抽象成事件,以责任链的方式进行传递,可以在处理链上插入自定义的Handler,对感兴趣的事件进行监听和处理。

通过介绍,你会了解到:

  • 事件监听和处理模型
  • 事件监听:EventLoop
  • 事件处理:ChannelPipeline和ChannelHandler
  • 使用Netty实现Websocket协议

文章末尾有福利 ~

事件监听和处理模型

进行网络编程时,一般的编写过程是这样的:

  • 创建服务端Socket,监听某个端口;
  • 当有客户端连接时,会创建一个新的客户端Socket,监听数据的可读、可写状态,每一个连接请求都会创建一个客户端Socket;
  • 读取和写入数据都会调用Socket提供的接口,接口列表在上一篇提到过;

传统的模型,每个客户端Socket会创建一个单独的线程监听socket事件,一方面系统可创建的线程数有限,限制了并发数,一方面线程过多,线程切换频繁,导致性能严重下降。

随着操作系统IO模型的发展,可以采用多路复用IO,一个线程监听多个Socket,另外,服务端处理客户端连接,与客户端Socket的监听,可以在不同的线程进行处理。

Netty就是采用多路复用IO进行事件监听,另外,使用不同的线程分别处理客户端的连接、数据读写。

整个处理结构如下图,简单说明下:

  • Boss EventLoopGroup主要处理客户端的connect事件,包含多个EventLoop,每个EventLoop一个线程;
  • Worker EventLoopGroup主要处理客户端Socket的数据read、write事件,包含多个EventLoop,每个EventLoop一个线程;
  • 无论是Boos还是Worker,事件的处理都是通过Channel Pipleline组织的,它是责任链模式的实现,包含一个或多个Handler;
  • 侦听一个端口,只会绑定到Boss EventLoopGroup中的一个Eventloop;
  • Worker EventLoopGroup中的一个Eventloop,可以监听多个客户端Socket;

EventLoop

一个EventLoop其实和一个特定的线程绑定, 并且在其生命周期内, 绑定的线程都不会再改。

EventLoop肩负着两种任务:

  • 第一个是作为 IO 线程, 执行与 Channel 相关的 IO 操作, 包括 调用select等待就绪的IO事件、读写数据与数据的处理等;
  • 第二个任务是作为任务队列, 执行 taskQueue 中的任务, 例如用户调用eventLoop.schedule提交的定时任务也是这个线程执行的;

第一个任务比较好理解,主要解释下第二个:从socket数据到数据处理,再到写入响应数据,Netty都在一个线程中处理,主要是为了线程安全考虑,减少竞争和线程切换,通过任务队列的方式,可以在用户线程提交处理逻辑,在Eventloop中执行。

整个EventLoop干的事情就是select -> processIO -> runAllTask,processIO处理IO事件相关的逻辑,runAllTask处理任务队列中的任务,如果执行的任务过多,会影响IO事件的处理,所以会限制任务处理的时间,整个处理过程如下图:

EventLoop的run代码如下:

1234567891011121314151617181920212223242526272829303132333435363738394041424344

protected void run() {     for (; ; ) {         oldWakenUp = wakenUp.getAndSet(false);         try {             if (hasTasks()) { //如果有任务,快速返回                 selectNow();             } else {                 select(); //如果没任务,等待事件返回                 if (wakenUp.get()) {                     selector.wakeup();                 }             }             cancelledKeys = 0;             final long ioStartTime = System.nanoTime();             needsToSelectAgain = false;

//处理IO事件             if (selectedKeys != null) {                 processSelectedKeysOptimized(selectedKeys.flip());             } else {                 processSelectedKeysPlain(selector.selectedKeys());             }

//计算IO处理时间             final long ioTime = System.nanoTime() - ioStartTime;             final int ioRatio = this.ioRatio; //默认为50

//处理提交的任务             runAllTasks(ioTime * (100 - ioRatio) / ioRatio);

if (isShuttingDown()) {                 closeAll();                 if (confirmShutdown()) {                     break;                 }             }         } catch (Throwable t) {             try {                 Thread.sleep(1000);             } catch (InterruptedException e) {             }         }     } }

ChannelPipeline和ChannelHandler

ChannelPipeline是一个接口,其有一个默认的实现类DefaultChannelPipeline,内部有两个属性:head和tail,
这两者都实现了ChannelHandler接口,对应处理链的头和尾。

1234567891011

 protected DefaultChannelPipeline(Channel channel) {     this.channel = ObjectUtil.checkNotNull(channel, "channel");     succeededFuture = new SucceededChannelFuture(channel, null);     voidPromise =  new VoidChannelPromise(channel, true);

tail = new TailContext(this);     head = new HeadContext(this);

head.next = tail;     tail.prev = head;}

每个Channel创建时,会创建一个ChannelPipeline对象,来处理channel的各种事件,可以在运行时动态进行动态修改其中的 ChannelHandler。

ChannelHandler承载业务处理逻辑的地方,我们接触最多的类,可以自定义Handler,加入处理链中,实现自定义逻辑。

ChannelHandler 可分为两大类:ChannelInboundHandler 和 ChannelOutboundHandle,这两接口分别对应入站和出站消息的处理,对应数据读取和数据写入。它提供了接口方法供我们实现,处理各种事件。

12345678910

public interface ChannelInboundHandler extends ChannelHandler {	void channelRegistered(ChannelHandlerContext ctx) throws Exception;	void channelUnregistered(ChannelHandlerContext ctx) throws Exception;	void channelActive(ChannelHandlerContext ctx) throws Exception;	void channelInactive(ChannelHandlerContext ctx) throws Exception;	void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;	void channelReadComplete(ChannelHandlerContext ctx) throws Exception;	void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;	void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;}

自定义Handler时,一般继承ChannelInboundHandlerAdapter或 ChannelOutboundHandlerAdapter。

需要注意的是,不建议在 ChannelHandler 中直接实现耗时或阻塞的操作,因为这可能会阻塞 Netty 工作线程,导致 Netty 无法及时响应 IO 处理。

使用Netty实现Websocket协议

Websocket协议

不是本篇的重点,简单说明下:

  • 是一种长连接协议,大部分浏览器都支持,通过websocket,服务端可以主动发消息给客户端;
  • Websocket协议,在握手阶段使用HTTP协议,握手完成之后,走Websocket自己的协议;
  • Websocket是一种二进制协议;
初始化

Netty提供了ChannelInitializer类方便我们初始化,创建WebSocketServerInitializer类,继承ChannelInitializer类,用于添加ChannelHandler:

123456789101112131415

public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {

@Resource	private CustomTextFrameHandler customTextFrameHandler;

@Override    public void initChannel(SocketChannel ch) throws Exception {        ChannelPipeline pipeline = ch.pipeline();        pipeline.addLast("codec-http", new HttpServerCodec());        pipeline.addLast("aggregator", new HttpObjectAggregator(65536));

pipeline.addLast("websocket-protocal-handler",new WebSocketServerProtocolHandler());        pipeline.addLast("custome-handler", customTextFrameHandler);    }}

分析下这几个Handler,都是Netty默认提供的:

  • HttpServerCodec:用于解析Http请求,主要在握手阶段进行处理;
  • HttpObjectAggregator:用于合并Http请求头和请求体,主要在握手阶段进行处理;
  • WebSocketServerProtocolHandler:处理Websocket协议;
  • CustomTextFrameHandler:自定义的Handler,用于添加自己的业务逻辑。

是不是很方便,经过WebSocketServerProtocolHandler处理后,读取出来的就是文本数据了,不用自己处理数据合包、拆包问题。

CustomTextFrameHandler

自定义的Handler,进行业务处理:

12345678910111213

public class CustomTextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {    @Override    protected void channelRead0(final ChannelHandlerContext ctx, TextWebSocketFrame frame) throws Exception {        final String content = frame.text();        System.out.println("接收到数据:"+content);   

// 回复数据        TextWebSocketFrame respFrame = new TextWebSocketFrame("我收到了你的数据");        if (ctx.channel().isWritable()) {		      ChannelFuture future = ctx.writeAndFlush(respFrame);		  }			            }}

福利说明

最后,说下福利:小爱音箱F码。

准备了2份,主要为了感谢「微信公众号」和「掘金社区」的朋友,每一份包括1个小爱音箱F码和1个小爱音箱 mini F码。

小米手机F码源自于英文单词”Friend”,是小米公司提供给小米核心用户及为小米做出贡献的网友的优先购买权,如果您有小米F码的话无需等待即可直接利用小米F码购买相关产品!

简单来说,F码就是不用抢了,可以直接购买 ~

抽奖截止时间

4月9号中午12点

抽奖规则
掘金社区
  • 需要关注我的掘金账号才有效,个人主页
  • 使用微信抽奖助手随机抽取for掘金社区;
微信公众号
  • 需要关注我的微信公众号才有效;
  • 使用微信抽奖助手随机抽取for微信公众号;

原文地址:https://www.cnblogs.com/qqtalk/p/8726896.html

时间: 2024-10-08 18:18:14

Netty事件监听和处理(下)的相关文章

Netty事件监听和处理(上)

通过介绍,你会了解到: 事件监听.NIO.线程模型等相关概念: Netty总体结构: 事件监听和处理: 项目实践总结: 本篇先介绍下前两节,下一篇介绍后两节. 本篇最后会说明下福利的抽取规则,大家积极参与 >_< 相关概念 Netty是一个NIO框架,它将IO通道的建立.可读.可写等状态变化,抽象成事件,以责任链的方式进行传递,可以在处理链上插入自定义的Handler,对感兴趣的事件进行监听和处理. 所以,先介绍下事件监听.责任链模型.socket接口和IO模型.线程模型等基本概念,对后面理解

js中如何在不影响既有事件监听的前提下新增监听器

一. 需求澄清 比如某个按钮已经绑定了2-3个对Window对象的load事件的监听,现在需要添加一个新的对click事件的监听器,但在一定条件下才会同时触发原有的2-3个load监听器,否则只触发新添加的这个事件. 假定新添加的监听函数为: function additionalListener(){ console.log('should do something else'); } 二. ES5方法 ES5中可以通过添加包装函数的方式来实现: _windowonload = window.

Zookeeper 事件监听 - 史上最详解读

目录 写在前面 1.1. Curator 事件监听 1.1.1. Watcher 标准的事件处理器 1.1.2. NodeCache 节点缓存的监听 1.1.3. PathChildrenCache 子节点监听 1.1.4. Tree Cache 节点树缓存 写在最后 疯狂创客圈 亿级流量 高并发IM 实战 系列 疯狂创客圈 Java 分布式聊天室[ 亿级流量]实战系列之 -25[ 博客园 总入口 ] 写在前面 ? 大家好,我是作者尼恩.目前和几个小伙伴一起,组织了一个高并发的实战社群[疯狂创客

屏幕触摸事件监听,判断上下左右的操作行为,判断方法缩小的操作行为

在手机屏幕上能够实现的人机交互行为,大致包括点击按钮,拉动滑动块,物体缩放,上下左右拉动等. 手机屏幕触摸事件的监听方法: 1.首先要设置一块布局区域,frameLayout/LinearLayout等都可以,并为布局设置id: 2.在Activity中声明相应的布局类型,并通过findViewById()方法找到该布局,然后为该布局区域设置setOnTouchListener()方法,就能监听在相应屏幕触摸操作 实现屏幕触摸事件监听的代码: private LinearLayout Land;

JavaScript-4.5 事件大全,事件监听---ShinePans

绑定事件 <input type="bubtton" onclick="javascript:alert('I am clicked');"> 处理事件 <script language="JavaScript" for="对象" event="事件"> ... (事件处理代码) ... </script> 鼠标事件举例 <script language="

[基础控件]---状态切换控件CompoundButton及其子类CheckBox、RadioButton、ToggleButton、switch事件监听与场景使用

一.事件监听 对于普通的Button,对其进行事件监听Google官方给出了常见的三种监听方式:1.对每一个button设置事件监听器button.setOnClickListener(View.OnclickListener  listener);此种方法当button按钮较多时代码显得多.乱.不够简洁明了. 2.在Activity中实现接口View.OnclickListener,然后重写void onClick(View v)方法,在方法中通过switch(v.getId())予以区分不同

Windows 8 应用程序前后台切换事件监听

在一些情况下,我们需要监听应用程序切换到后台或者从后台切换至前台的事件,从而进行相关处理操作.支付宝应用锁屏(IOS,Android平台)的处理中就需要监听此事件,在用户将应用切换至后台一段时间后再切换至前台的情况下就需要弹出锁屏页面. 下图给出Windows 应用商店应用的生命周期图,应用前后台切换就是在运行和挂起直接进行切换,关于生命周期的详细介绍可以参阅官方文档:http://msdn.microsoft.com/zh-cn/library/windows/apps/hh464925.as

Java中的事件监听机制

鼠标事件监听机制的三个方面: 1.事件源对象: 事件源对象就是能够产生动作的对象.在Java语言中所有的容器组件和元素组件都是事件监听中的事件源对象.Java中根据事件的动作来区分不同的事件源对象,动作发生在哪个组件上,那么该组件就是事件源对象 2.事件监听方法: addMouseListener(MouseListener ml) ;该方法主要用来捕获鼠标的释放,按下,点击,进入和离开的动作:捕获到相应的动作后,交由事件处理类(实现MouseListener接口)进行处理. addAction

【COCOS CREATOR 系列教程之二】脚本开发篇&事件监听、常用函数等示例整合

[Cocos Creator ](千人群):  432818031 上一篇,介绍了Himi在使用过cc所有组件后的一篇总结,没有具体介绍每个组件的原因在于官方文档很齐全,而且也有视频的介绍. 所以希望童鞋们可以把我这两篇博文当成对组件.脚本两部分开发的整理与总结. 后续的文章,Himi应该主要更新一些官方还未补充或者还没有的教程.避免无用功. 下面直接放出代码,因为不是很难理解.所以不再一一赘述,都是常用的函数.事件监听.动作回调.定时器等开发过程中必接触的. 大致内容如下: cc 属性介绍 获