Kafka 协议实现中的内存优化

Kafka 协议实现中的内存优化

Kafka 协议实现中的内存优化

Jusfr 原创,转载请注明来自博客园

Request 与 Response 的响应格式

Request 与 Response 都是以 长度+内容 形式描述, 见于 A Guide To The Kafka Protocol

Request 除了 Size+ApiKey+ApiVersion+CorrelationId+ClientId 这些固定字段, 额外的 RequestMessage 包含了具体请求数据;

Request => Size ApiKey ApiVersion CorrelationId ClientId RequestMessage
  Size => int32
  ApiKey => int16
  ApiVersion => int16
  CorrelationId => int32
  ClientId => string
  RequestMessage => MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest

Response 除了 Size+CorrelationId, 额外的 ResponseMessage 包含了具体响应数据;

Response => Size CorrelationId ResponseMessage
Size => int32
CorrelationId => int32
ResponseMessage => MetadataResponse | ProduceResponse | FetchResponse | OffsetResponse | OffsetCommitResponse | OffsetFetchResponse

处理序列化与反序列化需求

使用 MemoryStream

序列化 Request 需要分配内存, 从缓冲区读取 Response 同理.

MemoryStream 是一个可靠方案, 它实现了自动扩容, 但扩容过程离不开字节拷贝, 而频繁分配不小的内存将影响性能, 近似的扩容示例代码如下:

// init
Byte[] buffer = new Byte[4096];
Int32 offset = 0;

//write bytes
Byte[] bytePrepareCopy =  // from outside
if (bytePrepareCopy > buffer.Length - offset) {
    Byte[] newBuffer = new Byte[buffer.Length * 2];
    Array.Copy(buffer, 0, newBuffer, 0, offset);
    buffer = newBuffer;
}
Array.Copy(bytePrepareCopy, 0, buffer, offset, bytePrepareCopy.Length);

数组扩容可以参见 List 的实现, 这里只是示意, 没有处理长度为 (buffer.Length*2 - offset) < bytePrepareCopy.Length 的情况

在数组长度超4k 时,扩容成本非常高。如果约定“请求和响应不得超过4k“, 那么使用可回收(见下文相关内容)的固定长度的数组模拟 MemoryStream 的读取和写入行为, 能够达到极大的性能收益。

KafkaStreamBinary (见于 github) 内部使用 MemoryStream, KafkaFixedBinary (见于 github) 则是基于数组的实现;

使用 BufferManager

使用过 Memcached 的人很容易理解 BufferManager 的思路: 为了降低频繁开辟内存带来的开销,首先“将内存块化”, 申请者获取到“成块的内存”, 被分配出去的内存块标记为“已分配”; 与 Memcached 不同的是 BufferManager 期望申请者归还使用完后的内存块,以重新分配给其他申请操作。

System.ServiceModel.Channels.BufferManager 提供了一个可靠实现, 大致使用方式如下:

const Int32 size = 4096;
BufferManager bm = BufferManager.CreateBufferManager(maxBufferPoolSize: size * 32, maxBufferSize: size);
Byte[] buffer = bm.TakeBuffer(1024);
bm.ReturnBuffer(buffer);

与手动分配内容的性能对比

const Int32 size = 4096;
BufferManager bm = BufferManager.CreateBufferManager(maxBufferPoolSize: size * 10, maxBufferSize: size);

var timer = new FunctionTimer();
timer.Push("BufferManager", () => {
    Byte[] buffer = bm.TakeBuffer(size);
    bm.ReturnBuffer(buffer);
});

timer.Push("new Byte[]", () => {
    Byte[] buffer = new Byte[size];
});

timer.Initialize();
timer.Execute(100000).Print();

测试结果:

BufferManager
    Time Elapsed : 7ms
    CPU Cycles   : 17,055,523
    Memory cost  : 3,388
    Gen 0        : 2
    Gen 1        : 2
    Gen 2        : 2
