HBase HFileBlock

HFileBlock官方源码注释:

Reading HFile version 1 and 2 blocks, and writing version 2 blocks.

  • In version 1 all blocks are always compressed or uncompressed, as specified by the HFile‘s compression algorithm, with a type-specific magic record stored in the beginning of the compressed data (i.e. one needs to uncompress the compressed block to determine the block type). There is only a single compression algorithm setting for all blocks. Offset and size information from the block index are required to read a block.
  • In version 2 a block is structured as follows:
    • Magic record identifying the block type (8 bytes)
    • Compressed block size, header not included (4 bytes)
    • Uncompressed block size, header not included (4 bytes)
    • The offset of the previous block of the same type (8 bytes). This is used to be able to navigate to the previous block without going to the block
    • For minorVersions >=1, there is an additional 4 byte field bytesPerChecksum that records the number of bytes in a checksum chunk.
    • For minorVersions >=1, there is a 4 byte value to store the size of data on disk (excluding the checksums)
    • For minorVersions >=1, a series of 4 byte checksums, one each for the number of bytes specified by bytesPerChecksum. index.
    • Compressed data (or uncompressed data if compression is disabled). The compression algorithm is the same for all the blocks in the HFile, similarly to what was done in version 1.

The version 2 block representation in the block cache is the same as above, except that the data section is always uncompressed in the cache.

从上述文档可以看出,随着HBase版本的迭代,出现了两种格式的HFile(version1、version2),对应着两个格式的HFileBlock(version1、version2),这里仅考虑version2的情况,且minorVersions默认值为1,由此可得,HFileBlock version2格式如下图:

magic record (8 bytes)
compressed block size (header not included, 8 bytes)
uncompressed block size (header not included, 4 bytes)
the offset of the previous block of the same type (8 bytes)
bytesPerChecksum (the number of bytes in a checksum chunk, 4 bytes)
data size on disk (excluding the checksums)
a series of 4 byte checksums (one each for the number of bytes specified by bytesPerChecksum)
compressed data (uncompressed data if compression is disabled)

HFileBlock.Writer

 

Unified version 2 HFile block writer. The intended usage pattern is as follows:

  • Construct an HFileBlock.Writer, providing a compression algorithm
  • Call Writer.startWriting(BlockType, boolean) and get a data stream to write to
  • Write your data into the stream
  • Call Writer.writeHeaderAndData(FSDataOutputStream) as many times as you need to store the serialized block into an external stream, or call Writer.getHeaderAndData() to get it as a byte array.
  • Repeat to write more blocks

从上述文档可以看出,官方建议HFileBlock.Writer的使用方式如下:

(1)构建HFileBlock.Writer实例,需要提供使用什么压缩算法、使用什么数据编码格式、是否包含MemstoreTS、HBase小版本号、校验和类型以及多少字节数据生成一个校验和;

(2)通过startWriting方法获取到一个可以用以写出数据的输出流;

(3)根据需要循环调用writeHeaderAndData方法将块数据写出至输出流,然后可以通过方法getHeaderAndData得到一个包含已写出块数据的字节数组;

(4)循环(2)、(3)写出更多的块数据。

类图

核心变量

private State state = State.INIT;

Writer state. Used to ensure the correct usage protocol.

用以标识当前Writer状态,在其它方法调用期间确保Writer状态的正确性(即某些方法调用时Writer应处于特定状态),Writer状态有以下三种类型:

  • INIT
  • WRITING
  • BLOCK_READY

private final Compression.Algorithm compressAlgo;

Compression algorithm for all blocks this instance writes.

用以标识当前Writer使用的压缩算法,共有以下几种类型:

  • LZO
  • GZ
  • NONE
  • SNAPPY
  • LZ4

private final HFileDataBlockEncoder dataBlockEncoder;

Data block encoder used for data blocks.

数据块使用的编码器。

private ByteArrayOutputStream baosInMemory;

The stream we use to accumulate data in uncompressed format for each block. We reset this stream at the end of each block and reuse it. The header is written as the first HFileBlock.headerSize(int) bytes into this stream.

该输出流用以积聚每个块的非压缩格式数据(每个块的数据由多个KeyValue组成),当一个块数据写出完毕后,通过方法reset重置此输出流,以使多个块之间可以重用该输出流。

private Compressor compressor;

private CompressionOutputStream compressionStream;

private ByteArrayOutputStream compressedByteStream;

如果使用压缩,将会涉及到上面三个变量:

compressor:Compressor, which is also reused between consecutive blocks.

compressionStream:Compression output stream.

