Netty源码分析第7章(编码器和写数据)---->第3节: 写buffer队列

Netty源码分析七章: 编码器和写数据

第三节: 写buffer队列

之前的小结我们介绍过, writeAndFlush方法其实最终会调用write和flush方法

write方法最终会传递到head节点, 调用HeadContext的write方法:

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

这里通过unsafe对象的write方法, 将消息写入到缓存中, 具体的执行逻辑, 我们在这个小结进行剖析

我们跟到AbstractUnsafe的write方法中:

public final void write(Object msg, ChannelPromise promise) {
    assertEventLoop();
    //负责缓冲写进来的byteBuf
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
        ReferenceCountUtil.release(msg);
        return;
    }
    int size;
    try {
        //非堆外内存转化为堆外内存
        msg = filterOutboundMessage(msg);
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        ReferenceCountUtil.release(msg);
        return;
    }
    //插入写队列
    outboundBuffer.addMessage(msg, size, promise);
}

首先看 ChannelOutboundBuffer outboundBuffer = this.outboundBuffer

ChannelOutboundBuffer的功能就是缓存写入的ByteBuf

我们继续看try块中的 msg = filterOutboundMessage(msg)

这步的意义就是将非对外内存转化为堆外内存

filterOutboundMessage方法方法最终会调用AbstractNioByteChannel中的filterOutboundMessage方法:

protected final Object filterOutboundMessage(Object msg) {
    if (msg instanceof ByteBuf) {
        ByteBuf buf = (ByteBuf) msg;
        //是堆外内存, 直接返回
        if (buf.isDirect()) {
            return msg;
        }
        return newDirectBuffer(buf);
    }
    if (msg instanceof FileRegion) {
        return msg;
    }
    throw new UnsupportedOperationException(
            "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}

首先判断msg是否byteBuf对象, 如果是, 判断知否堆外内存, 如果是堆外内存, 则直接返回, 否则, 通过newDirectBuffer(buf)这种方式转化为堆外内存

回到write方法中:

outboundBuffer.addMessage(msg, size, promise)将已经转化为堆外内存的msg插入到写队列

我们跟到addMessage方法当中, 这是ChannelOutboundBuffer中的方法:

public void addMessage(Object msg, int size, ChannelPromise promise) {
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        flushedEntry = null;
        tailEntry = entry;
    } else {
        Entry tail = tailEntry;
        tail.next = entry;
        tailEntry = entry;
    }
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }
    incrementPendingOutboundBytes(size, false);
}

首先通过 Entry.newInstance(msg, size, total(msg), promise) 的方式将msg封装成entry

然后通过调整tailEntry, flushedEntry, unflushedEntry三个指针, 完成entry的添加

这三个指针均是ChannelOutboundBuffer的成员变量

flushedEntry指向第一个被flush的entry

unflushedEntry指向第一个未被flush的entry

也就是说, 从flushedEntry到unflushedEntry之间的entry, 都是被已经被flush的entry

tailEntry指向最后一个entry, 也就是从unflushedEntry到tailEntry之间的entry都是没flush的entry

我们回到代码中:

创建了entry之后首先判断尾指针是否为空, 在第一次添加的时候, 均是空, 所以会将flushedEntry设置为null, 并且将尾指针设置为当前创建的entry

最后判断unflushedEntry是否为空, 如果第一次添加这里也是空, 所以这里将unflushedEntry设置为新创建的entry

第一次添加如下图所示

7-3-1

如果不是第一次调用write方法, 则会进入 if (tailEntry == null) 中else块:

Entry tail = tailEntry  这里tail就是当前尾节点

tail.next = entry  代表尾节点的下一个节点指向新创建的entry

tailEntry = entry  将尾节点也指向entry

这样就完成了添加操作, 其实就是将新创建的节点追加到原来尾节点之后

第二次添加 if (unflushedEntry == null) 会返回false, 所以不会进入if块

第二次添加之后指针的指向情况如下图所示:

7-3-4

以后每次调用write, 如果没有调用flush的话都会在尾节点之后进行追加

回到代码中, 看这一步incrementPendingOutboundBytes(size, false)

这步时统计当前有多少字节需要被写出, 我们跟到这个方法中:

private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
    if (size == 0) {
        return;
    }
    //TOTAL_PENDING_SIZE_UPDATER当前缓冲区里面有多少待写的字节
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
    //getWriteBufferHighWaterMark() 最高不能超过64k
    if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
        setUnwritable(invokeLater);
    }
}

看这一步:

long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size)

TOTAL_PENDING_SIZE_UPDATER表示当前缓冲区还有多少待写的字节, addAndGet就是将当前的ByteBuf的长度进行累加, 累加到newWriteBufferSize中

在继续看判断 if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark())

channel.config().getWriteBufferHighWaterMark() 表示写buffer的高水位值, 默认是64k, 也就是说写buffer的最大长度不能超过64k

如果超过了64k, 则会调用setUnwritable(invokeLater)方法设置写状态

我们跟到setUnwritable(invokeLater)方法中:

private void setUnwritable(boolean invokeLater) {
    for (;;) {
        final int oldValue = unwritable;
        final int newValue = oldValue | 1;
        if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
            if (oldValue == 0 && newValue != 0) {
                fireChannelWritabilityChanged(invokeLater);
            }
            break;
        }
    }
}