new Byte[]
    Time Elapsed : 42ms
    CPU Cycles   : 113,437,539
    Memory cost  : 24
    Gen 0        : 263
    Gen 1        : 2
    Gen 2        : 2
  • 过小的内容使用没有使用 BufferManager 的必要,但BufferManager分配超过 4k 内存时性能下降明显;
  • 最优情况是申请人获取的内存块大小一致,如果设置maxBufferSize = 4k,但 TakeBuffer(Int32 bufferSize) 方法使用的参数大于 4k,测试表明性能还不如手动创建 Byte 数组;
  • mono 的实现存在线程安全的问题;

强制要求业务使用的请求不超过4k 貌似做得到,但需求更大内存的场景总是存在,比如合并消息、批量消费等,Chuye.Kafka 作为类库需要提供支持。

KafkaScalableBinary = BufferManager + Byte[][]

KafkaScalableBinary 并没有发明新东西, 在其内部维护了一个 Dictionary<int32, byte[]=""> 保存一系列 Byte数组;

初始化时并未真正分配内存, 除非开始写入;

public KafkaScalableBinary()
    : this(4096) {
}

public KafkaScalableBinary(Int32 size) {
    if (size <= 0) {
        throw new ArgumentOutOfRangeException("size");
    }
    _lengthPerArray = size;
    _buffers = new Dictionary<Int32, Byte[]>(16);
}

写入时先根据当前位置对数组长度取模 _position / _lengthPerArray 找到待写入数组,不存在则分配新数组;

private Byte[] GetBufferForWrite() {
    var index = (Int32)(_position / _lengthPerArray);
    Byte[] buffer;
    if (!_buffers.TryGetValue(index, out buffer)) {
        if (_lengthPerArray >= 128) {
            buffer = ServiceProvider.BufferManager.TakeBuffer(_lengthPerArray);
        }
        else {
            buffer = new Byte[_lengthPerArray];
        }
        _buffers.Add(index, buffer);
    }
    return buffer;
}

然后根据当前位置对数组长度取整 _position % _lengthPerArray 找到目标位置;由于待写入长度可能超过可使用长度,这里使用了 while 循环,一边获取和分配待写入数组, 一边将剩余字节写入其中,直至完成;

public override void WriteByte(Byte[] buffer, int offset, int count) {
    if (buffer == null) {
        throw new ArgumentNullException("buffer");
    }
    if (buffer.Length == 0) {
        return;
    }
    if (buffer.Length < count) {
        throw new ArgumentOutOfRangeException();
    }

    checked {
        var left = count;                                               //标记剩余量
        while (left > 0) {
            var targetBuffer = GetBufferForWrite();                     //查找目标数组
            var targetOffset = (Int32)(_position % _lengthPerArray);    //查找目标位置
            if (targetOffset == _lengthPerArray - 1) {                  //如果位置已经位于数组末尾, 说明位于起始位置;
                targetOffset = 0;
            }

            var prepareCopy = left;                                     //准备写入剩余量
            if (prepareCopy > _lengthPerArray - targetOffset) {         //但数组的剩余长度可能不够,写入较小长度
                prepareCopy = _lengthPerArray - targetOffset;
            }
            Array.Copy(buffer, count - left, targetBuffer, targetOffset, prepareCopy);  //拷贝字节
            _position += prepareCopy;                                   //推进位置
            left -= prepareCopy;                                        //减小剩余量
            if (_position > _length) {                                  //增大总长度
                _length = _position;
            }
        }
    }
}
        

读取过程类似,循环查找待读取数组和拷贝字节直到完成,不同的是分配内存的逻辑以一条异常替代;