compressedByteStream:Underlying stream to write compressed bytes to.

其中,compressionStream通过封装compressor、compressedByteStream而成。

private BlockType blockType;

Current block type. Set in startWriting(BlockType). Could be changed in encodeDataBlockForDisk() from BlockType.DATA to BlockType.ENCODED_DATA.

用以标识当前数据块的类型。

private DataOutputStream userDataStream;

A stream that we write uncompressed bytes to, which compresses them and writes them to baosInMemory.

userDataStream通过封装baosInMemory而成。

private byte[] onDiskBytesWithHeader;

Bytes to be written to the file system, including the header. Compressed if compression is turned on. It also includes the checksum data that immediately follows the block data. (header + data + checksums)

写入文件系统的最终字节数据,可能是压缩后的字节数据,同时包含头和检验和数据。

private int onDiskDataSizeWithHeader;

The size of the data on disk that does not include the checksums. (header + data)

写入文件系统的最终字节数据大小,不包含校验和数据。

private byte[] onDiskChecksum;

The size of the checksum data on disk. It is used only if data is not compressed. If data is compressed, then the checksums are already part of onDiskBytesWithHeader. If data is uncompressed, then this variable stores the checksum data for this block.

校验和数据,它仅仅被使用在数据没有被压缩的场景下。如果数据被压缩,则校验和数据包含在onDiskBytesWithHeader中;如果数据没有被压缩,onDiskChecksum存储着当前数据块的校验和数据。

private byte[] uncompressedBytesWithHeader;

Valid in the READY state. Contains the header and the uncompressed (but potentially encoded, if this is a data block) bytes, so the length is uncompressedSizeWithoutHeader + HFileBlock.headerSize(int). Does not store checksums.

包含头和未压缩的数据(数据可能已经被编码),不包含检验和数据。

private long startOffset;

Current block‘s start offset in the HFile. Set in writeHeaderAndData(FSDataOutputStream).

当前数据块在HFile中的起始偏移量。

private long[] prevOffsetByType;

Offset of previous block by block type. Updated when the next block is started.

private long prevOffset;

The offset of the previous block of the same type.

用以记录和当前块类型相同的前一个数据块在HFile中的起始偏移量,其中prevOffsetByType中记录着所有通过Writer写出的数据块类型的起始偏移量,并在开始下一个数据块时更新值。

private boolean includesMemstoreTS;

Whether we are including memstore timestamp after every key/value.

用以标识是否在每个KeyValue键值数据后加入MemstoreTS。

private ChecksumType checksumType;

校验和类型,包含以下几种:

  • NULL
  • CRC32
  • CRC32C

private int bytesPerChecksum;

用以标识多少字节数据需要形成一个校验和。

private final int minorVersion;

用以标识HBase小版本号。

HFileBlock.Writer实例创建(构造函数)过程

this.minorVersion = minorVersion;  // 默认值为HFileReaderV2.MAX_MINOR_VERSION(1)

compressAlgo = compressionAlgorithm == null ? NONE : compressionAlgorithm; // 默认值为Compression.Algorithm.NONE

this.dataBlockEncoder = dataBlockEncoder != null ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE; // 默认值为NoOpDataBlockEncoder.INSTANCE,该实例实际上不对数据进行任务加工。

baosInMemory = new ByteArrayOutputStream();

if (compressAlgo != NONE) {

compressor = compressionAlgorithm.getCompressor();

compressedByteStream = new ByteArrayOutputStream();

try {

compressionStream = compressionAlgorithm.createPlainCompressionStream(compressedByteStream, compressor);

} catch (IOException e) {

throw new RuntimeException("Could not create compression stream " + "for algorithm " + compressionAlgorithm, e);

}

}

创建字节输出流,如果使用压缩,则需要创建相应的压缩输出流,无论压缩与否,底层均包装ByteArrayOutputStream,由此可以看出,HFileBlock.Writer并没有将数据直接写入文件系统,而且仅仅缓存在了内存中的一个字节数组中,外部程序拿到这个字节数组(会经过若干步处理,如编码、压缩)后再将数据写入文件系统。

if (minorVersion > MINOR_VERSION_NO_CHECKSUM && bytesPerChecksum < HEADER_SIZE_WITH_CHECKSUMS) {

throw new RuntimeException("Unsupported value of bytesPerChecksum. " +

" Minimum is " + HEADER_SIZE_WITH_CHECKSUMS + " but the configured value is " +

bytesPerChecksum);

}

校验bytesPerChecksum。

prevOffsetByType = new long[BlockType.values().length];

