接收数据:自适应缓冲区和连接读是为了解决什么问题

接收数据:自适应缓冲区和连接读是为了解决什么问题

目录

  • 接收数据:自适应缓冲区和连接读是为了解决什么问题

    • 1. 主线分析

      • 1.1 读数据技巧
      • 1.2 主线
      • 1.2 知识点
    • 2. 源码分析
      • 2.1 接收数据
      • 2.2 AdaptiveRecvByteBufAllocator

Netty 系列目录(https://www.cnblogs.com/binarylei/p/10117436.html)

到目前为止,我们已经启动服务,并接收了客户端连接,双方已经可以正式通信。下面就要处理请求:接收数据、业务处理、发送数据。

1. 主线分析

1.1 读数据技巧

接收数据我们会碰到以下问题:

  1. 缓冲区大小怎么分配。分配大小浪费空间,分配小了又需要频繁扩容。怎么样才能做到自适应分配缓冲区大小?
  2. 如何处理高并发。如果单个连接读取时间过长,那么请求的并发量会大大降低。我们需要限制单个连接处理的时间。事实上,如果要处理高并发,一个关键的因素就是:每个请求的处理时间都很短。

我们再看一下,Netty 是如何解决这两个问题的。这部分才是本小节内容的核心。当然发送数据也有同样的问题,写的数据太多怎么办,可以将接收数据和发送数据两小节的内部对比起来学习。

  1. 自适应数据大小的分配器(AdaptiveRecvByteBufAllocator)

    根据最近几次请求的数据包大小,猜测下一次数据包大小。AdaptiveRecvByteBufAllocator 对 ByteBuf 的猜测:放大果断,缩小谨慎(需要连续2 次判断)

  2. 连续读(defaultMaxMessagesPerRead)

    默认每个连接最多连接读取 16 次数据,即使还有数据也暂时不处理了,先处理下一个连接。

1.2 主线

NioEventLoop 不断的轮询,接收 OP_READ 事件;然后将读取到的数据通过 pipeline.fireChannelRead(byteBuf) 传播出去。

  1. 多路复用器( Selector )接收到 OP_READ 事件
  2. 处理 OP_READ 事件:NioSocketChannel.NioSocketChannelUnsafe.read()
    • 分配一个初始 1024 字节的 byte buffer 来接受数据
    • 从 Channel 接受数据到 byte buffer
    • 记录实际接受数据大小,调整下次分配 byte buffer 大小
    • 触发 pipeline.fireChannelRead(byteBuf) 把读取到的数据传播出去
    • 判断接受 byte buffer 是否满载而归:是,尝试继续读取直到没有数据或满16 次;否,结束本轮读取,等待下次 OP_READ 事件
NioEventLoop#run
    -> processSelectedKeys
        -> AbstractNioMessageChannel.NioMessageUnsafe#read
            -> NioServerSocketChannel#doReadMessages
            -> pipeline#fireChannelRead

1.2 知识点

(1)读取数据本质

  • sun.nio.ch.SocketChannelImpl#read(java.nio.ByteBuffer)

(2)fireChannelReadComplete 和 fireChannelRead 关系

  • pipeline.fireChannelReadComplete(): 一次读事件触发一次事件。
  • pipeline.fireChannelRead(byteBuf):每解析一条记录触发一次事件。

一次数据取的数据可能有多条记录,每条记录都会触发一次 fireChannelRead 事件,但一次读只会触发一次 fireChannelReadComplete 事件。

(3)缓冲区大小自适应

AdaptiveRecvByteBufAllocator 对 byteBuf 的猜测:放大果断,缩小谨慎(需要连续2 次判断)

(4)高并发处理

默认最多只能读取16 次。“雨露均沾”

2. 源码分析

在上一小节中,我们知道 Netty 对 OP_READ 和 OP_ACCEPT 事件是统一处理的。不同的是接收客户端连接使用 NioMessageUnsafe#read,而读取数据使用 NioByteUnsafe#read。

2.1 接收数据

我们就重点分析 NioByteUnsafe#read 这个方法。Netty 每次读取数据都要分以下几步:

  • 分配缓冲区:默认 1024 byte,之后根据最近几次请求的数据包大小,猜测下一次数据包大小。
  • 读取数据:没什么可说的,直接调用 Java nio 的底层代码。
  • 触发 pipeline.fireChannelRead(byteBuf):业务处理。
  • 判断是否继续读:有两个标准,一是不能超过最大的读取次数(默认 16 次);二是缓冲区的数据每次都要读满,比如分配 2 KB ByteBuf,则必须读取 2 KB 的数据。
@Override
public final void read() {
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    // 每次读取数据时,都重新开始计数
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            // 1. 分配缓冲区,大小自适应
            byteBuf = allocHandle.allocate(allocator);
            // 2. 从 socket revbuf 中接收数据
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            if (allocHandle.lastBytesRead() <= 0) {
                byteBuf.release();
                byteBuf = null;
                close = allocHandle.lastBytesRead() < 0;
                if (close) {
                    readPending = false;
                }
                break;
            }

            allocHandle.incMessagesRead(1);
            readPending = false;
            // 3. 触发事件处理
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
            // 4. 判断是否继续读
        } while (allocHandle.continueReading());

        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();

        if (close) {
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    } finally {
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}

说明: 可以看到,接收数据时缓冲区自适应大小分配和是否继续读这两个重要的功能都委托给了 allocHandle。Netty 中默认的 allocHandle 是 AdaptiveRecvByteBufAllocator。

doReadBytes 方法从 socket revbuf 读取数据,但每次读取前都需要缓冲区中可写区域的大小,用于判断缓冲区是否读满,继而决定是否继续读取数据。

// NioSocketChannel
@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    // 每次读取数据前,记录缓冲区中可写区域大小,判断是否将缓冲区读满
    allocHandle.attemptedBytesRead(byteBuf.writableBytes());
    return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}

