Hadoop对小文件的解决方案

小文件指的是那些size比HDFS的block size(默认64M)小的多的文件。任何一个文件,目录和block,在HDFS中都会被表示为一个object存储在namenode的内存中, 每一个object占用150 bytes的内存空间。所以,如果有10million个文件, 每一个文件对应一个block,那么就将要消耗namenode 3G的内存来保存这些block的信息。如果规模再大一些,那么将会超出现阶段计算机硬件所能满足的极限。

控制小文件的方法有:

1、应用程序自己控制

2、archive

3、Sequence File / Map File

4、CombineFileInputFormat***

5、合并小文件,如HBase部分的compact

1、应用程序自己控制

final Path path = new Path("/combinedfile");
final FSDataOutputStream create = fs.create(path);
final File dir = new File("C:\\Windows\\System32\\drivers\\etc");
for(File fileName : dir.listFiles())
{
    System.out.println(fileName.getAbsolutePath());
    final FileInputStream fileInputStream = new
FileInputStream(fileName.getAbsolutePath());
    final List<String> readLines = IOUtils.readLines(fileInputStream);
    for (String line : readLines)
    {
        create.write(line.getBytes());
    }
    fileInputStream.close();
}
create.close();

2、archive 命令行操作

具体参考如下:

http://blog.csdn.net/scgaliguodong123_/article/details/46341587

3、Sequence File/Map File

Sequence File

通常对于”the small files problem”的回应会是:使用SequenceFile。

这种方法是说,使用filename作为key,并且file contents作为value。实践中这种方式非常管用。

如果有10000个100KB的文件,可以写一个程序来将这些小文件写入到一个单独的 SequenceFile中去,然后就可以在一个streaming fashion(directly or using mapreduce)中来使用这个sequenceFile。不仅如此,SequenceFiles也是splittable的,所以mapreduce 可以break them into chunks,并且分别的被独立的处理。和HAR不同的是,这种方式还支持压缩。 block的压缩在许多情况下都是最好的选择,因为它将多个 records压缩到一起,而不是一个record一个压缩。

在存储结构上, SequenceFile主要由一个Header后跟多条Record组成。

Header主要包含了Key classname, Value classname,存储压缩算法,用户自定义元数据等信息,此外,还包含了一些同步标识,用于快速定位到记录的边界。

每条Record以键值对的方式进行存储,用来表示它的字符数组可依次解析成:记录的长度、 Key的长度、 Key值和Value值,并且Value值的结构取决于该记录是否被压缩。

数据压缩有利于节省磁盘空间和加快网络传输, SeqeunceFile支持两种格式的数据压缩,分别是: record compression和block compression。

record compression是对每条记录的value进行压缩

block compression是将一连串的record组织到一起,统一压缩成一个block。

block信息主要存储了:块所包含的记录数、每条记录Key长度的集合、每条记录Key值的集合、每条记录Value长度的集合和每条记录Value值的集合

注:每个block的大小是可通过io.seqfile.compress.blocksize属性来指定的。

Configuration conf=new Configuration();
FileSystem fs=FileSystem.get(conf);
Path seqFile=new Path("seqFile.seq");
//Reader内部类用于文件的读取操作
SequenceFile.Reader reader=new SequenceFile.Reader(fs,seqFile,conf);
//Writer内部类用于文件的写操作,假设Key和Value都为Text类型
SequenceFile.Writer writer=new SequenceFile.Writer(fs,conf,seqFile,Text.class,Text.class);
//通过writer向文档中写入记录
writer.append(new Text("key"),new Text("value"));
IOUtils.closeStream(writer);//关闭write流
//通过reader从文档中读取记录
Text key=new Text();
Text value=new Text();
while(reader.next(key,value))
{
    System.out.println(key);
    System.out.println(value);
}
IOUtils.closeStream(reader);//关闭read流

具体可参考:

http://blog.csdn.net/scgaliguodong123_/article/details/46391061

MapFile

MapFile是排序后的SequenceFile,通过观察其目录结构可以看到

MapFile由两部分组成,分别是data和index。

index作为文件的数据索引,主要记录了每个Record的key值,以及

该Record在文件中的偏移位置。

在MapFile被访问的时候,索引文件会被加载到内存,通过索引映射关系可迅速定位到指定Record所在文件位置,因此,相对SequenceFile而言, MapFile的检索效率是高效的,缺点是会消耗一部分内存来存储index数据。

注意的是, MapFile并不会把所有Record都记录到index中去,默认情况下每隔128条记录存储一个索引映射。当然,记录间隔可人为修改,通过MapFIle.Writer的setIndexInterval()方法,或修改io.map.index.interval属性;

另外,与SequenceFile不同的是, MapFile的KeyClass一定要实现

WritableComparable接口 ,即Key值是可比较的。