这里通过自旋和cas操作, 传播一个ChannelWritabilityChanged事件, 最终会调用handler的channelWritabilityChanged方法进行处理

以上就是写buffer的相关逻辑

原文地址:https://www.cnblogs.com/xiangnan6122/p/10208177.html

时间: 2024-10-14 04:07:47

Netty源码分析第7章(编码器和写数据)---->第3节: 写buffer队列的相关文章

Netty源码分析第7章(编码器和写数据)----&gt;第4节: 刷新buffer队列

Netty源码分析第七章: 编码器和写数据 第四节: 刷新buffer队列 上一小节学习了writeAndFlush的write方法, 这一小节我们剖析flush方法 通过前面的学习我们知道, flush方法通过事件传递, 最终会传递到HeadContext的flush方法: public void flush(ChannelHandlerContext ctx) throws Exception { unsafe.flush(); } 这里最终会调用AbstractUnsafe的flush方法

Netty源码分析第7章(编码器和写数据)----&gt;第2节: MessageToByteEncoder

Netty源码分析第七章: Netty源码分析 第二节: MessageToByteEncoder 同解码器一样, 编码器中也有一个抽象类叫MessageToByteEncoder, 其中定义了编码器的骨架方法, 具体编码逻辑交给子类实现 解码器同样也是个handler, 将写出的数据进行截取处理, 我们在学习pipeline中我们知道, 写数据的时候会传递write事件, 传递过程中会调用handler的write方法, 所以编码器码器可以重写write方法, 将数据编码成二进制字节流然后再继

Netty源码分析第6章(解码器)----&gt;第1节: ByteToMessageDecoder

Netty源码分析第六章: 解码器 概述: 在我们上一个章节遗留过一个问题, 就是如果Server在读取客户端的数据的时候, 如果一次读取不完整, 就触发channelRead事件, 那么Netty是如何处理这类问题的, 在这一章中, 会对此做详细剖析 之前的章节我们学习过pipeline, 事件在pipeline中传递, handler可以将事件截取并对其处理, 而之后剖析的编解码器, 其实就是一个handler, 截取byteBuf中的字节, 然后组建成业务需要的数据进行继续传播 编码器,

Netty源码分析第3章(客户端接入流程)----&gt;第5节: 监听读事件

Netty源码分析第三章: 客户端接入流程 第五节: 监听读事件 我们回到AbstractUnsafe的register0()方法: private void register0(ChannelPromise promise) { try { //省略代码 //做实际的注册 doRegister(); neverRegistered = false; registered = true; //触发事件 pipeline.invokeHandlerAddedIfNeeded(); safeSetS

Netty源码分析第3章(客户端接入流程)----&gt;第4节: NioSocketChannel注册到selector

Netty源码分析第三章: 客户端接入流程 第四节: NioSocketChannel注册到selector 我们回到最初的NioMessageUnsafe的read()方法: public void read() { //必须是NioEventLoop方法调用的, 不能通过外部线程调用 assert eventLoop().inEventLoop(); //服务端channel的config final ChannelConfig config = config(); //服务端channel

Netty源码分析第3章(客户端接入流程)----&gt;第3节: NioSocketChannel的创建

Netty源码分析第三章: 客户端接入流程 第三节: NioSocketChannel的创建 回到上一小结的read()方法: public void read() { //必须是NioEventLoop方法调用的, 不能通过外部线程调用 assert eventLoop().inEventLoop(); //服务端channel的config final ChannelConfig config = config(); //服务端channel的pipeline final ChannelPi

Netty源码分析第4章(pipeline)----&gt;第4节: 传播inbound事件

Netty源码分析第四章: pipeline 第四节: 传播inbound事件 有关于inbound事件, 在概述中做过简单的介绍, 就是以自己为基准, 流向自己的事件, 比如最常见的channelRead事件, 就是对方发来数据流的所触发的事件, 己方要对这些数据进行处理, 这一小节, 以激活channelRead为例讲解有关inbound事件的处理流程 在业务代码中, 我们自己的handler往往会通过重写channelRead方法来处理对方发来的数据, 那么对方发来的数据是如何走到chan

Netty源码分析第4章(pipeline)----&gt;第7节: 前章节内容回顾

Netty源码分析第四章: pipeline 第七节: 前章节内容回顾 我们在第一章和第三章中, 遗留了很多有关事件传输的相关逻辑, 这里带大家一一回顾 首先看两个问题: 1.在客户端接入的时候, NioMessageUnsafe的read方法中pipeline.fireChannelRead(readBuf.get(i))为什么会调用到ServerBootstrap的内部类ServerBootstrapAcceptor中的channelRead()方法 2.客户端handler是什么时候被添加

Netty源码分析第5章(ByteBuf)----&gt;第1节: AbstractByteBuf

Netty源码分析第五章: ByteBuf 概述: 熟悉Nio的小伙伴应该对jdk底层byteBuffer不会陌生, 也就是字节缓冲区, 主要用于对网络底层io进行读写, 当channel中有数据时, 将channel中的数据读取到字节缓冲区, 当要往对方写数据的时候, 将字节缓冲区的数据写到channel中 但是jdk的byteBuffer是使用起来有诸多不便, 比如只有一个标记位置的指针position, 在进行读写操作时要频繁的通过flip()方法进行指针位置的移动, 极易出错, 并且by