Hadoop MapReduce中压缩技术的使用

Compression and Input Splits

当我们使用压缩数据作为MapReduce的输入时,需要确认数据的压缩格式是否支持切片?

假设HDFS中有一个未经压缩的大小为1GB的文本文件,如果HDFS Block大小为128MB,那么这个文件会被HDFS存储为8个Block。当MapReduce Job使用这个文件作为输入时将会创建8个切片(默认每一个Block生成一个切片),每一个切片关联的数据都可以被一个Map Task独立地处理。

如果这个文本文件使用Gzip格式压缩,大小仍为1GB,如前所述,它也会被HDFS存储为8个Block。可是当MapReduce Job使用这个文件作为输入时,为每一个Block生成一个切片是不可行的,取而代之的是整个文件将作为一个切片被一个Map Task所处理。

Gzip使用DEFLATE存储压缩数据,DEFLATE将数据存储为一系列的压缩数据块,可是这些压缩数据块的边界是无法区分的,导致在数据流中无法定位某个数据块的起始位置。也就是说,我们无法随意地指定一个位置(该位置不一定恰好是某数据块的起始位置),然后移动到下一个数据块的起始位置读取数据。基本这个原因,Gzip格式的文件是不支持切片的。

对于这种情况,MapReduce是可以作出正确处理的,通过文件后缀名(文件后缀名直接影响压缩数据格式的判断)可以判断出这个文件是以Gzip格式进行压缩的,不支持切片,会将整个文件作为一个切片进行处理。但这样做是有很大代价的,一个Map Task要处理整个文件的数据,而且大部分数据并不是“数据本地性”的。

如果这个文本文件使用LZO格式压缩,同样的问题也会存在,但是Hadoop LZO Library提供了一个用于预处理LZO文件的切片索引工具,可以简单地认为生成的索引文件中保存着各个切片的起始位置,再配合合适的InputFormat(如:LzoTextInputFormat),运行MapReduce Job时就可以支持切片。

Bzip2的各个数据块之间存放有专门的“Synchronization Marker”,因此它是可以支持切片的。

Hadoop通常处理的都是大规模的数据集,因此我们必须尽可能的利用压缩优化性能。具体使用哪一个压缩格式依赖于文件大小、文件格式以及我们使用的分析工具。以下是一些使用建议:

(1)使用一些容器文件格式,如Sequence File、Avro DataFile、ORCFile、Parquet File,这些文件格式全部支持压缩和切片,配合一个快速的压缩算法(如:LZO、LZ4、Snappy)使用通常是一个好的选择;

(2)使用一个支持切片的压缩算法,如bzip2、lzo(通过索引处理之后可以支持切片);

(3)将一个文件人为地切分为Chunk(即一个文件被切分为多个文件),然后将这些Chunks逐个的进行压缩,可以使用任意支持的压缩算法,且不需要考虑压缩算法是否支持切片,但是Chunk压缩后的大小需要接近于HDFS Block的大小;

(4)文件不作压缩处理。

对于大型的文件,我们不能选择不支持切片的压缩算法,因为这会导致MapReduce Job丧失数据本地性且运行效率低下。

Using Compression in MapReduce

MapReduce读取输入路径中的压缩文件时会自动完成数据解压(可参考CompressionCodecFactory)。

如果MapReduce Job的结果输出需要使用压缩,可以通过设置Job的相关配置属性来实现:

mapreduce.output.fileoutputformat.compress:true

mapreduce.output.fileoutputformat.compress.codec:CompressionCodec全限定类名

也可以通过FileOutputFormat提供的静态方法设置,如:

FileOutputFormat.setCompressOutput(job, true);

FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

不同的输出文件格式可能相应的设置属性会有不同。

Compressing map output

 

Map Task的输出被写出到本地磁盘,而且需要通过网络传输至Reduce Task的节点,只要简单地使用一个快速的压缩算法(如LZO、LZ4、Snappy)就可以带来性能的提升,因为压缩机制的使用避免了Map Tasks与Reduce Tasks之间大量中间结果数据被传输。可以通过设置相应的Job配置属性开启:

mapreduce.map.output.compress:true

mapreduce.map.output.compress.codec:CompressionCodec全限定类名

也可以通过Configuration API进行设置:

new API:

Configuration conf = new Configuration();

conf.setBoolean(Job.MAP_OUTPUT_COMPRESS, true);

conf.setClass(Job.MAP_OUTPUT_COMPRESS_CODEC, GzipCodec.class, CompressionCodec.class);

Job job = new Job(conf);

old API:

conf.setCompressMapOutput(true);

conf.setMapOutputCompressorClass(GzipCodec.class);

时间: 2024-10-16 15:09:26

Hadoop MapReduce中压缩技术的使用的相关文章

Hadoop MapReduce中如何处理跨行Block和UnputSplit