Configuration conf=new Configuration();
FileSystem fs=FileSystem.get(conf);
Path mapFile=new Path("mapFile.map");
//Writer内部类用于文件的写操作,假设Key和Value都为Text类型
MapFile.Writer writer=new MapFile.Writer(conf,fs,mapFile.toString(),Text.class,Text.class);
//通过writer向文档中写入记录
writer.append(new Text("key"),new Text("value"));
IOUtils.closeStream(writer);//关闭write流
//Reader内部类用于文件的读取操作
MapFile.Reader reader=new MapFile.Reader(fs,mapFile.toString(),conf);
//通过reader从文档中读取记录
Text key=new Text();
Text value=new Text();
while(reader.next(key,value))
{
    System.out.println(key);
    System.out.println(value);
}
IOUtils.closeStream(reader);//关闭read流

5、CombineFileInputFormat

相对于大量的小文件来说,hadoop更合适处理少量的大文件。

CombineFileInputFormat可以缓解这个问题,它是针对小文件而设计的。

**注:**CombineFileInputFormat是一个抽象类,需要编写一个继承类。

使用CombineFileInputFormat作为Map任务的输入规格描述,首先需要实现一个自定义的RecordReader。

CombineFileInputFormat的大致原理

它会将输入多个数据文件(小文件)的元数据全部包装到CombineFileSplit类里面。也就是说,因为小文件的情况下,在HDFS中都是单Block的文件,即一个文件一个Block,一个CombineFileSplit包含了一组文件Block,包括每个文件的起始偏移(offset),长度(length),Block位置(localtions)等元数据。

如果想要处理一个 CombineFileSplit,很容易想到,对其包含的每个InputSplit(实际上这里面没有这个,你需要读取一个小文件块的时候,需要构造一 个FileInputSplit对象)。

在执行MapReduce任务的时候,需要读取文件的文本行(简单一点是文本行,也可能是其他格式数据)。

那么对于CombineFileSplit来说,你需要处理其包含的小文件Block,就要对应设置一个RecordReader,才能正确读取文件数据内容。

通常情况下,我们有一批小文件,格式通常是相同的,只需要在CombineFileSplit实现一个RecordReader的时候,

内置另一个用来读取小文件Block的RecordReader,这样就能保证读取CombineFileSplit内部聚积的小文件。

我们基于Hadoop内置的CombineFileInputFormat来实现处理海量小文件,需要做的工作,如下所示:

1、实现一个RecordReader来读取CombineFileSplit包装的文件Block

2、继承自CombineFileInputFormat实现一个使用我们自定义的RecordReader的输入规格说明类。

3、处理数据的Mapper实现类

4、配置用来处理海量小文件的MapReduce Job

