Juggling with Bits and Bytes (Part 1)

How Apache Flink operates on binary data

Nowadays, a lot of open-source systems for analyzing large data sets are implemented in Java or other JVM-based programming languages. The most well-known example is Apache Hadoop, but newer frameworks such as Apache Spark, Apache Drill, and Apache Flink also run on JVMs. A common challenge that JVM-based data analysis engines face is storing large amounts of data in memory, both for caching and for efficient processing such as sorting and joining. Managing JVM memory well makes the difference between a system that is hard to configure and has unpredictable reliability and performance, and a system that behaves robustly with few configuration knobs.

In this blog post we discuss how Apache Flink manages memory, talk about its custom data de/serialization stack, and show how it operates on binary data.

Data Objects? Let’s put them on the heap!

The most straightforward approach to processing lots of data in a JVM is to put it on the heap as objects and operate on those objects. Caching a data set as objects would be as simple as maintaining a list containing an object for each record. An in-memory sort would simply sort the list of objects. However, this approach has a few notable drawbacks. First of all, it is not trivial to watch and control heap memory usage when a lot of objects are created and invalidated constantly. Memory overallocation instantly kills the JVM with an OutOfMemoryError. Another aspect is garbage collection on multi-GB JVMs that are flooded with new objects: the overhead of garbage collection in such environments can easily reach 50% or more. Finally, Java objects come with a certain space overhead depending on the JVM and platform. For data sets with many small objects, this can significantly reduce the effectively usable amount of memory. Given proficient system design and careful, use-case-specific tuning of system parameters, heap memory usage can be more or less controlled and OutOfMemoryErrors avoided. However, such setups are rather fragile, especially if data characteristics or the execution environment change.

What is Flink doing about that?

Apache Flink has its roots in a research project that aimed to combine the best technologies of MapReduce-based systems and parallel database systems. Coming from this background, Flink has always had its own way of processing data in memory. Instead of putting lots of objects on the heap, Flink serializes objects into a fixed number of pre-allocated memory segments. Its DBMS-style sort and join algorithms operate on this binary data as much as possible to keep the de/serialization overhead to a minimum. If more data needs to be processed than can be kept in memory, Flink's operators partially spill data to disk. In fact, a lot of Flink's internal implementations look more like C/C++ than typical Java. The following figure gives a high-level overview of how Flink stores data serialized in memory segments and spills to disk if necessary.

Flink’s style of active memory management and operating on binary data has several benefits:

  1. Memory-safe execution & efficient out-of-core algorithms. Due to the fixed amount of allocated memory segments, it is trivial to monitor remaining memory resources. In case of memory shortage, processing operators can efficiently write larger batches of memory segments to disk and later read them back. Consequently, OutOfMemoryErrors are effectively prevented.
  2. Reduced garbage collection pressure. Because all long-lived data is in binary representation in Flink’s managed memory, all data objects are short-lived or even mutable and can be reused. Short-lived objects can be more efficiently garbage-collected, which significantly reduces garbage collection pressure. Right now, the pre-allocated memory segments are long-lived objects on the JVM heap, but the Flink community is actively working on allocating off-heap memory for this purpose. This effort will result in much smaller JVM heaps and facilitate even faster garbage collection cycles.
  3. Space efficient data representation. Java objects have a storage overhead which can be avoided if the data is stored in a binary representation.
  4. Efficient binary operations & cache sensitivity. Binary data can be efficiently compared and operated on given a suitable binary representation. Furthermore, the binary representations can put related values, as well as hash codes, keys, and pointers, adjacently into memory. This usually gives data structures more cache-efficient access patterns.

These properties of active memory management are very desirable in a data processing system for large-scale data analytics, but they come with a significant price tag. Active memory management and operating on binary data are not trivial to implement; e.g., using java.util.HashMap is much easier than implementing a spillable hash table backed by byte arrays and a custom serialization stack. Of course, Apache Flink is not the only JVM-based data processing system that operates on serialized binary data. Projects such as Apache Drill, Apache Ignite (incubating), and Apache Geode (incubating) apply similar techniques, and it was recently announced that Apache Spark will also evolve in this direction with Project Tungsten.

In the following we discuss in detail how Flink allocates memory, de/serializes objects, and operates on binary data. We will also show some performance numbers comparing processing objects on the heap and operating on binary data.

How does Flink allocate memory?

A Flink worker, called TaskManager, is composed of several internal components such as an actor system for coordination with the Flink master, an IOManager that takes care of spilling data to disk and reading it back, and a MemoryManager that coordinates memory usage. In the context of this blog post, the MemoryManager is of most interest.

The MemoryManager takes care of allocating, accounting, and distributing MemorySegments to data processing operators such as sort and join operators. A MemorySegment is Flink's distribution unit of memory and is backed by a regular Java byte array (32 KB by default). A MemorySegment provides very efficient write and read access to its backing byte array using Java's unsafe methods. You can think of a MemorySegment as a custom-tailored version of Java's NIO ByteBuffer. In order to operate on multiple MemorySegments as if they were one larger chunk of consecutive memory, Flink uses logical views that implement Java's java.io.DataOutput and java.io.DataInput interfaces.
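To make the idea of a segment and a logical view over several segments concrete, here is a minimal sketch. All class names (SimpleMemorySegment, SegmentOutputStream, SegmentInputStream) are hypothetical and not Flink's API; the segment uses a ByteBuffer instead of Unsafe, and the views are plain OutputStream/InputStream subclasses that gain the java.io.DataOutput and java.io.DataInput interfaces by being wrapped in DataOutputStream and DataInputStream.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.List;

// Hypothetical, simplified stand-in for a MemorySegment: a fixed-size byte[]
// with typed accessors. It uses a big-endian ByteBuffer instead of Unsafe.
final class SimpleMemorySegment {
    private final byte[] memory;
    private final ByteBuffer view;

    SimpleMemorySegment(int size) {
        memory = new byte[size];
        view = ByteBuffer.wrap(memory);
    }

    int size() { return memory.length; }
    byte get(int index) { return memory[index]; }
    void put(int index, byte value) { memory[index] = value; }
    int getInt(int index) { return view.getInt(index); }
    void putInt(int index, int value) { view.putInt(index, value); }
}

// Byte stream over a list of segments: when one segment is full, writing
// continues at the start of the next one. Wrapped in a DataOutputStream,
// it offers the java.io.DataOutput interface over non-contiguous memory.
final class SegmentOutputStream extends OutputStream {
    private final List<SimpleMemorySegment> segments;
    private int segment = 0;   // index of the current segment
    private int position = 0;  // write offset within the current segment

    SegmentOutputStream(List<SimpleMemorySegment> segments) { this.segments = segments; }

    @Override
    public void write(int b) throws IOException {
        while (segment < segments.size() && position == segments.get(segment).size()) {
            segment++;
            position = 0;
        }
        if (segment == segments.size()) throw new IOException("no free memory segment left");
        segments.get(segment).put(position++, (byte) b);
    }
}

// Reading counterpart: wrapped in a DataInputStream it offers java.io.DataInput.
final class SegmentInputStream extends InputStream {
    private final List<SimpleMemorySegment> segments;
    private int segment = 0;
    private int position = 0;

    SegmentInputStream(List<SimpleMemorySegment> segments) { this.segments = segments; }

    @Override
    public int read() {
        while (segment < segments.size() && position == segments.get(segment).size()) {
            segment++;
            position = 0;
        }
        if (segment == segments.size()) return -1; // end of the segment list
        return segments.get(segment).get(position++) & 0xFF;
    }
}
```

With three 4-byte segments, writing a long followed by an int via DataOutputStream fills the first two segments with the long and places the int at offset 0 of the third; reading back through DataInputStream returns the same values. Real segments are far larger (32 KB by default, per the text above), which makes values straddling a boundary rare but still possible.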

MemorySegments are allocated once at TaskManager start-up time and are destroyed when the TaskManager is shut down. Hence, they are reused and not garbage-collected over the whole lifetime of a TaskManager. After all internal data structures of a TaskManager have been initialized and all core services have been started, the MemoryManager starts creating MemorySegments. By default 70% of the JVM heap that is available after service initialization is allocated by the MemoryManager. It is also possible to configure an absolute amount of managed memory. The remaining JVM heap is used for objects that are instantiated during task processing, including objects created by user-defined functions. The following figure shows the memory distribution in the TaskManager JVM after startup.
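The allocation policy described above (allocate once at start-up, hand out segments to operators, take them back instead of letting them be garbage-collected) can be sketched as follows. SimpleMemoryManager is a hypothetical class, not Flink's MemoryManager. It expresses the managed fraction as an integer percentage to keep the arithmetic exact, and it represents segments as plain byte arrays. For example, 70% of 10 MiB of available heap with 32 KiB segments gives 7,340,032 / 32,768 = 224 segments.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified memory manager: pre-allocates a fixed pool of
// equally sized segments once and hands them out to operators. Released
// segments return to the pool instead of becoming garbage.
final class SimpleMemoryManager {
    private final int segmentSize;
    private final int totalSegments;
    private final ArrayDeque<byte[]> freeSegments = new ArrayDeque<>();

    SimpleMemoryManager(long availableHeapBytes, int managedPercent, int segmentSize) {
        this.segmentSize = segmentSize;
        long managedBytes = availableHeapBytes * managedPercent / 100;
        this.totalSegments = (int) (managedBytes / segmentSize);
        for (int i = 0; i < totalSegments; i++) {
            freeSegments.push(new byte[segmentSize]); // allocated once, up front
        }
    }

    int segmentSize() { return segmentSize; }
    int totalSegments() { return totalSegments; }

    // Monitoring remaining memory is just counting free segments.
    synchronized int availableSegments() { return freeSegments.size(); }

    // An operator requests its whole budget; a request that cannot be served
    // fails fast instead of growing the heap toward an OutOfMemoryError.
    synchronized List<byte[]> allocate(int numSegments) {
        if (numSegments > freeSegments.size()) {
            throw new IllegalStateException("requested " + numSegments
                + " segments, only " + freeSegments.size() + " free");
        }
        List<byte[]> budget = new ArrayList<>(numSegments);
        for (int i = 0; i < numSegments; i++) budget.add(freeSegments.pop());
        return budget;
    }

    // Segments are reused, not garbage-collected.
    synchronized void release(List<byte[]> segments) {
        for (byte[] segment : segments) freeSegments.push(segment);
    }
}
```

In a real TaskManager the available heap would be measured after service initialization (e.g., via Runtime.getRuntime()); here it is a constructor argument so the arithmetic is reproducible.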

How does Flink serialize objects?

The Java ecosystem offers several libraries to convert objects into a binary representation and back. Common alternatives are standard Java serialization, Kryo, Apache Avro, Apache Thrift, or Google's Protobuf. Flink includes its own custom serialization framework in order to control the binary representation of data. This is important because operating on binary data, such as comparing or even manipulating it, requires exact knowledge of the serialization layout. Further, configuring the serialization layout with respect to the operations that are performed on binary data can yield a significant performance boost. Flink's serialization stack also leverages the fact that the types of the objects going through de/serialization are exactly known before a program is executed.

Flink programs can process data represented as arbitrary Java or Scala objects. Before a program is optimized, the data types at each processing step of the program’s data flow need to be identified. For Java programs, Flink features a reflection-based type extraction component to analyze the return types of user-defined functions. Scala programs are analyzed with help of the Scala compiler. Flink represents each data type with a TypeInformation. Flink has TypeInformations for several kinds of data types, including:

  1. BasicTypeInfo: Any (boxed) Java primitive type or java.lang.String.
  2. BasicArrayTypeInfo: Any array of a (boxed) Java primitive type or java.lang.String.
  3. WritableTypeInfo: Any implementation of Hadoop’s Writable interface.
  4. TupleTypeInfo: Any Flink tuple (Tuple1 to Tuple25). Flink tuples are Java representations for fixed-length tuples with typed fields.
  5. CaseClassTypeInfo: Any Scala CaseClass (including Scala tuples).
  6. PojoTypeInfo: Any POJO (Java or Scala), i.e., an object with all fields either being public or accessible through getters and setters that follow the common naming conventions.
  7. GenericTypeInfo: Any data type that cannot be identified as another type.
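The POJO rule in item 6 lends itself to a reflection-based check, in the spirit of the reflection-based type extraction mentioned above. The sketch below is a hypothetical helper (PojoCheck.looksLikePojo), not Flink's type extractor: it checks only the rule as stated here (every field is public or has a matching getX/setX pair) and deliberately ignores inherited fields, boolean "is" getters, and any additional requirements Flink imposes, such as a public no-argument constructor.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Hypothetical helper: a simplified version of the POJO rule described above.
final class PojoCheck {
    static boolean looksLikePojo(Class<?> clazz) {
        for (Field field : clazz.getDeclaredFields()) {
            int mod = field.getModifiers();
            // Static, transient, and compiler-generated fields are not part of the record.
            if (Modifier.isStatic(mod) || Modifier.isTransient(mod) || field.isSynthetic()) continue;
            if (Modifier.isPublic(mod)) continue;
            String name = field.getName();
            String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
            boolean hasGetter = hasPublicMethod(clazz, "get" + suffix, field.getType());
            boolean hasSetter = hasPublicMethod(clazz, "set" + suffix, void.class, field.getType());
            if (!hasGetter || !hasSetter) return false;
        }
        return true;
    }

    // True if a public method with this name, parameter types, and return type exists.
    private static boolean hasPublicMethod(Class<?> clazz, String name, Class<?> returnType,
                                           Class<?>... params) {
        try {
            return clazz.getMethod(name, params).getReturnType() == returnType;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
}

// Example types: public fields, private fields with accessors, and an opaque type.
class PublicPerson {
    public int id;
    public String name;
}

class BeanPerson {
    private int id;
    private String name;
    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}

class OpaquePerson {
    private int id; // no getter or setter, so it would fall back to a generic type
}
```

PublicPerson and BeanPerson pass the check; OpaquePerson does not and, under the list above, would be handled as a GenericTypeInfo.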

Each TypeInformation provides a serializer for the data type it represents. For example, a BasicTypeInfo returns a serializer that writes the respective primitive type, the serializer of a WritableTypeInfo delegates de/serialization to the write() and readFields() methods of the object implementing Hadoop’s Writable interface, and a GenericTypeInfo returns a serializer that delegates serialization to Kryo. Object serialization to a DataOutput which is backed by Flink MemorySegments goes automatically through Java’s efficient unsafe operations. For data types that can be used as keys, i.e., compared and hashed, the TypeInformation provides TypeComparators. TypeComparators compare and hash objects and can - depending on the concrete data type - also efficiently compare binary representations and extract fixed-length binary key prefixes.
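As an illustration of a fixed-length binary key that can be compared without deserialization, the sketch below encodes an int so that unsigned, byte-by-byte comparison of the encoded bytes yields the same order as comparing the ints. Flipping the sign bit and writing big-endian is a common way to achieve this; whether Flink's comparators use exactly this encoding is an assumption here, and IntNormalizedKey is a hypothetical class name.

```java
// Hypothetical sketch of a fixed-length binary key for int values.
final class IntNormalizedKey {
    static final int KEY_LENGTH = 4;

    // Flipping the sign bit maps Integer.MIN_VALUE..MAX_VALUE onto
    // 0x00000000..0xFFFFFFFF, so unsigned big-endian byte order equals int order.
    static void putNormalizedKey(int value, byte[] target, int offset) {
        int flipped = value ^ Integer.MIN_VALUE;
        target[offset]     = (byte) (flipped >>> 24);
        target[offset + 1] = (byte) (flipped >>> 16);
        target[offset + 2] = (byte) (flipped >>> 8);
        target[offset + 3] = (byte) flipped;
    }

    // Compares two keys byte by byte as unsigned values; no objects are created.
    static int compareBinary(byte[] a, int offA, byte[] b, int offB, int len) {
        for (int i = 0; i < len; i++) {
            int cmp = (a[offA + i] & 0xFF) - (b[offB + i] & 0xFF);
            if (cmp != 0) return cmp;
        }
        return 0;
    }
}
```

For example, -1 encodes as 7F FF FF FF and 0 as 80 00 00 00, so the binary comparison correctly places -1 before 0.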

Tuple, Pojo, and CaseClass types are composite types, i.e., containers for one or more possibly nested data types. As such, their serializers and comparators are also composite and delegate the serialization and comparison of their member data types to the respective serializers and comparators. The following figure illustrates the serialization of a (nested) Tuple3 object that contains a Person POJO, where Person is defined as follows:

public class Person {
    public int id;
    public String name;
}
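A composite serializer for the Person POJO can be sketched by delegating each field to the serializer of its type, in a fixed field order. SimpleSerializer, IntSerializer, StringSerializer, and PersonSerializer are hypothetical names that only mirror the idea of Flink's serializers, not their actual API; the String encoding here is simply DataOutput.writeUTF (a 2-byte length followed by modified UTF-8), which is an assumption rather than Flink's format. Person is redeclared so the block compiles on its own.

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Hypothetical serializer interface (not Flink's TypeSerializer API).
interface SimpleSerializer<T> {
    void serialize(T value, DataOutput out) throws IOException;
    T deserialize(DataInput in) throws IOException;
}

final class IntSerializer implements SimpleSerializer<Integer> {
    public void serialize(Integer value, DataOutput out) throws IOException { out.writeInt(value); }
    public Integer deserialize(DataInput in) throws IOException { return in.readInt(); }
}

final class StringSerializer implements SimpleSerializer<String> {
    // writeUTF: 2-byte length prefix followed by modified UTF-8 bytes.
    public void serialize(String value, DataOutput out) throws IOException { out.writeUTF(value); }
    public String deserialize(DataInput in) throws IOException { return in.readUTF(); }
}

// Same shape as the Person POJO above.
class Person {
    public int id;
    public String name;
}

// Composite serializer: writes the fields in a fixed order by delegating to the
// serializer of each field type. No class or field names end up in the bytes,
// because the type is known before the program runs.
final class PersonSerializer implements SimpleSerializer<Person> {
    private final IntSerializer idSerializer = new IntSerializer();
    private final StringSerializer nameSerializer = new StringSerializer();

    public void serialize(Person person, DataOutput out) throws IOException {
        idSerializer.serialize(person.id, out);
        nameSerializer.serialize(person.name, out);
    }

    public Person deserialize(DataInput in) throws IOException {
        Person person = new Person();
        person.id = idSerializer.deserialize(in);
        person.name = nameSerializer.deserialize(in);
        return person;
    }
}
```

Serializing a Person with id 7 and name "Alice" produces 4 + 2 + 5 = 11 bytes. A Tuple3 serializer would follow the same pattern, holding one field serializer per tuple position, one of which would be the PersonSerializer.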

Flink’s type system can be easily extended by providing custom TypeInformations, Serializers, and Comparators to improve the performance of serializing and comparing custom data types.

How does Flink operate on binary data?

Similar to many other data processing APIs (including SQL), Flink's APIs provide transformations to group, sort, and join data sets. These transformations operate on potentially very large data sets. For several decades, relational database systems have featured very efficient algorithms for these purposes, including external merge-sort, merge-join, and hybrid hash-join. Flink builds on this technology, but generalizes it to handle arbitrary objects using its custom serialization and comparison stack. In the following, we show how Flink operates on binary data, using Flink's in-memory sort algorithm as an example.

Flink assigns a memory budget to its data processing operators. Upon initialization, a sort algorithm requests its memory budget from the MemoryManager and receives a corresponding set of MemorySegments. The set of MemorySegments becomes the memory pool of a so-called sort buffer, which collects the data that is to be sorted. The following figure illustrates how data objects are serialized into the sort buffer.

The sort buffer is internally organized into two memory regions. The first region holds the full binary data of all objects. The second region contains pointers to the full binary object data and, depending on the key data type, fixed-length sort keys. When an object is added to the sort buffer, its binary data is appended to the first region, and a pointer (and possibly a key) is appended to the second region. Separating the actual data from the pointers and fixed-length keys serves two purposes: it enables efficient swapping of fixed-length entries (key + pointer), and it reduces the amount of data that needs to be moved when sorting. If the sort key is a variable-length data type such as a String, the fixed-length sort key must be a prefix key, such as the first n characters of the String. Note that not all data types provide a fixed-length (prefix) sort key. When serializing objects into the sort buffer, both memory regions are extended with MemorySegments from the memory pool. Once the memory pool is empty and no more objects can be added, the sort buffer is completely filled and can be sorted. Flink's sort buffer provides methods to compare and swap elements, which makes the actual sort algorithm pluggable. By default, Flink uses a Quicksort implementation that can fall back to HeapSort. The following figure shows how two objects are compared.
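The two-region layout can be sketched with a heavily simplified, hypothetical SimpleSortBuffer for records consisting of an int key and a String value. Assumptions: each region is a single fixed-size ByteBuffer rather than a growing list of MemorySegments; the int key is a full (not prefix) fixed-length key, stored with its sign bit flipped so that unsigned comparison matches int order; and the sort is a plain recursive Quicksort without the HeapSort fallback. Swapping moves only the 8-byte index entries, never the record bytes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified sort buffer for (int key, String value) records.
final class SimpleSortBuffer {
    private static final int ENTRY_SIZE = 8; // 4-byte key + 4-byte pointer
    private final ByteBuffer data;   // region 1: full serialized records
    private final ByteBuffer index;  // region 2: fixed-length (key, pointer) entries
    private int count = 0;

    SimpleSortBuffer(int dataBytes, int maxEntries) {
        data = ByteBuffer.allocate(dataBytes);
        index = ByteBuffer.allocate(maxEntries * ENTRY_SIZE);
    }

    // Appends the record to region 1 and a (key, pointer) entry to region 2.
    // Returns false when either region is full: time to sort (and maybe spill).
    boolean add(int key, String value) {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        if (data.remaining() < 8 + bytes.length || index.remaining() < ENTRY_SIZE) return false;
        int pointer = data.position();
        data.putInt(key).putInt(bytes.length).put(bytes);
        index.putInt(key ^ Integer.MIN_VALUE).putInt(pointer); // sign bit flipped
        count++;
        return true;
    }

    // Compares entries using only the fixed-length keys in region 2.
    private int compare(int i, int j) {
        return Integer.compareUnsigned(index.getInt(i * ENTRY_SIZE), index.getInt(j * ENTRY_SIZE));
    }

    // Swaps two 8-byte entries; the record bytes in region 1 never move.
    private void swap(int i, int j) {
        long tmp = index.getLong(i * ENTRY_SIZE);
        index.putLong(i * ENTRY_SIZE, index.getLong(j * ENTRY_SIZE));
        index.putLong(j * ENTRY_SIZE, tmp);
    }

    void sort() { quickSort(0, count - 1); }

    private void quickSort(int lo, int hi) {
        if (lo >= hi) return;
        int p = lo; // Lomuto partition with the last entry as pivot
        for (int i = lo; i < hi; i++) {
            if (compare(i, hi) < 0) swap(i, p++);
        }
        swap(p, hi);
        quickSort(lo, p - 1);
        quickSort(p + 1, hi);
    }

    // Reads region 2 sequentially, skips the key, and follows each pointer.
    List<String> sortedValues() {
        List<String> out = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            int pointer = index.getInt(i * ENTRY_SIZE + 4);
            int length = data.getInt(pointer + 4);
            byte[] bytes = new byte[length];
            for (int b = 0; b < length; b++) bytes[b] = data.get(pointer + 8 + b);
            out.add(new String(bytes, StandardCharsets.UTF_8));
        }
        return out;
    }
}
```

Adding (42, "c"), (-7, "a"), (13, "b"), and (Integer.MAX_VALUE, "d") and sorting yields the values a, b, c, d. Because the key here is a full key, no comparison ever reads region 1; a prefix key would require the fallback described next.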

The sort buffer compares two elements by comparing their binary fixed-length sort keys. The comparison is conclusive if it is done on a full key (not a prefix key) or if the binary prefix keys are not equal. If the prefix keys are equal (or the sort key data type does not provide a binary prefix key), the sort buffer follows the pointers to the actual object data, deserializes both objects, and compares the objects. Depending on the result of the comparison, the sort algorithm decides whether to swap the compared elements or not. The sort buffer swaps two elements by moving their fixed-length keys and pointers; the actual data is not moved. Once the sort algorithm finishes, the pointers in the sort buffer are correctly ordered. The following figure shows how the sorted data is returned from the sort buffer.
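The two-step comparison with prefix keys can be sketched for String records as follows. PrefixKeyComparison is a hypothetical class. Assumptions: records are ASCII strings stored as UTF-8 bytes (so unsigned byte order agrees with String.compareTo), the prefix key is the first 4 bytes zero-padded, and the "follow the pointers" step is modeled by passing the full record bytes directly. A counter shows how often the slow path (deserialize and compare objects) is taken.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: prefix-key comparison with fallback to full objects.
final class PrefixKeyComparison {
    static final int PREFIX_LENGTH = 4;
    static int fullComparisons = 0; // how often the slow path was needed

    // Fixed-length prefix key: the first bytes of the record, zero-padded.
    static byte[] prefixKey(byte[] record) {
        byte[] key = new byte[PREFIX_LENGTH];
        System.arraycopy(record, 0, key, 0, Math.min(PREFIX_LENGTH, record.length));
        return key;
    }

    static int compare(byte[] prefixA, byte[] recordA, byte[] prefixB, byte[] recordB) {
        // Fast path: differing prefix keys decide the order on their own.
        for (int i = 0; i < PREFIX_LENGTH; i++) {
            int cmp = (prefixA[i] & 0xFF) - (prefixB[i] & 0xFF);
            if (cmp != 0) return cmp;
        }
        // Slow path: equal prefixes, so deserialize both records and compare objects.
        fullComparisons++;
        String a = new String(recordA, StandardCharsets.UTF_8);
        String b = new String(recordB, StandardCharsets.UTF_8);
        return a.compareTo(b);
    }
}
```

Comparing "apple" with "banana" is decided by the prefixes "appl" and "bana" alone, while "apple" versus "applesauce" shares the prefix "appl" and needs the full comparison. The longer the prefix, the rarer the slow path, at the cost of larger index entries.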

The sorted data is returned by sequentially reading the pointer region of the sort buffer, skipping the sort keys and following the sorted pointers to the actual data. This data is either deserialized and returned as objects or the binary representation is copied and written to disk in case of an external merge-sort (see this blog post on joins in Flink).

Date: 2024-12-04 02:46:39