2.2 AdaptiveRecvByteBufAllocator

在分析代码前,我们先比较一下 ByteBufAllocator 和 RecvByteBufAllocator 的区别:

  • ByteBufAllocator:用于分配缓冲区,可以分为池化和非池化,以及直接缓冲区和非直接缓冲区两种。默认是 PooledDirectByteBuf。
  • AdaptiveRecvByteBufAllocator:自各应缓冲区,用于决定缓冲区分配大小,以及是否继续读。

AdaptiveRecvByteBufAllocator 只是负责创建 Handle,真正的功能都委托给了 Handle 处理。相关默认配置见 DefaultChannelConfig。

(1)缓冲区分配

@Override
public ByteBuf allocate(ByteBufAllocator alloc) {
    return alloc.ioBuffer(guess());
}

说明: 可以看到,缓冲区的分配直接委托给了 ByteBufAllocator。AdaptiveRecvByteBufAllocator 只是通过 guess() 方法决定分配缓冲区的大小。

(2)更新自适应缓冲区大小

guess() 方法直接返回 nextReceiveBufferSize 变量大小,默认为 1024 byte。每次最小读 64 byte,最大 64 KB。

static final int DEFAULT_MINIMUM = 64;
static final int DEFAULT_INITIAL = 1024;
static final int DEFAULT_MAXIMUM = 65536;

每次调用 allocHandle.lastBytesRead(doReadBytes(byteBuf)) 读取数据后,都会根据上一次的读取的数据包大小决定扩缩容缓冲区大小。

@Override
public void lastBytesRead(int bytes) {
    // attemptedBytesRead为读取前可写缓冲区大小,bytes表示当前读取的数据包大小。
    // 如果二者相等,说明 socket revbuf 中还有数据可读,判断是否扩缩容
    if (bytes == attemptedBytesRead()) {
        // 核心方法:判断是否扩容或缩容
        record(bytes);
    }
    super.lastBytesRead(bytes);
}

(3)自适应缓冲区策略

