HFileBlock官方源码注释:
Reading HFile
version 1 and 2 blocks, and writing version 2 blocks.
- In version 1 all blocks are always compressed or uncompressed, as specified by the
HFile
‘s compression algorithm, with a type-specific magic record stored in the beginning of the compressed data (i.e. one needs to uncompress the compressed block to determine the block type). There is only a single compression algorithm setting for all blocks. Offset and size information from the block index are required to read a block. - In version 2 a block is structured as follows:
- Magic record identifying the block type (8 bytes)
- Compressed block size, header not included (4 bytes)
- Uncompressed block size, header not included (4 bytes)
- The offset of the previous block of the same type (8 bytes). This is used to be able to navigate to the previous block without going to the block
- For minorVersions >=1, there is an additional 4 byte field bytesPerChecksum that records the number of bytes in a checksum chunk.
- For minorVersions >=1, there is a 4 byte value to store the size of data on disk (excluding the checksums)
- For minorVersions >=1, a series of 4 byte checksums, one each for the number of bytes specified by bytesPerChecksum. index.
- Compressed data (or uncompressed data if compression is disabled). The compression algorithm is the same for all the blocks in the
HFile
, similarly to what was done in version 1.
The version 2 block representation in the block cache is the same as above, except that the data section is always uncompressed in the cache.
从上述文档可以看出,随着HBase版本的迭代,出现了两种格式的HFile(version1、version2),对应着两个格式的HFileBlock(version1、version2),这里仅考虑version2的情况,且minorVersions默认值为1,由此可得,HFileBlock version2格式如下图:
magic record (8 bytes) |
compressed block size (header not included, 8 bytes) |
uncompressed block size (header not included, 4 bytes) |
the offset of the previous block of the same type (8 bytes) |
bytesPerChecksum (the number of bytes in a checksum chunk, 4 bytes) |
data size on disk (excluding the checksums) |
a series of 4 byte checksums (one each for the number of bytes specified by bytesPerChecksum) |
compressed data (uncompressed data if compression is disabled) |
HFileBlock.Writer
Unified version 2 HFile
block writer. The intended usage pattern is as follows:
- Construct an
HFileBlock.Writer
, providing a compression algorithm - Call
Writer.startWriting(BlockType, boolean)
and get a data stream to write to - Write your data into the stream
- Call
Writer.writeHeaderAndData(FSDataOutputStream)
as many times as you need to store the serialized block into an external stream, or callWriter.getHeaderAndData()
to get it as a byte array. - Repeat to write more blocks
从上述文档可以看出,官方建议HFileBlock.Writer的使用方式如下:
(1)构建HFileBlock.Writer实例,需要提供使用什么压缩算法、使用什么数据编码格式、是否包含MemstoreTS、HBase小版本号、校验和类型以及多少字节数据生成一个校验和;
(2)通过startWriting方法获取到一个可以用以写出数据的输出流;
(3)根据需要循环调用writeHeaderAndData方法将块数据写出至输出流,然后可以通过方法getHeaderAndData得到一个包含已写出块数据的字节数组;
(4)循环(2)、(3)写出更多的块数据。
类图
核心变量
private State state = State.INIT;
Writer state. Used to ensure the correct usage protocol.
用以标识当前Writer状态,在其它方法调用期间确保Writer状态的正确性(即某些方法调用时Writer应处于特定状态),Writer状态有以下三种类型:
- INIT
- WRITING
- BLOCK_READY
private final Compression.Algorithm compressAlgo;
Compression algorithm for all blocks this instance writes.
用以标识当前Writer使用的压缩算法,共有以下几种类型:
- LZO
- GZ
- NONE
- SNAPPY
- LZ4
private final HFileDataBlockEncoder dataBlockEncoder;
Data block encoder used for data blocks.
数据块使用的编码器。
private ByteArrayOutputStream baosInMemory;
The stream we use to accumulate data in uncompressed format for each block. We reset this stream at the end of each block and reuse it. The header is written as the first HFileBlock.headerSize(int)
bytes into this stream.
该输出流用以积聚每个块的非压缩格式数据(每个块的数据由多个KeyValue组成),当一个块数据写出完毕后,通过方法reset重置此输出流,以使多个块之间可以重用该输出流。
private Compressor compressor;
private CompressionOutputStream compressionStream;
private ByteArrayOutputStream compressedByteStream;
如果使用压缩,将会涉及到上面三个变量:
compressor:Compressor, which is also reused between consecutive blocks.
compressionStream:Compression output stream.
compressedByteStream:Underlying stream to write compressed bytes to.
其中,compressionStream通过封装compressor、compressedByteStream而成。
private BlockType blockType;
Current block type. Set in startWriting(BlockType). Could be changed in encodeDataBlockForDisk() from BlockType.DATA to BlockType.ENCODED_DATA.
用以标识当前数据块的类型。
private DataOutputStream userDataStream;
A stream that we write uncompressed bytes to, which compresses them and writes them to baosInMemory.
userDataStream通过封装baosInMemory而成。
private byte[] onDiskBytesWithHeader;
Bytes to be written to the file system, including the header. Compressed if compression is turned on. It also includes the checksum data that immediately follows the block data. (header + data + checksums)
写入文件系统的最终字节数据,可能是压缩后的字节数据,同时包含头和检验和数据。
private int onDiskDataSizeWithHeader;
The size of the data on disk that does not include the checksums. (header + data)
写入文件系统的最终字节数据大小,不包含校验和数据。
private byte[] onDiskChecksum;
The size of the checksum data on disk. It is used only if data is not compressed. If data is compressed, then the checksums are already part of onDiskBytesWithHeader. If data is uncompressed, then this variable stores the checksum data for this block.
校验和数据,它仅仅被使用在数据没有被压缩的场景下。如果数据被压缩,则校验和数据包含在onDiskBytesWithHeader中;如果数据没有被压缩,onDiskChecksum存储着当前数据块的校验和数据。
private byte[] uncompressedBytesWithHeader;
Valid in the READY state. Contains the header and the uncompressed (but potentially encoded, if this is a data block) bytes, so the length is uncompressedSizeWithoutHeader + HFileBlock.headerSize(int). Does not store checksums.
包含头和未压缩的数据(数据可能已经被编码),不包含检验和数据。
private long startOffset;
Current block‘s start offset in the HFile. Set in writeHeaderAndData(FSDataOutputStream).
当前数据块在HFile中的起始偏移量。
private long[] prevOffsetByType;
Offset of previous block by block type. Updated when the next block is started.
private long prevOffset;
The offset of the previous block of the same type.
用以记录和当前块类型相同的前一个数据块在HFile中的起始偏移量,其中prevOffsetByType中记录着所有通过Writer写出的数据块类型的起始偏移量,并在开始下一个数据块时更新值。
private boolean includesMemstoreTS;
Whether we are including memstore timestamp after every key/value.
用以标识是否在每个KeyValue键值数据后加入MemstoreTS。
private ChecksumType checksumType;
校验和类型,包含以下几种:
- NULL
- CRC32
- CRC32C
private int bytesPerChecksum;
用以标识多少字节数据需要形成一个校验和。
private final int minorVersion;
用以标识HBase小版本号。
HFileBlock.Writer实例创建(构造函数)过程
this.minorVersion = minorVersion; // 默认值为HFileReaderV2.MAX_MINOR_VERSION(1)
compressAlgo = compressionAlgorithm == null ? NONE : compressionAlgorithm; // 默认值为Compression.Algorithm.NONE
this.dataBlockEncoder = dataBlockEncoder != null ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE; // 默认值为NoOpDataBlockEncoder.INSTANCE,该实例实际上不对数据进行任务加工。
baosInMemory = new ByteArrayOutputStream();
if (compressAlgo != NONE) {
compressor = compressionAlgorithm.getCompressor();
compressedByteStream = new ByteArrayOutputStream();
try {
compressionStream = compressionAlgorithm.createPlainCompressionStream(compressedByteStream, compressor);
} catch (IOException e) {
throw new RuntimeException("Could not create compression stream " + "for algorithm " + compressionAlgorithm, e);
}
}
创建字节输出流,如果使用压缩,则需要创建相应的压缩输出流,无论压缩与否,底层均包装ByteArrayOutputStream,由此可以看出,HFileBlock.Writer并没有将数据直接写入文件系统,而且仅仅缓存在了内存中的一个字节数组中,外部程序拿到这个字节数组(会经过若干步处理,如编码、压缩)后再将数据写入文件系统。
if (minorVersion > MINOR_VERSION_NO_CHECKSUM && bytesPerChecksum < HEADER_SIZE_WITH_CHECKSUMS) {
throw new RuntimeException("Unsupported value of bytesPerChecksum. " +
" Minimum is " + HEADER_SIZE_WITH_CHECKSUMS + " but the configured value is " +
bytesPerChecksum);
}
校验bytesPerChecksum。
prevOffsetByType = new long[BlockType.values().length];
for (int i = 0; i < prevOffsetByType.length; ++i)
prevOffsetByType[i] = -1;
初始化数组prevOffsetByType,数组长度为块类型的个数。
this.includesMemstoreTS = includesMemstoreTS; // 默认值为true
this.checksumType = checksumType; // 默认值为ChecksumType.CRC32
this.bytesPerChecksum = bytesPerChecksum; // 默认值为16 * 1024
典型处理流程及源码分析
1. startWriting
方法签名:public DataOutputStream startWriting(BlockType newBlockType) throws IOException
方法描述:Starts writing into the block. The previous block‘s data is discarded.准备写入块数据,前一个块的数据将被丢弃。
触发条件:HFileWriterV2.newBlock()
执行流程:
if (state == State.BLOCK_READY && startOffset != -1) {
// We had a previous block that was written to a stream at a specific
// offset. Save that offset as the last offset of a block of that type.
prevOffsetByType[blockType.getId()] = startOffset;
}
根据数据块类型,记录前一个数据块在HFile中的起始偏移量,仅当前一个数据块写入完成(state为BLOCK_READY)才会执行此操作。
startOffset = -1;
blockType = newBlockType;
重置起始偏移量及数据块类型。
baosInMemory.reset();
baosInMemory.write(getDummyHeaderForVersion(this.minorVersion));
重置字节数组输出流,并写入“假”的头部信息,即一个空的字节数组,数据长度为24。
state = State.WRITING;
更新Writer状态为WRITING。
// We will compress it later in finishBlock()
userDataStream = new DataOutputStream(baosInMemory);
使用DataOutputStream“装饰”baosInMemory,方便外部数据写入。
return userDataStream;
返回供外部使用的数据输出流。
2. 写入KeyValue
// Write length of key and value and then actual key and value bytes.
// Additionally, we may also write down the memstoreTS.
{
DataOutputStream out = fsBlockWriter.getUserDataStream();
out.writeInt(klength);
totalKeyLength += klength;
out.writeInt(vlength);
totalValueLength += vlength;
out.write(key, koffset, klength);
out.write(value, voffset, vlength);
if (this.includeMemstoreTS) {
WritableUtils.writeVLong(out, memstoreTS);
}
}
上述代码实际由HFileWriterV2.append完成,在此不作详细解释。可以看出,通过HFileBlock.Writer getUserDataStream获取DataOutputStream实例out,然后即可通过out将数据写入。
3. writeHeaderAndData
方法签名:public void writeHeaderAndData(FSDataOutputStream out) throws IOException
方法描述:Similar to writeHeaderAndData(FSDataOutputStream), but records the offset of this block so that it can be referenced in the next block of the same type.主要作用是记录当前块的起始偏移量,以便于操作同一类型的下一个数据块时使用,其它操作转交由重载方法writeHeaderAndData(FSDataOutputStream)负责。总体职责就是将积聚的块数据写入指定的输出流即HDFS。
触发条件:当数据块的大小超过配额限制(默认64KB)时(通过方法blockSizeWritten,即userDataStream.size()得出),间接触发该方法执行。
执行流程:
long offset = out.getPos();
if (startOffset != -1 && offset != startOffset) {
throw new IOException("A " + blockType + " block written to a "
+ "stream twice, first at offset " + startOffset + ", then at "
+ offset);
}
startOffset = offset;
记录前一个数据块的起始偏移量。
writeHeaderAndData((DataOutputStream) out);
执行流程转交给重载方法writeHeaderAndData(FSDataOutputStream),执行流程如下:
ensureBlockReady();
Transitions the block writer from the "writing" state to the "block ready" state. Does nothing if a block is already finished.
将Writer的状态从“writing”转移至“block ready”,实际过程由finishBlock完成。
out.write(onDiskBytesWithHeader);
写出头部数据、数据,如果使用压缩则包含校验和数据。
if (compressAlgo == NONE && minorVersion > MINOR_VERSION_NO_CHECKSUM) {
if (onDiskChecksum == HConstants.EMPTY_BYTE_ARRAY) {
throw new IOException("A " + blockType
+ " without compression should have checksums "
+ " stored separately.");
}
out.write(onDiskChecksum);
}
如果没有使用压缩,写出校验和数据。
4. finishBlock
方法签名:private void finishBlock() throws IOException
方法描述:An internal method that flushes the compressing stream (if using compression), serializes the header, and takes care of the separate uncompressed stream for caching on write, if applicable. Sets block write state to "block ready".
触发条件:由方法ensureBlockReady调用触发
执行流程:
userDataStream.flush();
刷出输出流数据。
uncompressedBytesWithHeader = baosInMemory.toByteArray();
数据拷贝,将字节数组输出流中的数据保存至一字节数组中。
prevOffset = prevOffsetByType[blockType.getId()];
保存与当前块类型相同的前一个块的起始偏移量。
state = State.BLOCK_READY;
更新Writer状态为BLOCK_READY,注意执行此操作时数据尚未编码或压缩。
encodeDataBlockForDisk();
编码数据,默认编码器实例为NoOpDataBlockEncoder,并不会对数据产生任何编码操作。
doCompressionAndChecksumming();
压缩数据并生成校验和,执行流程如下:
private void doCompressionAndChecksumming() throws IOException {
if ( minorVersion <= MINOR_VERSION_NO_CHECKSUM) {
version20compression();
} else {
version21ChecksumAndCompression();
}
}
minorVersion值默认为1,流程跳转至version21ChecksumAndCompression,代码如下:
// do the compression
if (compressAlgo != NONE) {
// 使用压缩
// 重置压缩输出流
compressedByteStream.reset();
// 压缩输出流中写入“假”的头部数据
compressedByteStream.write(DUMMY_HEADER_WITH_CHECKSUM);
// 重置压缩输出流状态,compressionStream“装饰”compressedByteStream
compressionStream.resetState();
// 将uncompressedBytesWithHeader中的数据部分(不包含头)写入压缩输出流
compressionStream.write(uncompressedBytesWithHeader, headerSize(this.minorVersion),
uncompressedBytesWithHeader.length - headerSize(this.minorVersion));
// 刷出并完成压缩输出流
compressionStream.flush();
compressionStream.finish();
// generate checksums
// 其实就是在压缩输出流的末尾写入“假”的校验和数据用以“占位”,为后期的计算校验和数据留出空间
onDiskDataSizeWithHeader = compressedByteStream.size(); // data size
// reserve space for checksums in the output byte stream
ChecksumUtil.reserveSpaceForChecksums(compressedByteStream,
onDiskDataSizeWithHeader, bytesPerChecksum);
// “假”头部数据 + 数据(已被压缩)+ “假”校验和数据
onDiskBytesWithHeader = compressedByteStream.toByteArray();
// 计算“真实”头部数据
put21Header(onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader);
// generate checksums for header and data. The checksums are
// part of onDiskBytesWithHeader itself.
// 计算“真实”校验和数据
ChecksumUtil.generateChecksums(
onDiskBytesWithHeader, 0, onDiskDataSizeWithHeader,
onDiskBytesWithHeader, onDiskDataSizeWithHeader,
checksumType, bytesPerChecksum);
// Checksums are already part of onDiskBytesWithHeader
onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY;
// 为cache-on-write生成头部数据,uncompressedBytesWithHeader的数据部分是没有被压缩的
//set the header for the uncompressed bytes (for cache-on-write)
put21Header(uncompressedBytesWithHeader, 0,
onDiskBytesWithHeader.length + onDiskChecksum.length,
uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader);
} else {
// 没有使用压缩
// If we are not using any compression, then the
// checksums are written to its own array onDiskChecksum.
onDiskBytesWithHeader = uncompressedBytesWithHeader;
onDiskDataSizeWithHeader = onDiskBytesWithHeader.length;
// 计算校验和长度
int numBytes = (int)ChecksumUtil.numBytes(
uncompressedBytesWithHeader.length,
bytesPerChecksum);
onDiskChecksum = new byte[numBytes];
// 计算头部数据
//set the header for the uncompressed bytes
put21Header(uncompressedBytesWithHeader, 0,
onDiskBytesWithHeader.length + onDiskChecksum.length,
uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader);
// 计算校验和数据
ChecksumUtil.generateChecksums(
uncompressedBytesWithHeader, 0, uncompressedBytesWithHeader.length,
onDiskChecksum, 0,
checksumType, bytesPerChecksum);
}
代码到此,如果没有使用压缩,onDiskBytesWithHeader中包含头部数据+数据,onDiskChecksum为相应的校验和数据;如果使用压缩,onDiskBytesWithHeader包含头部数据+压缩数据+校验和数据,onDiskChecksum为EMPTY_BYTE_ARRAY。第3步writeHeaderAndData即可据此将块数据(onDiskBytesWithHeader、onDiskChecksum)写出。
循环以上几步,即可完成HFile Block的写流程。