for (int i = 0; i < prevOffsetByType.length; ++i)

prevOffsetByType[i] = -1;

初始化数组prevOffsetByType,数组长度为块类型的个数。

this.includesMemstoreTS = includesMemstoreTS;  // 默认值为true

this.checksumType = checksumType;  // 默认值为ChecksumType.CRC32

this.bytesPerChecksum = bytesPerChecksum;  // 默认值为16 * 1024

典型处理流程及源码分析

 

1. startWriting

方法签名:public DataOutputStream startWriting(BlockType newBlockType) throws IOException

方法描述:Starts writing into the block. The previous block‘s data is discarded.准备写入块数据,前一个块的数据将被丢弃。

触发条件:HFileWriterV2.newBlock()

执行流程:

if (state == State.BLOCK_READY && startOffset != -1) {

// We had a previous block that was written to a stream at a specific

// offset. Save that offset as the last offset of a block of that type.

prevOffsetByType[blockType.getId()] = startOffset;

}

根据数据块类型,记录前一个数据块在HFile中的起始偏移量,仅当前一个数据块写入完成(state为BLOCK_READY)才会执行此操作。

startOffset = -1;

blockType = newBlockType;

重置起始偏移量及数据块类型。

baosInMemory.reset();

baosInMemory.write(getDummyHeaderForVersion(this.minorVersion));

重置字节数组输出流,并写入“假”的头部信息,即一个空的字节数组,数据长度为24。

state = State.WRITING;

更新Writer状态为WRITING。

// We will compress it later in finishBlock()

userDataStream = new DataOutputStream(baosInMemory);

使用DataOutputStream“装饰”baosInMemory,方便外部数据写入。

return userDataStream;

返回供外部使用的数据输出流。

2. 写入KeyValue

// Write length of key and value and then actual key and value bytes.

// Additionally, we may also write down the memstoreTS.

{

DataOutputStream out = fsBlockWriter.getUserDataStream();

out.writeInt(klength);

totalKeyLength += klength;

out.writeInt(vlength);

totalValueLength += vlength;

out.write(key, koffset, klength);

out.write(value, voffset, vlength);

if (this.includeMemstoreTS) {

WritableUtils.writeVLong(out, memstoreTS);

}

}

上述代码实际由HFileWriterV2.append完成,在此不作详细解释。可以看出,通过HFileBlock.Writer getUserDataStream获取DataOutputStream实例out,然后即可通过out将数据写入。

3. writeHeaderAndData

方法签名:public void writeHeaderAndData(FSDataOutputStream out) throws IOException

方法描述:Similar to writeHeaderAndData(FSDataOutputStream), but records the offset of this block so that it can be referenced in the next block of the same type.主要作用是记录当前块的起始偏移量,以便于操作同一类型的下一个数据块时使用,其它操作转交由重载方法writeHeaderAndData(FSDataOutputStream)负责。总体职责就是将积聚的块数据写入指定的输出流即HDFS。

触发条件:当数据块的大小超过配额限制(默认64KB)时(通过方法blockSizeWritten,即userDataStream.size()得出),间接触发该方法执行。

执行流程:

long offset = out.getPos();

if (startOffset != -1 && offset != startOffset) {

throw new IOException("A " + blockType + " block written to a "

+ "stream twice, first at offset " + startOffset + ", then at "

+ offset);

}

startOffset = offset;

记录前一个数据块的起始偏移量。

writeHeaderAndData((DataOutputStream) out);

执行流程转交给重载方法writeHeaderAndData(FSDataOutputStream),执行流程如下:

ensureBlockReady();

Transitions the block writer from the "writing" state to the "block ready" state. Does nothing if a block is already finished.

将Writer的状态从“writing”转移至“block ready”,实际过程由finishBlock完成。

out.write(onDiskBytesWithHeader);

写出头部数据、数据,如果使用压缩则包含校验和数据。

if (compressAlgo == NONE && minorVersion > MINOR_VERSION_NO_CHECKSUM) {

if (onDiskChecksum == HConstants.EMPTY_BYTE_ARRAY) {

throw new IOException("A " + blockType

+ " without compression should have checksums "

+ " stored separately.");

}

out.write(onDiskChecksum);

}

如果没有使用压缩,写出校验和数据。

4. finishBlock

方法签名:private void finishBlock() throws IOException

方法描述:An internal method that flushes the compressing stream (if using compression), serializes the header, and takes care of the separate uncompressed stream for caching on write, if applicable. Sets block write state to "block ready".

触发条件:由方法ensureBlockReady调用触发