record 是最核心的方法,计算 AdaptiveRecvByteBufAllocator 缓冲区扩容或缩容的策略。

在分析 record 方法前,我们先看一下缓冲区大小是怎么分配的。AdaptiveRecvByteBufAllocator 将缓冲区按 512 byte 分隔,小于 512 byte 时按 16 byte 扩容或缩容,大于 512 byte 时按两倍大小进行扩容或缩容。也就是 [16, 32, 48, ..., 512, 1024, 2048, .., Integer.MAX_VALUE],这就是 SIZE_TABLE,每次分配的缓冲区大小必定是上述数组中的一个值。

private void record(int actualReadBytes) {
    // 缩容
    if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT - 1)]) {
        if (decreaseNow) {
            index = max(index - INDEX_DECREMENT, minIndex);
            nextReceiveBufferSize = SIZE_TABLE[index];
            decreaseNow = false;
        } else {
            decreaseNow = true;
        }
    // 扩容
    } else if (actualReadBytes >= nextReceiveBufferSize) {
        index = min(index + INDEX_INCREMENT, maxIndex);
        nextReceiveBufferSize = SIZE_TABLE[index];
        decreaseNow = false;
    }
}

说明: record 的扩容或缩容时,都会重新调整 nextReceiveBufferSize 值。

自适应的整体策略是:放大果断,缩小谨慎。即缩容的条件需要连续 2 次,而扩容只需要读取 1 次即可。但要注意的是,INDEX_INCREMENT = 4,而 INDEX_DECREMENT = 1,如 512 KB,如果 512 KB * 24 则满足扩容条件,而 512 / 21 则满足缩容条件。

(4)继续读

private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = ()->
    attemptedBytesRead == lastBytesRead;

@Override
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
    return config.isAutoRead() &&
        (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
        totalMessages < maxMessagePerRead &&
        totalBytesRead > 0;
}

说明: continueReading 的参数默认是 defaultMaybeMoreSupplier。如果继续读需要满足以下条件:

  1. autoRead = ture:默认为 true(DefaultChannelConfig)。
  2. maybeMoreDataSupplier:判断上一次读是否将写缓冲写满。如果写满则说明可能还有数据,可以继续读。
  3. maxMessagePerRead:表示每次最多读取的次数,默认为 16。每读取一次数据 totalMessages 就会自增,当超过16 次后,就停止读。避免某个连接数据量非常大,长时间点用资源。
  4. totalBytesRead:总共读取的字节数。


每天用心记录一点点。内容也许不重要,但习惯很重要!

原文地址:https://www.cnblogs.com/binarylei/p/12640521.html

时间: 2024-10-08 18:39:20

接收数据:自适应缓冲区和连接读是为了解决什么问题的相关文章

蓝牙连接后,启动子线程接收数据,主线程卡住了

============问题描述============ 我看了下google的bluetoothchat的demo,它那里把蓝牙建立客户端连接的部分也放在子线程里执行的. 我目前的程序,蓝牙建立连接的步骤是在UI线程里,但是bluetoothSocket接收数据的部分是在子线程中的,但是现在碰到一个问题,开启子线程后,主线程不接着往下执行了. 根据我的日志记录,connectedThread.run();之后的日志方法LogHelper.Write("已经运行过子线程");就一直卡着

socket 错误之:OSError: [WinError 10057] 由于套接字没有连接并且(当使用一个 sendto 调用发送数据报套接字时)没有提供地址,发送或接收数据的请求没有被接受。

出错的代码 #server端 import socket import struct sk=socket.socket() sk.bind(('127.0.0.1',8080)) sk.listen() conn,addr=sk.accept() str_len1=struct.unpack('i',conn.recv(4))[0] print(sk.recv(str_len1)) str_len2=struct.unpack('i',conn.recv(4))[0] print(sk.recv

2018最新mfc作为上位机接收硬件端USB或串口数据显示成图片 解决串口接收数据丢字节丢包问题