Hadoop的初学者经常会疑惑这样两个问题:1.Hadoop的一个Block默认是64M,那么对于一个记录行形式的文本,会不会造成一行记录被分到两个Block当中?2.在把文件从Block中读取出来进行切分时,会不会造成一行记录被分成两个InputSplit,如果被分成两个InputSplit,这样一个InputSplit里面就有一行不完整的数据,那么处理这个InputSplit的Mapper会不会得出不正确的结果? 对于上面的两个问题,首先要明确两个概念:Block和InputSplit 1.

Hadoop Mapreduce 中的Partitioner

Partitioner的作用的对Mapper产生的中间结果进行分片,以便将同一分组的数据交给同一个Reduce处理,Partitioner直接影响Reduce阶段的负载均衡. MapReduce提供了两个Partitioner实现:HashPartitioner和TotalOederPartitioner. HashPartitioner是默认实现,实现了一种基于哈希值的分片方法,代码如下: public int getPartition(K2 key, V2 value, int numRed

Hadoop Mapreduce中shuffle 详解

MapReduce 里面的shuffle:描述者数据从map task 输出到reduce task 输入的这段过程 Shuffle 过程: 首先,map 输出的<key,value > 会放在内存中,内存有一定的大小,超过之后,会将内存里的东西溢写(spill) 到磁盘(disk)中 .在从内存溢写到磁盘的过程中,会有两个操作:分区(parttition),排序(sort).map结束之后,磁盘中会有很多文件 . 有很多小文件,需要将文件进行文件的合并,并且排序.map 中的一些map任务可

Hadoop Mapreduce 中的FileInputFormat类的文件切分算法和host选择算法

文件切分算法 文件切分算法主要用于确定InputSplit的个数以及每个InputSplit对应的数据段. FileInputFormat以文件为单位切分成InputSplit.对于每个文件,由以下三个属性值确定其对应的InputSplit的个数. goalSize:根据用户期望的InputSplit数据计算,即totalSize/numSplit.totalSize为文件总大小:numSplit为用户设定的Map Task个数,默认情况下是1. minSize:InputSplit的最小值,由

hadoop mapreduce中对splite的处理

分片: 1. 在job.submit() 提交job之后  会调用 submitter.submitJobInternal(Job.this, cluster); 2. 在submitJobInternal()函数中 会给job创建分片 int maps = writeSplits(job, submitJobDir); 在该函数中会调用writeNewSplits() 3. 在writeNewSplits()方法 中,通过反射获得InputFormat对象,会调用该对象中的getSplits(

下一代Apache Hadoop MapReduce框架的架构

背景 随着集群规模和负载增加,MapReduce JobTracker在内存消耗,线程模型和扩展性/可靠性/性能方面暴露出了缺点,为此需要对它进行大整修. 需求 当我们对Hadoop MapReduce框架进行改进时,需要时刻谨记的一个重要原则是用户的需求.近几年来,从Hadoop用户那里总结出MapReduce框架当前最紧迫的需求有: (1)可靠性(Reliability)– JobTracker不可靠 (2)可用性(Availability)– JobTracker可用性有问题 (3) 扩展

【转】Hadoop在MapReduce中使用压缩详解

原文链接 http://www.cnblogs.com/ggjucheng/archive/2012/04/22/2465580.html#top hadoop对于压缩文件的支持 hadoop对于压缩格式的是透明识别,我们的MapReduce任务的执行是透明的,hadoop能够自动为我们 将压缩的文件解压,而不用我们去关心. 如果我们压缩的文件有相应压缩格式的扩展名(比如lzo,gz,bzip2等),hadoop就会根据扩展名去选择解码器解压. hadoop对每个压缩格式的支持,详细见下表:  

Hadoop压缩之MapReduce中使用压缩

1.压缩和输入分片 Hadoop中文件是以块的形式存储在各个DataNode节点中,假如有一个文件A要做为输入数据,给MapReduce处理,系统要做的,首先从NameNode中找到文件A存储在哪些DataNode中,然后,在这些DataNode中,找到相应的数据块,作为一个单独的数据分块,作为map任务的输入,这就是mapreduce处理的数据的粗略过程!但是,我们都知道,对于一些大型的数据,压缩是很有用的,不仅能够节省存储空间,而且还能够加快传输速率.把文件压缩后再存入数据节点中,这个很常见

大数据技术之MapReduce中多表合并案例

大数据技术之MapReduce中多表合并案例 1)需求: 订单数据表t_order: id pid amount 1001 01 1 1002 02 2 1003 03 3 订单数据order.txt 1001 01 1 1002 02 2 1003 03 3 1004 01 4 1005 02 5 1006 03 6 商品信息表t_product pid pname 01 小米 02 华为 03 格力 商品数据pd.txt 01 小米 02 华为 03 格力 将商品信息表中数据根据商品pid合