执行流程:

userDataStream.flush();

刷出输出流数据。

uncompressedBytesWithHeader = baosInMemory.toByteArray();

数据拷贝,将字节数组输出流中的数据保存至一字节数组中。

prevOffset = prevOffsetByType[blockType.getId()];

保存与当前块类型相同的前一个块的起始偏移量。

state = State.BLOCK_READY;

更新Writer状态为BLOCK_READY,注意执行此操作时数据尚未编码或压缩。

encodeDataBlockForDisk();

编码数据,默认编码器实例为NoOpDataBlockEncoder,并不会对数据产生任何编码操作。

doCompressionAndChecksumming();

压缩数据并生成校验和,执行流程如下:

private void doCompressionAndChecksumming() throws IOException {

if ( minorVersion <= MINOR_VERSION_NO_CHECKSUM) {

version20compression();

} else {

version21ChecksumAndCompression();

}

}

minorVersion值默认为1,流程跳转至version21ChecksumAndCompression,代码如下:

// do the compression

if (compressAlgo != NONE) {

// 使用压缩

// 重置压缩输出流

compressedByteStream.reset();

// 压缩输出流中写入“假”的头部数据

compressedByteStream.write(DUMMY_HEADER_WITH_CHECKSUM);

// 重置压缩输出流状态,compressionStream“装饰”compressedByteStream

compressionStream.resetState();

// 将uncompressedBytesWithHeader中的数据部分(不包含头)写入压缩输出流

compressionStream.write(uncompressedBytesWithHeader, headerSize(this.minorVersion),

uncompressedBytesWithHeader.length - headerSize(this.minorVersion));

// 刷出并完成压缩输出流

compressionStream.flush();

compressionStream.finish();

// generate checksums

// 其实就是在压缩输出流的末尾写入“假”的校验和数据用以“占位”,为后期的计算校验和数据留出空间

onDiskDataSizeWithHeader = compressedByteStream.size(); // data size

// reserve space for checksums in the output byte stream

ChecksumUtil.reserveSpaceForChecksums(compressedByteStream,

onDiskDataSizeWithHeader, bytesPerChecksum);

// “假”头部数据 + 数据(已被压缩)+ “假”校验和数据

onDiskBytesWithHeader = compressedByteStream.toByteArray();

// 计算“真实”头部数据

put21Header(onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,

uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader);

// generate checksums for header and data. The checksums are

// part of onDiskBytesWithHeader itself.

// 计算“真实”校验和数据

ChecksumUtil.generateChecksums(

onDiskBytesWithHeader, 0, onDiskDataSizeWithHeader,

onDiskBytesWithHeader, onDiskDataSizeWithHeader,

checksumType, bytesPerChecksum);

// Checksums are already part of onDiskBytesWithHeader

onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY;

// 为cache-on-write生成头部数据,uncompressedBytesWithHeader的数据部分是没有被压缩的

//set the header for the uncompressed bytes (for cache-on-write)

put21Header(uncompressedBytesWithHeader, 0,

onDiskBytesWithHeader.length + onDiskChecksum.length,

uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader);

} else {

// 没有使用压缩

// If we are not using any compression, then the

// checksums are written to its own array onDiskChecksum.

onDiskBytesWithHeader = uncompressedBytesWithHeader;

onDiskDataSizeWithHeader = onDiskBytesWithHeader.length;

// 计算校验和长度

int numBytes = (int)ChecksumUtil.numBytes(

uncompressedBytesWithHeader.length,

bytesPerChecksum);

onDiskChecksum = new byte[numBytes];

// 计算头部数据

//set the header for the uncompressed bytes

put21Header(uncompressedBytesWithHeader, 0,

onDiskBytesWithHeader.length + onDiskChecksum.length,

uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader);

// 计算校验和数据

ChecksumUtil.generateChecksums(

uncompressedBytesWithHeader, 0, uncompressedBytesWithHeader.length,

onDiskChecksum, 0,

checksumType, bytesPerChecksum);

}

代码到此,如果没有使用压缩,onDiskBytesWithHeader中包含头部数据+数据,onDiskChecksum为相应的校验和数据;如果使用压缩,onDiskBytesWithHeader包含头部数据+压缩数据+校验和数据,onDiskChecksum为EMPTY_BYTE_ARRAY。第3步writeHeaderAndData即可据此将块数据(onDiskBytesWithHeader、onDiskChecksum)写出。

循环以上几步,即可完成HFile Block的写流程。

时间: 2024-10-29 19:06:32

HBase HFileBlock的相关文章

