Netty5 Read事件处理过程_源码讲解

netty是对Nio的一个封装,关于网络的所有操作都是通过事件的方式完成的。例如连接创建、read事件、write事件都是通过Nio来完成 的。那netty是怎么启动监听的呢? 在什么地方启动的呢?此处不为大家设置悬念,一次性告诉大家。通过循环扫描的方式来实现监听的。具体的方法类位于NioEventLoop的run方法中 (赶紧进去看看吧!! 浅显易懂)。

下面是netty的acceptor线程创建连接的代码。位于类NioEventLoop的processSelectedKey中(至于 processSelectedKey是怎么被调用的,自己看看调用链就行了(eclipse用ctrl+Shift+H就可以查看到选中方法的调用 链))。

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }
 
        try {
            //得到当前的key关注的事件
            int readyOps = k.readyOps();
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            //一个刚刚创建的NioServersocketChannel感兴趣的事件是0。
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {//可以读取操作  --对于serverSocket来说就是acceptor事件、对于socketChannel来说就是read事件 
                //INFO: channel类型为io.netty.channel.socket.nio.NioSocketChannel unsafe类型为io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe
                Object obj = k.attachment();//得到NioServerSocketChannel或者NioSocketChannel
                if(obj instanceof NioServerSocketChannel){
                    System.out.println(obj.getClass().getName()+ " 开始接收连接");
                }else{
                    System.out.println(obj.getClass().getName()+ " 开始接收字节");
                }
                //不同的socketChannel对于那个的unsafe是不同的。例如Server端的是messageUnsafe 而 clinet端是byteUnsafe
                unsafe.read();//对于接受链接或者read兴趣都会添加进入read操作调用serverSocket->NioMessageUnsafe
                if (!ch.isOpen()) {
                    // Connection already closed - no need to handle write.
                    return;
                }
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {//对于半包消息进行输出操作
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
 
                unsafe.finishConnect();
            }
        } catch (CancelledKeyException e) {
            unsafe.close(unsafe.voidPromise());
        }
    }

这里我们以Read事件的处理(NioByteUnsafe)为线索进行讲解。后续会有基于byte的unsafe进行讲解的(Unsafe不知道为啥要这 么叫,本人也感到挺费解的,不过现在看来感觉就是一个工具对象。不要从名称上惧怕它)。下面来看NioByteUnsafe(该类是AbstractNioByteChannel的一个内部类)的read方法进行讲 解。直接讲代码(后面也会有图形讲解,方便大家理解):

public void read() {
            //得到config对象、pipeline对象
            final ChannelConfig config = config();
            //得到对应的管道对象
            final ChannelPipeline pipeline = pipeline();
            //实际的内存分配器---
            final ByteBufAllocator allocator = config.getAllocator();
            final int maxMessagesPerRead = config.getMaxMessagesPerRead();
            RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
            if (allocHandle == null) {
                //创建一个allocHandle对象--AdaptiveRecvByteBufAllocator
                //RecvByteBufAllocator负责内存分配的算法问题 
                this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
            }
            if (!config.isAutoRead()) {
                removeReadOp();
            }

            ByteBuf byteBuf = null;
            int messages = 0;
            boolean close = false;
            try {
                int byteBufCapacity = allocHandle.guess();
                int totalReadAmount = 0;
                do {
                    //可能是 direct或者 heap  从与当前socket相关的allocator得到byteBuf数组
//                    byteBuf =allocHandle.allocate(allocator);
                    //每次从内核中读取数据netty都会分配内存
                    byteBuf = allocator.ioBuffer(byteBufCapacity);
                     //获得可以写入的容量的大小
                    int writable = byteBuf.writableBytes(); //分一个多大的内存就从socket中读取多大的数据
                    int localReadAmount = doReadBytes(byteBuf);//从socket中读取数据到bytebuf中
                    if (localReadAmount <= 0) {//发生了读取事件,但是读取的长度是负数,
                        // not was read release the buffer
                        byteBuf.release();//释放到Thread Cache中
                        close = localReadAmount < 0;//是否进行关闭,关键要看读取到的数据的长度是否为-1;
                        break;
                    }
                    //发起读取事件---如果是第一次积累数据的话,那么就会将当前的bytebuf作为累积对象,供继续使用
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;//由pipeline进行byteBuf的释放
                    //避免内存溢出,
                    if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
                        // Avoid overflow.
                        totalReadAmount = Integer.MAX_VALUE;
                        break;
                    }

                    totalReadAmount += localReadAmount;
                    if (localReadAmount < writable) {
                        // Read less than what the buffer can hold,
                        // which might mean we drained the recv buffer completely.
                        break;
                    }
                } while (++ messages < maxMessagesPerRead);//每次读取的消息的数量都会有限制的,也就说,每次处理read事件的消息量是可以配置的
                //读取完成---处理完一次 读取事件
                pipeline.fireChannelReadComplete();
                //对本次读取的数据量进行记录,便于下一次为当前的Channel分配合适大小的buffer
                allocHandle.record(totalReadAmount);

                if (close) {
                    closeOnRead(pipeline);
                    close = false;
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close);
            }
        }
    }
