Netty 零拷贝(三)Netty 对零拷贝的改进
Netty 系列目录 (https://www.cnblogs.com/binarylei/p/10117436.html)
Netty 的“零拷贝”主要体现以下几个方面:
- Netty 的接收和发送 ByteBuffer 采用 DIRECT BUFFERS,使用堆外直接内存进行 Socket 读写,不需要进行字节缓冲区的二次拷贝。如果使用传统的堆内存(HEAP BUFFERS)进行 Socket 读写,JVM 会将堆内存 Buffer 拷贝一份到直接内存中,然后才写入 Socket 中。相比于堆外直接内存,消息在发送过程中多了一次缓冲区的内存拷贝。
- Netty 提供了 CompositeByteBuf 类,它可以将多个 ByteBuf 合并为一个逻辑上的 ByteBuf,避免了传统通过内存拷贝的方式将几个小 Buffer 合并成一个大的 Buffer。
- 通过 FileRegion 包装的 FileChannel.tranferTo 方法 实现文件传输,可以直接将文件缓冲区的数据发送到目标 Channel,避免了传统通过循环 write 方式导致的内存拷贝问题。
- 通过 wrap 操作,我们可以将 byte[] 数组、ByteBuf、ByteBuffer 等包装成一个 Netty ByteBuf 对象,进而避免了拷贝操作
一、直接缓冲区的应用
Netty 中使用直接缓冲区来读写数据,首先看一下 read 方法中缓冲区的创建方法。
// AbstractNioByteChannel.NioByteUnsafe#read
public final void read() {
final ChannelConfig config = config();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
ByteBuf byteBuf = allocHandle.allocate(allocator);
// 省略...
}
Netty 中接收缓冲区 ByteBuffer 由 ChannelConfig 分配,而 ChannelConfig 创建 ByteBufAllocator 默认使用 Direct Buffer,这就避免了读写数据的二次内存拷贝问题,从而实现了读写 Socket 的零拷贝功能。
// ByteBufAllocator 用于分配缓冲区
private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
// RecvByteBufAllocator 可伸缩的缓冲区,根据每次接收的数据大小自动分配缓冲区大小
private volatile RecvByteBufAllocator rcvBufAllocator;
public DefaultChannelConfig(Channel channel) {
this(channel, new AdaptiveRecvByteBufAllocator());
}
AdaptiveRecvByteBufAllocator 继承自 DefaultMaxMessagesRecvByteBufAllocator,在 allocate 分配缓冲区。
// DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle.allocate
public ByteBuf allocate(ByteBufAllocator alloc) {
return alloc.ioBuffer(guess());
}
// AbstractByteBufAllocator
public ByteBuf ioBuffer(int initialCapacity) {
if (PlatformDependent.hasUnsafe()) {
return directBuffer(initialCapacity);
}
return heapBuffer(initialCapacity);
}
PlatformDependent.hasUnsafe() 通过判断能否加载到 sun.misc.Unsafe 类就使用直接缓冲区,正常情况下返回 true
private static final boolean HAS_UNSAFE = hasUnsafe0();
public static boolean hasUnsafe() {
return HAS_UNSAFE;
}
// 默认如果能通过反射获取 sun.misc.Unsafe 实例则使用直接缓冲区,因为直接缓冲区底层就是使用 Unsafe 类
private static boolean hasUnsafe0() {
// 1. 加载 android.app.Application 类则返回 true
if (isAndroid()) {
return false;
}
// 2. -Dio.netty.noUnsafe=true
if (PlatformDependent0.isExplicitNoUnsafe()) {
return false;
}
// 3. 通过反射获取 sun.misc.Unsafe 类。Unsafe.class.getDeclaredField("theUnsafe")
try {
boolean hasUnsafe = PlatformDependent0.hasUnsafe();
logger.debug("sun.misc.Unsafe: {}", hasUnsafe ? "available" : "unavailable");
return hasUnsafe;
} catch (Throwable t) {
logger.trace("Could not determine if Unsafe is available", t);
return false;
}
}
我们再分析一直 ByteBufAllocator allocator = ByteBufAllocator.DEFAULT 中的默认 ByteBufAllocator。
ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;
// ByteBufUtil,主要判断是否将内存池化,因为直接缓冲区的分配和销毁开销比较大
static final ByteBufAllocator DEFAULT_ALLOCATOR;
static {
String allocType = SystemPropertyUtil.get(
"io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");
allocType = allocType.toLowerCase(Locale.US).trim();
ByteBufAllocator alloc;
if ("unpooled".equals(allocType)) {
alloc = UnpooledByteBufAllocator.DEFAULT;
logger.debug("-Dio.netty.allocator.type: {}", allocType);
} else if ("pooled".equals(allocType)) {
alloc = PooledByteBufAllocator.DEFAULT;
logger.debug("-Dio.netty.allocator.type: {}", allocType);
} else {
alloc = PooledByteBufAllocator.DEFAULT;
logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType);
}
DEFAULT_ALLOCATOR = alloc;
}
// 除了 HAS_UNSAFE 外还需要判断 io.netty.noPreferDirect 属性
public static final PooledByteBufAllocator DEFAULT =
new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());
public static final UnpooledByteBufAllocator DEFAULT =
new UnpooledByteBufAllocator(PlatformDependent.directBufferPreferred());
public static boolean directBufferPreferred() {
return DIRECT_BUFFER_PREFERRED;
}
private static final boolean DIRECT_BUFFER_PREFERRED =
HAS_UNSAFE && !SystemPropertyUtil.getBoolean("io.netty.noPreferDirect", false);
二、CompositeByteBuf
//定义两个ByteBuf类型的 body 和 header
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
compositeByteBuf.addComponents(true, header, body);
addComponents 方法将 header 与 body 合并为一个逻辑上的 ByteBuf, 这两个 ByteBuf 在 CompositeByteBuf 内部都是单独存在的, CompositeByteBuf 只是逻辑上是一个整体。
三、通过 FileRegion 实现零拷贝
利用 nio 提供的 transferTo 实现零拷贝。
srcFileChannel.transferTo(position, count, destFileChannel);
四、通过 wrap / slice 实现零拷贝
// wrappedBuffer 和 slice 都是共享同一内存,并没有拷贝
ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);
ByteBuf header = byteBuf.slice(0, 5);
ByteBuf body = byteBuf.slice(5, 10);
每天用心记录一点点。内容也许不重要,但习惯很重要!
原文地址:https://www.cnblogs.com/binarylei/p/10117437.html
时间: 2024-10-11 20:51:31