本文用的是VS2013MFC写串口数据接收: 第一步:首先建立一个MFC工程,成功后会跳出一个对话框,直接在对话框上点击右键->点击插入ACTIVAE控件->选择MicrosoftCommunications Control, version 6.0 成功后会显示一个电话的图标在对话框上,运行起来不会显示的 不用担心这个美观问题.如果没有这个插件的话,可能是版本太低  可以自己下载一个补上 第二步:大概的窗体搞好:   那个显示图片的大框是PICTURE控件变量 然后就要项目->类向导中

发送和接收数据包

发送和接收数据包 原文:Game Networking系列,作者是Glenn Fiedler,专注于游戏网络编程相关工作多年. 概述 在之前的网游中的网络编程系列1:UDP vs. TCP中(推荐先看前面那篇),我们经过讨论得出:网游中传输数据应该使用UDP而不是TCP.我们选择UDP是为了不需要等待重发数据包,从而达到数据的实时性. 注意,因为接下来英文原文中所有的代码是C++写的,而我是个pythoner,我的计划是:通过理解文章,我用python实现UDP收发数据包.虚拟连接(原文后两章的

Android Socket 发送与接收数据问题: 发送后的数据接收到总是粘包

先说明一下粘包的概念: 发送时是两个单独的包.两次发送,但接收时两个包连在一起被一次接收到.在以前 WinCE 下 Socket 编程,确实也要处理粘包的问题,没想到在 Android 下也遇到了.首先想从发送端能否避免这样的问题,例如: (1) 调用强制刷数据完成发送的函数:(2) 设置发送超时.1 先试了调用 flush() 函数,但运行后现象依旧2 设置发送超时是 Windows 平台的做法,但在 Android 平台下是否有类似的设置呢?查看 Socket 类的实现代码:java.net

Qt5自带串口调试 --使用signal接收数据,自动侦测端口列表

Qt5自带串口初步用一下感觉还不错. 调试记录 .pro文件增加 QT += serialport .h文件增加 #include <QtSerialPort/QSerialPort> #include <QtSerialPort/QSerialPortInfo> private slots: void my_readuart();//串口接收数据槽函数 private: QSerialPort *my_serialport; .cpp文件 构造函数内     foreach (c

关于原子哥ENC28J60网络通信模块接收数据代码的一点失误

这几天做STM32的ENC28J60网络通信模块,自己在原子哥的代码上进行修改测试,,发现一个问题,电脑和板子进行通信的时候总隔一段时间板子就死机了. 直接源码 struct netbuf *recvbuf;//接收buf struct pbuf *q; err_t recv_err;//接收数据返回信息 u32 data_len = 0; //客户端接收数组的长度 u8 tcp_server_recvbuf[TCP_SERVER_RX_BUFSIZE]; //TCP客户端接收数据缓冲区 mem

android蓝牙(二)——接收数据

在蓝牙开发中,我们有这样的一个需求:我们的android客户端要始终保持和蓝牙的连接,当蓝牙有数据返回的时候,android客户端就要及时的收取数据,当蓝牙没有数据返回的时候我们就要保持android客户端和蓝牙之间的连接.这个时候我们就要采取socket来实现和蓝牙之间的连接.做项目使用过http轮询去获取数据,但是发现那样总是有一定的弊端.于是就才用了socket方式去获取数据. 实现步骤:1.启动一个service去监听是否有数据返回.一旦有数据返回就启动一个线程去处理数据 2.处理完数据

Java基础知识强化之网络编程笔记06:TCP之TCP协议发送数据 和 接收数据

1. TCP协议发送数据 和 接收数据 TCP协议接收数据:• 创建接收端的Socket对象• 监听客户端连接.返回一个对应的Socket对象• 获取输入流,读取数据显示在控制台• 释放资源 TCP协议发送数据: • 创建发送端的Socket对象• 这一步如果成功,就说明连接已经建立成功了.• 获取输出流,写数据• 释放资源 2. 代码实现: (1)发送端: 1 package cn.itcast_06; 2 3 import java.io.IOException; 4 import java