package SmallFile;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class CombineSmallfileInputFormat extends
            CombineFileInputFormat<LongWritable,BytesWritable>
{
    @Override
    public RecordReader<LongWritable, BytesWritable> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException
    {
        CombineFileSplit combineFileSplit = (CombineFileSplit)(split);
        CombineFileRecordReader<LongWritable,BytesWritable> recordReader =
            new CombineFileRecordReader<LongWritable,BytesWritable>
            (combineFileSplit, context,CombineSmallfileRecordReader.class);
        try
        {
            recordReader.initialize(combineFileSplit, context);
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        return recordReader;
    }
}

class CombineSmallfileRecordReader extends RecordReader<LongWritable,BytesWritable>
{
    private CombineFileSplit combineFileSplit;
    private LineRecordReader lineRecordReader = new LineRecordReader();
    private Path[] paths;
    private int totalLength;
    private int currentIndex;
    private float currentProgress = 0;
    private LongWritable currentKey;
    private BytesWritable currentValue;

    public CombineSmallfileRecordReader(CombineFileSplit combineFileSplit,TaskAttemptContext context,Integer index)
    {
        super();
        this.combineFileSplit = combineFileSplit;
        this.currentIndex = index;
    }

     @Override
     public void initialize(InputSplit split, TaskAttemptContext context)
       throws IOException, InterruptedException
     {
          FileSplit fileSplit = new FileSplit(combineFileSplit.getPath(currentIndex),
                  combineFileSplit.getOffset(currentIndex),combineFileSplit.getLength(currentIndex),
                  combineFileSplit.getLocations());
          lineRecordReader.initialize(fileSplit, context);
          this.paths = combineFileSplit.getPaths(); //分区所在的所有地址
          context.getConfiguration().set("map.input.file.name",
                  combineFileSplit.getPath(currentIndex).getName()); //设置输入文件名
     }

     @Override
     public boolean nextKeyValue() throws IOException, InterruptedException
     {
         if(currentIndex>=0 && currentIndex<totalLength)
         {
             return lineRecordReader.nextKeyValue();
         }
         return false;
     }

     @Override
     public LongWritable getCurrentKey() throws IOException, InterruptedException
     {
          currentKey = lineRecordReader.getCurrentKey();
          return currentKey;
     }

     @Override
     public BytesWritable getCurrentValue() throws IOException, InterruptedException
     {
          byte[]value = lineRecordReader.getCurrentValue().getBytes();
          currentValue.set(value, 0, value.length);
          return currentValue;
     }

     @Override
     public float getProgress() throws IOException, InterruptedException
     {
          if(currentIndex>=0 && currentIndex<totalLength)
          {
               currentProgress = currentIndex/totalLength;
               return currentProgress;
          }
          return currentProgress;
     }

     @Override
     public void close() throws IOException
     {
         lineRecordReader.close();
     }
}
时间: 2024-10-05 21:55:49

Hadoop对小文件的解决方案的相关文章

Hadoop的小文件解决方案

小文件指的是那些size比HDFS的block size(默认64M)小的多的文件.任何一个文件,目录和block,在HDFS中都会被表示为一个object存储在namenode的内存中,每一个object占用150bytes的内存空间.所以,如果有10million(一千万)个文件,每一个文件对应一个block,那么就将要消耗namenode3G的内存来保存这些block的信息,如果规模再大一些,那么将会超出现阶段计算机硬件所能满足的极限. 相同大小下,小文件越多,对namenode造成的内存

[Hadoop]大量小文件问题及解决方案

1. HDFS上的小文件问题 小文件是指文件大小明显小于HDFS上块(block)大小(默认64MB)的文件.如果存储小文件,必定会有大量这样的小文件,否则你也不会使用Hadoop(If you're storing small files, then you probably have lots of them (otherwise you wouldn't turn to Hadoop)),这样的文件给hadoop的扩展性和性能带来严重问题.当一个文件的大小小于HDFS的块大小(默认64MB

大数据-Hadoop小文件问题解决方案

HDFS中小文件是指文件size小于HDFS上block(dfs block size)大小的文件.大量的小文件会给Hadoop的扩展性和性能带来严重的影响.HDFS中小文件是指文件size小于HDFS上block大小的文件.大量的小文件会给Hadoop的扩展性和性能带来严重的影响. 大数据学习群:716581014 小文件是如何产生的? 动态分区插入数据,产生大量的小文件,从而导致map数量剧增 reduce数量越多,小文件也越多,reduce的个数和输出文件个数一致 数据源本身就是大量的小文

小文件的解决方案

小文件指的是那些size比HDFS的block size(默认64M)小的多的文件.任何一个文件,目录和block,在HDFS中都会被表示为一个object存储在namenode的内存中,每一个object占用150 bytes的内存空间.所以,如果有10million个文件,每一个文件对应一个block,那么就将要消耗namenode 3G的内存来保存这些block的信息.如果规模再大一些,那么将会超出现阶段计算机硬件所能满足的极限. 解决小文件的四种解决方案: 1.应用程序自己控制 2.ar

Hadoop之小文件处理与调优经验

HDFS上每个文件都要在namenode上建立一个索引,这个索引的大小约为150byte,这样当小文件比较多的时候,就会产生很多的索引文件,一方面会大量占用namenode的内存空间,另一方面就是索引文件过大是的索引速度变慢. 解决的方式:   1:Hadoop本身提供了一些文件压缩的方案 2:从系统层面改变现有HDFS存在的问题,其实主要还是小文件的合并,然后建立比较快速的索引. Hadoop自带小文件解决方案 1:Hadoop Archive: 是一个高效地将小文件放入HDFS块中的文件存档

Hadoop合并小文件的几种方法

1.Hadoop HAR 将众多小文件打包成一个大文件进行存储,并且打包后原来的文件仍然可以通过Map-Reduce进行操作,打包后的文件由索引和存储两大部分组成: 缺点: 一旦创建就不能修改,也不支持追加操作,还不支持文档压缩,当有新文件进来以后,需要重新打包. 2.SequeuesFile 适用于非文体格式,可作小文件容器,并可压缩: 3.CombineFileInputFormat 将多个文件合并成一个split作为输入,减少map输入与HDFS块的耦合: 4.Java代码实现,使用HDF

HDFS2&mdash;SequenceFile(小文件的解决方案)

1.这种方法是说,使用filename作为key,并且file contents作为value.实践中这种方式非常管用. 2.和HAR不同的是,这种方式还支持压缩. 3.block的压缩在许多情况下都是最好的选择,因为它将多个 records压缩到一起,而不是一个record一个压缩. 4.在存储结构上,SequenceFile主要由一个Header后跟多条Record组成. 5.Header主要包含了Key classname,Value classname,存储压缩算法,用户自定义元数据等信

关于rsync删除大量小文件的问题?

关于网上大量的rsync删除100w小文件的解决方案.博主两台虚拟机,每个目录下有100w个小文件,以下分别为rm和rsync两种删除方法 rm 环境:CentOS release 5.8 ,内存1G,处理器单核,硬盘30G 使用rsync删除,环境:CentOS release 6.5,内存500M,处理器单核,硬盘30G 开始怀疑网上看文章的人都有尝试过么?然后在看看两台机子的配置,调高了rsync所在机子的内存为1G,环境:CentOS release 6.5,内存1G,处理器单核,硬盘3

解决Hadoop一些小问题(整理版)

1 dfsadmin -setQuota的问题 dfsadmin -setQuota 限制文件数量 dfsadmin -setSpaceQuota 限制磁盘空间 2 解决Hadoop小文件问题? 数据块的默认大小是64M,如果一个文件的大小小于64M,那么它就属于Hadoop的小文件. 这样会浪费空间,所以要使用archive的方式来实现归并小文件.数据块的大小可以使用 dfs.block.size这个属性进行配置. 3 start-dfs.sh的告警信息 unable to load nati