hbase官方文档(转)

Apache HBase™ 参考指南  HBase 官方文档中文版 Copyright © 2012 Apache Software Foundation.保留所有权利. Apache Hadoop, Hadoop, MapReduce, HDFS, Zookeeper, HBase 及 HBase项目 logo 是Apache Software Foundation的商标. Revision History Revision 0.95-SNAPSHOT 2012-12-03T13:38 中文版

hbase源码系列(十三)缓存机制MemStore与Block Cache

这一章讲hbase的缓存机制,这里面涉及的内容也是比较多,呵呵,我理解中的缓存是保存在内存中的特定的便于检索的数据结构就是缓存. 之前在讲put的时候,put是被添加到Store里面,这个Store是个接口,实现是在HStore里面,MemStore其实是它底下的小子. 那它和Region Server.Region是什么关系? Region Server下面有若干个Region,每个Region下面有若干的列族,每个列族对应着一个HStore. HStore里面有三个很重要的类,在这章的内容都

hbase源码系列(十二)Get、Scan在服务端是如何处理?

继上一篇讲了Put和Delete之后,这一篇我们讲Get和Scan, 因为我发现这两个操作几乎是一样的过程,就像之前的Put和Delete一样,上一篇我本来只打算写Put的,结果发现Delete也可以走这个过程,所以就一起写了. Get 我们打开HRegionServer找到get方法.Get的方法处理分两种,设置了ClosestRowBefore和没有设置的,一般来讲,我们都是知道了明确的rowkey,不太会设置这个参数,它默认是false的. if (get.hasClosestRowBef

hbase源码系列(九)StoreFile存储格式

从这一章开始要讲Region Server这块的了,但是在讲Region Server这块之前得讲一下StoreFile,否则后面的不好讲下去,这块是基础,Region Sever上面的操作,大部分都是基于它来进行的. HFile概述 HFile是HBase中实际存数据的文件,为HBase提供高效快速的数据访问.它是基于Hadoop的TFile,模仿Google Bigtable 架构中的SSTable格式.文件格式如下: 文件是变长的,唯一固定的块是File Info和Trailer,如图所示

网易视频云:HBase – 存储文件HFile结构解析

网易视频云是网易推出的PAAS视频云服务,主要应用于在线教育.直播秀场.远程医疗.企业协作等领域.今天,网易视频云技术专家与大家分享一下:HBase – 存储文件HFile结构解析. HFile是HBase存储数据的文件组织形式,参考BigTable的SSTable和Hadoop的TFile实现.从HBase开始到现在,HFile经历了三个版本,其中V2在0.92引入,V3在0.98引入.HFileV1版本的在实际使用过程中发现它占用内存多,HFile V2版本针对此进行了优化,HFile V3

hbase过滤器(1)

最近在公司做hbase就打算复习下它的过滤器以便不时之需,RowFilter根据行键(rowkey)筛选数据 public void filter() throws IOException { Filter rf = new RowFilter(CompareFilter.CompareOp.LESS, new BinaryComparator(Bytes.toBytes("35643b94-b396-4cdc-abd9-029ca495769d"))); Scan s = new S

[原创]HBase学习笔记(1)-安装和部署

HBase安装和部署 使用的HBase版本是1.2.4 1.安装步骤(默认hdfs已安装好) # 下载并解压安装包 cd tools/ tar -zxf hbase-1.2.4-bin.tar.gz   # 重命名为hbase mv hbase-1.2.4 hbase # 将hadoop目录下的hdfs-site.xml 和 core-stie.xml拷贝到 hbase下的conf 目录中 cd /home/work/tools/hbase/conf cp /home/work/tools/ha

Hbase delete遇到的常见异常: Exception in thread &quot;main&quot; java.lang.UnsupportedOperationException

hbase 执行批量删除时出现错误: Exception in thread "main" java.lang.UnsupportedOperationException at java.util.AbstractList.remove(AbstractList.java:161) at org.apache.hadoop.hbase.client.HTable.delete(HTable.java:852) 这种异常其实很常见,remove操作不支持,为什么会出现不支持的情况呢?检查

HBase学习

记录HBase的学习过程.之后会陆续添加内容. 读取hbase的博客,理解hbase是什么.推荐博文: 1,HBase原理,基础架构,基础概念 2,HBase超详细介绍 ----------------------------------------------------- 一.直接实践吧! 1,HBase standalone模式安装 版本:1.2.4 参考文档:http://archive.cloudera.com/cdh5/cdh/5/hbase-0.98.6-cdh5.3.3/book