//上述代码段说明:
/**
 config.getRecvByteBufAllocator().newHandle(); 负责内存分配算法
    而
ByteBufAllocator 负责具体的内存分配-分配到堆还是直接内存
*/

这就是对一个read的处理基本流程,就是将从socket中读取到的放入到分配器分配的bytebuf,然后将其传入到pipeline.fireChannelRead(byteBuf);中,至于在pipeline是怎样的传递的,我们从这个方法中是无法查看到的。这也是我们这篇文章的主要内容(别的内存也很重要哦!关键是我已经添加了很多注释了!)。就是要看看在得到bytebuf后,pipeline是怎么处理传入进去的bytebuf的。我们来对pipeline.fireChannelRead(byteBuf);穷追(ctrl+shift+H eclipse)到具体的实现,

我们发现,最终会调用到的ChannelHandler接口的

void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;

ChannelHandler有很多我们具体选用哪一个呢?动动脑子就知道,我们pipeline中存储的都是ChannelHandler,有哪些个Handler,就要看我们在启动代码中是怎样设置了。来看看我的启动代码(精简版,没有写全,所以这里看不懂得话,建议你写个Netty的小demo).上代码:

ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
//             .option(ChannelOption.ALLOCATOR, )//设置内存分配器
              .option(ChannelOption.SO_SNDBUF, 1024)//发送缓冲器
              .option(ChannelOption.SO_RCVBUF, 1024)
            .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)//接收缓冲器
             .handler(new LoggingHandler(LogLevel.INFO))//serverSocketChannel对应的ChannelPipeline的handler
             .childHandler(new ChannelInitializer<SocketChannel>() {//客户端新接入的连接socketChannel对应的ChannelPipeline的Handler
                 @Override
                 public void initChannel(SocketChannel ch) {
                     SocketChannelConfig config=ch.config();
                     ChannelPipeline p = ch.pipeline();
                     p
                     .addLast(new LineBasedFrameDecoder(30))//也会将回车符删除掉--是以换行符作为分隔的
                     .addLast(new DiscardServerHandler());
                 }
             });

由此可以看到,这里第一个被调用的ChannelHandler是LineBasedFrameDecoder。看看LineBasedFrameDecoder是怎么实现ChannelRead方法的。翻看了弗雷之后,我们终于找到了channelRead方法。由此可以看到,在AbstractNioByteChannel的read方法中的pipeline.fireChannelRead(byteBuf);按照我的启动代码(虽然说是按照我的,但是按照你们的也是这样,因为byte在通过网络接收之后,都要进行decode,第一个经过的channelHandler肯定是ByteToMessageDecoder,不信,你看看自己的启动代码试试),最终调用的是ByteToMessageDecoder.channelRead()  ,上代码:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            RecyclableArrayList out = RecyclableArrayList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                if (first) {
                    cumulation = data;
                } else {
                    //缓冲区的大小没有超过需要写入的数据的大小
                    if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()) {
                        expandCumulation(ctx, data.readableBytes());//扩展缓冲区--查看实现后,就是通过分配一个更大的,然后复制一下字节数据
                    }
                    cumulation.writeBytes(data);//将数据写入到积累对象中
                    data.release();//释放bytebuffer(heap或者direct)--通过引用的方式进行释放缓冲区(至于什么是引用方式释放,我们会有一个特定的章节进行讲解)
                }
                //收集完毕之后解析收集到的字符串---通常调用子类的方法实现,在具体实现中,用out来承载解析出来的msg
                callDecode(ctx, cumulation, out);//实现的时候,不要释放我们的累积对象cumulation
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
            if (cumulation != null && !cumulation.isReadable()) {//如果累积对象为null或者没有可读内容的话,那么就将累积对象释放掉(因为空了或者为null了)
                    cumulation.release();
                    cumulation = null;
                }
                int size = out.size();
                decodeWasNull = size == 0;
                //针对解析后的out结果中的msg的对象,将解析出来的message(具体的类型,请自己看实现.是怎样做的)传递到pipeline中。
                for (int i = 0; i < size; i ++) {
                    ctx.fireChannelRead(out.get(i));
                }
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

提示: 一个pipeline,为某个socketChannel所有,也就是说pipeline里的channelHandler,也是为某个socketchannel所享用的。不会出现多个线程共享一个channelHanler的情况(我们可以让他们共享一个handler,但是我们得保证这个共享的handler是一个无状态的handler,例如我们现在就要讲解的ByteToMessageDecoder就是一个有状态的handler,所以就不能共享,就要在每次初始化socketChannel的pipeline时,都要重新new一个ByteToMessageDecoder,不信大家,可以可以看一下ByteToMessageDecoder的实现。我直接粘贴代码吧!!(看看我的注释哦)如下:).