public override Int32 ReadBytes(Byte[] buffer, int offset, int count) {
    if (buffer == null) {
        throw new ArgumentNullException("buffer");
    }
    if (buffer.Length == 0) {
        return 0;
    }
    if (buffer.Length < count) {
        throw new ArgumentOutOfRangeException();
    }
    checked {
        var prepareRead = (Int32)(Math.Min(count, _length - _position));    //计算待读取长度
        var left = prepareRead;                                             //标记剩余量
        while (left > 0) {
            var targetBuffer = GetBufferForRead();                          //查找目标数组
            var targetOffset = (Int32)(_position % _lengthPerArray);        //查找目标位置
            var prepareCopy = left;                                         //准备读取剩余量
            if (prepareCopy > _lengthPerArray - targetOffset) {
                prepareCopy = _lengthPerArray - targetOffset;
            }
            Array.Copy(targetBuffer, targetOffset, buffer, prepareRead - left, prepareCopy);  //但数组的剩余长度可能不够,读取较小长度
            _position += prepareCopy;                                       //推进位置
            left -= prepareCopy;                                            //减小剩余量
        }
        return prepareRead;
    }
}

private Byte[] GetBufferForRead() {
    var index = (Int32)(_position / _lengthPerArray);
    Byte[] buffer;
    if (!_buffers.TryGetValue(index, out buffer)) {
        throw new IndexOutOfRangeException();
    }
    return buffer;
}

释放时释放内部维护的的全部字节;

public override void Dispose() {
    foreach (var item in _buffers) {
        if (_lengthPerArray >= 128) {
            ServiceProvider.BufferManager.ReturnBuffer(item.Value);
        }
    }
    _buffers.Clear();
}

写入缓冲区是对内部维护数组列表的直接操作,高度优化

public override void CopyTo(Stream destination) {
    foreach (var item in GetBufferAndSize()) {
        destination.Write(item.Key, 0, item.Value);
    }
}

读取缓冲区时和写入行为类似

public override void ReadFrom(Stream source, int count) {
    var left = count;
    var loop = 0;
    do {
        var targetBuffer = GetBufferForWrite();
        var targetOffset = (Int32)(_position % _lengthPerArray);
        var prepareCopy = left;
        if (prepareCopy > _lengthPerArray - targetOffset) {
            prepareCopy = _lengthPerArray - targetOffset;
        }

        var readed = source.Read(targetBuffer, targetOffset, prepareCopy);
        _position += readed;
        left -= readed;
        if (_position > _length) {
            _length = _position;
        }
        loop++;
    } while (left > 0);
}

实际上可以从 MemoryStream 定义出 ScalableMemoryStream 再改写其行为,KafkaScalableBinary 依赖于 MemoryStream 而不是具体实现,整体就更加"设计模式"了 , 基本逻辑前文已陈述。

测试过程中发现,一来 **mono 的 BufferManager 实现存在线程安全问题*,故 Chuye.Kafka 提供了一个 ObjectPool 模式的 BufferManager 作为替代方案; 二是 KafkaScalableBinary 与 ScalableStreamBinary 的性能对比测试结果非常不稳定,但前者频繁的取横取整及字典开销必然是拖累,我会继续追踪和优化。

KafkaScalableBinary (见于 github), 序列化部分设计示意:


Jusfr 原创,转载请注明来自博客园

时间: 2024-08-08 21:21:00

Kafka 协议实现中的内存优化的相关文章

Kafka 协议实现中的内存优化【转】

Kafka 协议实现中的内存优化 Jusfr 原创,转载请注明来自博客园 Request 与 Response 的响应格式 Request 与 Response 都是以 长度+内容 形式描述, 见于 A Guide To The Kafka Protocol Request 除了 Size+ApiKey+ApiVersion+CorrelationId+ClientId 这些固定字段, 额外的 RequestMessage 包含了具体请求数据: Request => Size ApiKey Ap

android中的内存优化

内存泄露可以引发很多的问题: 1.程序卡顿,响应速度慢(内存占用高时JVM虚拟机会频繁触发GC) 2.莫名消失(当你的程序所占内存越大,它在后台的时候就越可能被干掉.反之内存占用越小,在后台存在的时间就越长) 3.直接崩溃(OutOfMemoryError) ANDROID内存面临的问题: 1.有限的堆内存,原始只有16M 2.内存大小消耗等根据设备,操作系统等级,屏幕尺寸的不同而不同 3.程序不能直接控制 4.支持后台多任务处理(multitasking) 5.运行在虚拟机之上 我主要通过以下

Android中内存优化

CSDN博客不写,排名会下降,我知道了...... Android内存优化,设计到很多方面,参考别大神的博客,自己也总结一下..... 下面将通过两篇博客,浅析Android 中的内存优化问题.来张图抖索一下精神.... 本片博客将一下内存优化,主要参考工作经验和借鉴大牛的一些博客...... 一.什么是内存? 简单理解,Android内存包括运行内存RAM.和磁盘缓存ROM. 而内存优化,主要值运行内存的优化. RAM(random access memory): 寄存器(Registers)

探讨深入Java虚拟机之内存优化

上一篇我们讲述了Java虚拟机的体系结构和内存模型,那么我们就不得不说到内存泄露.大家都知道,Java是从C++的基础上发展而来的,而C++程序的很大的一个问题就是内存泄露难以解决,尽管Java的JVM有一套自己的垃圾回收机制来回收内存,在大多数的情况下并不需要java程序开发人员操太多的心,但也是存在泄露问题的,只是比C++小一点.比如说,程序中存在被引用但无用的对象:程序引用了该对象,但后续不会或者不能再使用它,那么它占用的内存空间就浪费了. 我们先来看看GC是如何工作的:监控每一个对象的运

SQLServer 2014 内存优化表

内存优化表是 SQLServer 2014 的新功能,它是可以将表放在内存中,这会明显提升DML性能.关于内存优化表,更多可参考两位大侠的文章:SQL Server 2014新特性探秘(1)-内存数据库 试试SQLSERVER2014的内存优化表 创建内存优化表也很简单,以下测试: 添加内存优化数据库文件组:[sql] view plain copy 在CODE上查看代码片派生到我的代码片USE [master] GO -- 在当前数据库中添加内存优化数据库文件组(每个数据库仅1个文件组) AL

SQL Server 2014 内存优化表

不同于disk-based table,内存优化表驻留在内存中,使用 Hekaton 内存数据库引擎实现.在查询时,从内存中读取数据行:在更新时,将数据的更新直接写入到内存中.内存优化表能够在disk上维护一个副本,用于持久化数据集. Memory-optimized tables reside in memory. Rows in the table are read from and written to memory. The entire table resides in memory.

Redis系列--内存淘汰机制(含单机版内存优化建议)

https://blog.csdn.net/Jack__Frost/article/details/72478400?locationNum=13&fps=1 每台redis的服务器的内存都是有限的,而且也不是所有的内存都用来存储信息.而且redis的实现并没有在内存这块做太多的优化,所以实现者为了防止内存过于饱和,采取了一些措施来管控内存. 文章结构:(1)内存策略:(2)内存释放机制原理:(3)项目中如何合理应用淘汰策略:(4)单机版Redis内存优化注意点. 一.内存策略:先来吃份官方文档

android内存优化大全_中

转载请注明本文出自大苞米的博客(http://blog.csdn.net/a396901990),谢谢支持! 写在最前: 本文的思路主要借鉴了2014年AnDevCon开发者大会的一个演讲PPT,加上把网上搜集的各种内存零散知识点进行汇总.挑选.简化后整理而成. 所以我将本文定义为一个工具类的文章,如果你在ANDROID开发中遇到关于内存问题,或者马上要参加面试,或者就是单纯的学习或复习一下内存相关知识,都欢迎阅读.(本文最后我会尽量列出所参考的文章). OOM: 内存泄露可以引发很多的问题:

ANDROID内存优化(大汇总——中)

本文的思路主要借鉴了2014年AnDevCon开发者大会的一个演讲PPT,加上把网上搜集的各种内存零散知识点进行汇总.挑选.简化后整理而成. 所以我将本文定义为一个工具类的文章,如果你在ANDROID开发中遇到关于内存问题,或者马上要参加面试,或者就是单纯的学习或复习一下内存相关知识,都欢迎阅读.(本文最后我会尽量列出所参考的文章). OOM: 内存泄露可以引发很多的问题: 1.程序卡顿,响应速度慢(内存占用高时JVM虚拟机会频繁触发GC) 2.莫名消失(当你的程序所占内存越大,它在后台的时候就