public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {

    ByteBuf cumulation;//因为单词cumulation --累积 意思,也就是,这个成员对象,就是用来作为半包的累积存储的对象来使用的
    private boolean singleDecode;
    private boolean decodeWasNull;
    private boolean first;
}

----------------------------------------------------------------------------------------------------------

讲到这里,会涉及到一个解析出来的message在被pipeline中的其它handler处理完毕后的内存释放问题。怎么解决? 什么时候释放这些message占用的空间呢?

时间: 2024-08-02 15:12:57

Netty5 Read事件处理过程_源码讲解的相关文章

你也可以用java的swing可以做出这么炫的mp3播放器_源码下载

I had published the blog : 你用java的swing可以做出这么炫的mp3播放器吗? and to display some screenshots about this MP3 Player. The target for this blog is that to publish the source code, and you can download the source code and give me some advices if possible. The

cocos2D-X源码讲解之从cocos2D-X学习OpenGL(1)----cocos2D-X渲染结构

 个人原创,欢迎转载,转载请注明原文地址http://blog.csdn.net/bill_man 从本篇文章开始,将分析cocos2D-X 3.0源代码,第一部分是从cocos2D-X学习OpenGL,也就是分析cocos2D-X 3.0的渲染代码,本篇首先介绍cocos2D-X 3.0的渲染结构,使用的是3.0正式版. void DisplayLinkDirector::mainLoop() { if (_purgeDirectorInNextLoop) { //只有一种情况会调用到这里来,

Android应用程序启动过程——Launcher源码分析

当我们在Launcher界面单击一个应用程序图标时就会启动一个程序,那这一个过程究竟发生了些哪样呢?让我们跟踪Launcher源码来分析一下吧. 先上流程图: step1.追踪Launcher  从源码中我们可以发现Launcher其实也是一个程序,它继承于Activity.找到该文件中的onCreate()方法,代码片段如下: protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceSta

java画图程序_图片用字母画出来_源码发布_版本二

在上一个版本:java画图程序_图片用字母画出来_源码发布 基础上,增加了图片同比例缩放,使得大像素图片可以很好地显示画在Notepad++中. 项目结构: 运行效果1: 原图:http://images.cnblogs.com/cnblogs_com/hongten/356471/o_imagehandler_result1.png 运行效果2: 原图:http://images.cnblogs.com/cnblogs_com/hongten/356471/o_imagehandler_res

源码讲解ActionBar的各种用法

1. Navigation Drawer 许多应用程序都使用了Navigation Drawer,如网易邮箱客户端.该控件位于 android.support.v4.widget.DrawerLayout ,用法如下,点击下载源码: <android.support.v4.widget.DrawerLayout xmlns:android="http://schemas.android.com/apk/res/android" android:id="@+id/draw

java画图程序_图片用字母画出来_源码发布

在之前写了一篇blog:java画图程序_图片用字母画出来 主要是把一些调试的截图发布出来,现在程序调试我认为可以了(当然,你如果还想调试的话,也可以下载源码自己调试). 就把源码发布出来. 项目结构: 资源文件: 原图:http://images.cnblogs.com/cnblogs_com/hongten/356471/o_imagehandler_resource.png 运行效果: 原图:http://images.cnblogs.com/cnblogs_com/hongten/356

Pixhawk之姿态控制篇(1)_源码算法分析(超级有料)

一.开篇 姿态控制篇终于来了.来了.来了~~~ 心情爽不爽?愉悦不愉悦?开心不开心? 喜欢的话就请我吃顿饭吧,哈哈. 其实这篇blog一周前就应该写的,可惜被上一篇blog霸占了.但是也不算晚,整理了很多算法基础知识,使得本篇blog更充实.一人之力总是有限的,难免有不足之处,大家见谅,有写的不好的地方劳烦指正.看到标题了吧,属于连载篇,所以后续还会有相关问题的补充的. 二.版权声明 博主:summer 声明:喝水不忘挖井人,转载请注明出处. 原文地址:http://blog.csdn.net/

第5章 软件包管理(2)_源码包安装和脚本安装

3 源码包管理 3.1 源码包和RPM包的区别 (1)区别:①安装之前的区别:概念上的区别:②安装之后:安装位置不同 (2)RPM包的默认安装位置 RPM包默认安装路径 /etc 配置文件安装目录 /usr/bin 可执行的命令安装目录 /usr/lib 程序所使用的函数库保存位置 /usr/share/doc 基本软件使用手册保存位置 /usr/share/man 帮助文件保存位置 (3)源码包安装位置:安装在指定位置中,一般是/usr/local/软件名/ (4)安装位置不同带来的影响 ①R

深入Redis内部-Redis 源码讲解(转)

Redis作为 NoSQL 数据库的杰出代表,一直广受关注,其轻量级的敏捷架构,向来有存储中的瑞士军刀之称.下面推荐的一篇文章,从源码的角度讲解了Redis 的整个工作流程,是了解 Redis 流程的绝佳文章.英文的,想搞懂还是要花些时间的 原文链接:Redis: under the hood 目录: Startup Beginning global server state initialization Setting up command table Loading config file