Hadoop小文件问题及解决方案

1.概述

小文件是指文件size小于HDFS上block大小的文件。这样的文件会给hadoop的扩展性和性能带来严重问题。首先,在HDFS中,任何block,文件或者目录在内存中均以对象的形式存储,每个对象约占150byte,如果有1千万个小文件,每个文件占用一个block,则NameNode大约需要2G空间。如果存储一亿个文件,则NameNode需要20G空间。这样NameNode内存容量严重制约了集群的扩展。其次,访问大量小文件速度远远小于访问几个大文件。HDFS最初是为流式访问大文件开发的,如果访问大量小文件,需要不断的从一个DataNode跳到另外一个DataNode,严重影响性能。最后,处理大量小文件速度远远小于处理同等大小的大文件的速度。每一个小文件要占用一个slot,而task启动将耗费大量时间甚至大部分时间都耗费在启动task和释放task上。

2.HDFS文件读写流程

在正式介绍HDFS小文件存储方案之前,我们先介绍一下当前HDFS上文件存取的基本流程。

2.1 读文件流程

A.client端发送读文件请求给NameNode,如果文件不存在,返回错误信息,否则,将该文件对应的block机器所在DataNode位置发送给client。

B.client收到文件位置信息后,与不同DataNode建立socket连接并行获取数据。

2.2 写文件流程

A.client端发送写文件请求,NameNode检查文件是否存在,如果已经存在,直接返回错误信息,否则,发送给client一些可用节点。

B.client将文件分块,并行存储到不同DataNode节点上,发送完成以后,client同时发送信息给NameNode和DataNode。

C.NameNode收到client的信息后,发送信息给DataNode。

D.DataNode同时收到NameNode和DataNode的确认信息后,提交写操作。

3 解决小文件的方案

3.1 编写应用程序实现

[java] view plaincopy

  1. public class AppForSmallFile {
  2. //定义文件读取的路径
  3. private static final String OUTPATH = "hdfs://liaozhongmin:9000";
  4. public static void main(String[] args) {
  5. //定义FSDataOutputStream对象
  6. FSDataOutputStream fsDataoutputStream = null;
  7. //定义输入流读文件
  8. InputStreamReader inputStreamReader = null;
  9. try {
  10. //创建合并后文件存储的的路径
  11. Path path = new Path(OUTPATH + "/combinedFile");
  12. //创建FSDataOutputStream对象
  13. fsDataoutputStream =  FileSystem.get(path.toUri(), new Configuration()).create(path);
  14. //创建要合并的小文件路径
  15. File sourceDir = new File("C:\\Windows\\System32\\drivers\\etc");
  16. //遍历小文件
  17. for (File fileName : sourceDir.listFiles()){
  18. //创建输入流
  19. //fileInputStream = new FileInputStream(fileName.getAbsolutePath());
  20. //只有这样才可以制定字符编码(没办法,Window是默认GBK的,Hadoop是默认UTF-8的,所以读的时候就会乱码)
  21. inputStreamReader = new InputStreamReader(new FileInputStream(fileName), "gbk");
  22. //一行一行的读取
  23. List<String> readLines = IOUtils.readLines(inputStreamReader);
  24. //然后再写出去
  25. for (String line : readLines){
  26. //写入一行
  27. fsDataoutputStream.write(line.getBytes());
  28. //写入一个换行符
  29. fsDataoutputStream.write("\n".getBytes());
  30. }
  31. }
  32. System.out.println("合并成功");
  33. } catch (Exception e) {
  34. e.printStackTrace();
  35. } finally{
  36. try {
  37. inputStreamReader.close();
  38. fsDataoutputStream.close();
  39. } catch (IOException e) {
  40. e.printStackTrace();
  41. }
  42. }
  43. }
  44. }

注:这种方案是使用java文件相关操作,将众多的小文件写到一个文件中。

3.2 使用archive工具

[java] view plaincopy

  1. 创建文件 hadoop archive -archiveName xxx.har -p  /src  /dest
  2. 查看内部结构 hadoop fs -lsr /dest/xxx.har
  3. 查看内容 hadoop fs -lsr har:///dest/xxx.har

3.3 使用SequenceFile或者MapFile(以SequenceFile为例)

提供两种将小文件打成SequenceFile的方法:
方法一:

[java] view plaincopy

  1. public class WriteSequenceMapReduce {
  2. // 定义输入路径
  3. private static final String INPUT_PATH = "hdfs://master:9000/files";
  4. // 定义输出路径
  5. private static final String OUT_PATH = "hdfs://master:9000/seq/";
  6. //定义文件系统
  7. private static FileSystem fileSystem = null;
  8. public static void main(String[] args) {
  9. try {
  10. // 创建配置信息
  11. Configuration conf = new Configuration();
  12. // 创建文件系统
  13. fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
  14. // 如果输出目录存在,我们就删除
  15. if (fileSystem.exists(new Path(OUT_PATH))) {
  16. fileSystem.delete(new Path(OUT_PATH), true);
  17. }
  18. // 创建任务
  19. Job job = new Job(conf, WriteSequenceMapReduce.class.getName());
  20. // 1.1 设置输入目录和设置输入数据格式化的类
  21. FileInputFormat.setInputPaths(job, INPUT_PATH);
  22. // 1.2 设置自定义Mapper类和设置map函数输出数据的key和value的类型
  23. job.setMapperClass(WriteSequenceMapper.class);
  24. // 2.3 指定输出的路径和设置输出的格式化类
  25. FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
  26. // 提交作业 退出
  27. System.exit(job.waitForCompletion(true) ? 0 : 1);
  28. } catch (Exception e) {
  29. e.printStackTrace();
  30. }
  31. }
  32. public static class WriteSequenceMapper extends Mapper<LongWritable, Text, Text, BytesWritable> {
  33. // 定义SequenceFile.Reader对象用于读文件
  34. private static SequenceFile.Writer writer = null;
  35. // 定义配置信息
  36. private static Configuration conf = null;
  37. // 定义最终输出的key和value
  38. private Text outkey = new Text();
  39. private BytesWritable outValue = new BytesWritable();
  40. //定义要合并的文件(存放在数组中)
  41. private FileStatus[] files = null;
  42. //定义输入流和一个字节数组
  43. private InputStream inputStream = null;
  44. private byte[] buffer = null;
  45. @Override
  46. protected void setup(Mapper<LongWritable, Text, Text, BytesWritable>.Context context) throws IOException, InterruptedException {
  47. try {
  48. // 创建配置信息
  49. conf = new Configuration();
  50. // 创建Path对象
  51. Path path = new Path(INPUT_PATH);
  52. // 创建SequenceFile.Writer对象,并指定压缩格式
  53. writer = SequenceFile.createWriter(fileSystem,conf, new Path(OUT_PATH+"/total.seq"), Text.class, BytesWritable.class, CompressionType.BLOCK, new BZip2Codec());
  54. //writer = SequenceFile.createWriter(fileSystem,conf, new Path(OUT_PATH+"/total.seq"), Text.class, BytesWritable.class);
  55. //获取要合并的文件数组
  56. files = fileSystem.listStatus(path);
  57. } catch (Exception e) {
  58. e.printStackTrace();
  59. }
  60. }
  61. @Override
  62. protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, BytesWritable>.Context context) throws IOException, InterruptedException {
  63. //遍历文件数组
  64. for (int i=0; i<files.length; i++){
  65. //将文件名作为输出的key
  66. outkey.set(files[i].getPath().toString());
  67. //创建输入流
  68. inputStream = fileSystem.open(files[i].getPath());
  69. //创建字节数组
  70. buffer = new byte[(int) files[i].getLen()];
  71. //通过工具类将文件读到字节数组中
  72. IOUtils.readFully(inputStream, buffer, 0, buffer.length);
  73. //将字节数组中的内容及单个文件的内容作为value输出
  74. outValue.set(new BytesWritable(buffer));
  75. //关闭输入流
  76. IOUtils.closeStream(inputStream);
  77. //将结果写到Sequencefile中
  78. writer.append(outkey, outValue);
  79. }
  80. //关闭流
  81. IOUtils.closeStream(writer);
  82. //System.exit(0);
  83. }
  84. }
  85. }

方法二:自定义InputFormat和RecordReader实现

[java] view plaincopy

  1. public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable>{
  2. @Override
  3. public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
  4. //创建自定义的RecordReader
  5. WholeFileRecordReader reader = new WholeFileRecordReader();
  6. reader.initialize(split, context);
  7. return reader;
  8. }
  9. @Override
  10. protected boolean isSplitable(JobContext context, Path filename) {
  11. return false;
  12. }
  13. }

[java] view plaincopy

  1. public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable>{
  2. private FileSplit fileSplit;
  3. private Configuration conf;
  4. private BytesWritable value = new BytesWritable();
  5. private boolean processed = false;
  6. public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException{
  7. this.fileSplit = (FileSplit) split;
  8. this.conf = context.getConfiguration();
  9. }
  10. /**
  11. * process表示记录是否已经被处理过了
  12. */
  13. @Override
  14. public boolean nextKeyValue() throws IOException, InterruptedException {
  15. if (!processed){
  16. byte[] contents = new byte[(int) fileSplit.getLength()];
  17. //获取路径
  18. Path file = fileSplit.getPath();
  19. //创建文件系统
  20. FileSystem fileSystem = file.getFileSystem(conf);
  21. FSDataInputStream in = null;
  22. try {
  23. //打开文件
  24. in = fileSystem.open(file);
  25. //将file文件中的内容放入contents数组中。使用了IOUtils工具类的readFully()方法,将in流中的内容读到contents字节数组中
  26. IOUtils.readFully(in, contents, 0, contents.length);
  27. //BytesWritable是一个可用做key或value的字节序列,而ByteWritable是单个字节
  28. //将value的内容设置为contents的值
  29. value.set(contents, 0, contents.length);
  30. } catch (Exception e) {
  31. e.printStackTrace();
  32. } finally{
  33. IOUtils.closeStream(in);
  34. }
  35. processed = true;
  36. return true;
  37. }
  38. return false;
  39. }
  40. @Override
  41. public NullWritable getCurrentKey() throws IOException, InterruptedException {
  42. return NullWritable.get();
  43. }
  44. @Override
  45. public BytesWritable getCurrentValue() throws IOException, InterruptedException {
  46. return value;
  47. }
  48. @Override
  49. public float getProgress() throws IOException, InterruptedException {
  50. return processed ? 1.0f : 0.0f;
  51. }
  52. @Override
  53. public void close() throws IOException {
  54. //do nothing
  55. }
  56. }

[java] view plaincopy

  1. public class SmallFilesToSequenceFileConverter {
  2. // 定义输入路径
  3. private static final String INPUT_PATH = "hdfs://master:9000/files/*";
  4. // 定义输出路径
  5. private static final String OUT_PATH = "hdfs://<span style="font-family: Arial, Helvetica, sans-serif;">master</span>:9000/seq/total.seq";
  6. public static void main(String[] args) {
  7. try {
  8. // 创建配置信息
  9. Configuration conf = new Configuration();
  10. // 创建文件系统
  11. FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
  12. // 如果输出目录存在,我们就删除
  13. if (fileSystem.exists(new Path(OUT_PATH))) {
  14. fileSystem.delete(new Path(OUT_PATH), true);
  15. }
  16. // 创建任务
  17. Job job = new Job(conf, SmallFilesToSequenceFileConverter.class.getName());
  18. //1.1   设置输入目录和设置输入数据格式化的类
  19. FileInputFormat.addInputPaths(job, INPUT_PATH);
  20. job.setInputFormatClass(WholeFileInputFormat.class);
  21. //1.2   设置自定义Mapper类和设置map函数输出数据的key和value的类型
  22. job.setMapperClass(SequenceFileMapper.class);
  23. job.setMapOutputKeyClass(Text.class);
  24. job.setMapOutputValueClass(BytesWritable.class);
  25. //1.3   设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)
  26. job.setPartitionerClass(HashPartitioner.class);
  27. //千万不要有这句话,否则单个小文件的内容会输出到单独的一个Sequencefile文件中(简直内伤)
  28. //job.setNumReduceTasks(0);
  29. FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
  30. job.setOutputFormatClass(SequenceFileOutputFormat.class);
  31. // 此处的设置是最终输出的key/value,一定要注意!
  32. job.setOutputKeyClass(Text.class);
  33. job.setOutputValueClass(BytesWritable.class);
  34. // 提交作业 退出
  35. System.exit(job.waitForCompletion(true) ? 0 : 1);
  36. } catch (Exception e) {
  37. e.printStackTrace();
  38. }
  39. }
  40. public static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
  41. // 定义文件的名称作为key
  42. private Text fileNameKey = null;
  43. /**
  44. * task调用之前,初始化fileNameKey
  45. */
  46. @Override
  47. protected void setup(Mapper<NullWritable, BytesWritable, Text, BytesWritable>.Context context) throws IOException, InterruptedException {
  48. // 获取分片
  49. InputSplit split = context.getInputSplit();
  50. // 获取输入目录
  51. Path path = ((FileSplit) split).getPath();
  52. // 设置fileNameKey
  53. fileNameKey = new Text(path.toString());
  54. }
  55. @Override
  56. protected void map(NullWritable key, BytesWritable value, Mapper<NullWritable, BytesWritable, Text, BytesWritable>.Context context) throws IOException,
  57. InterruptedException {
  58. // 将fileNameKey作为输出的key(文件名),value作为输出的value(单个小文件的内容)
  59. System.out.println(fileNameKey.toString());
  60. context.write(fileNameKey, value);
  61. }
  62. }
  63. }

注:方法二的这三个类可以实现将小文件写到一个SequenceFile中。

读取SequenceFile文件:

[java] view plaincopy

  1. public class ReadSequenceMapReduce {
  2. // 定义输入路径
  3. private static final String INPUT_PATH = "hdfs://master:9000/seq/total.seq";
  4. // 定义输出路径
  5. private static final String OUT_PATH = "hdfs://<span style="font-family: Arial, Helvetica, sans-serif;">master</span>:9000/seq/out";
  6. //定义文件系统
  7. private static FileSystem fileSystem = null;
  8. public static void main(String[] args) {
  9. try {
  10. // 创建配置信息
  11. Configuration conf = new Configuration();
  12. // 创建文件系统
  13. fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
  14. // 如果输出目录存在,我们就删除
  15. if (fileSystem.exists(new Path(OUT_PATH))) {
  16. fileSystem.delete(new Path(OUT_PATH), true);
  17. }
  18. // 创建任务
  19. Job job = new Job(conf, ReadSequenceMapReduce.class.getName());
  20. // 1.1 设置输入目录和设置输入数据格式化的类
  21. FileInputFormat.setInputPaths(job, INPUT_PATH);
  22. // 这个很重要,指定使用SequenceFileInputFormat类来处理我们的输入文件
  23. job.setInputFormatClass(SequenceFileInputFormat.class);
  24. // 1.2 设置自定义Mapper类和设置map函数输出数据的key和value的类型
  25. job.setMapperClass(ReadSequenceMapper.class);
  26. job.setMapOutputKeyClass(Text.class);
  27. job.setMapOutputValueClass(Text.class);
  28. // 1.3 设置分区和reduce数量
  29. job.setPartitionerClass(HashPartitioner.class);
  30. job.setNumReduceTasks(0);
  31. // 最终输出的类型
  32. job.setOutputKeyClass(Text.class);
  33. job.setOutputValueClass(Text.class);
  34. // 2.3 指定输出的路径和设置输出的格式化类
  35. FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
  36. job.setOutputFormatClass(TextOutputFormat.class);
  37. // 提交作业 退出
  38. System.exit(job.waitForCompletion(true) ? 0 : 1);
  39. } catch (Exception e) {
  40. e.printStackTrace();
  41. }
  42. }
  43. public static class ReadSequenceMapper extends Mapper<Text, BytesWritable, Text, Text> {
  44. //定义SequenceFile.Reader对象用于读文件
  45. private static SequenceFile.Reader reader = null;
  46. //定义配置信息
  47. private static Configuration conf = null;
  48. //定义最终输出的value
  49. private Text outValue = new Text();
  50. /**
  51. * 在setUp()函数中初始化相关对象
  52. */
  53. @Override
  54. protected void setup(Mapper<Text, BytesWritable, Text, Text>.Context context) throws IOException, InterruptedException {
  55. try {
  56. // 创建配置信息
  57. conf = new Configuration();
  58. // 创建文件系统
  59. //FileSystem fs = FileSystem.get(new URI(INPUT_PATH), conf);
  60. // 创建Path对象
  61. Path path = new Path(INPUT_PATH);
  62. // 创建SequenceFile.Reader对象
  63. reader = new SequenceFile.Reader(fileSystem, path, conf);
  64. } catch (Exception e) {
  65. e.printStackTrace();
  66. }
  67. }
  68. @Override
  69. protected void map(Text key, BytesWritable value, Mapper<Text, BytesWritable, Text, Text>.Context context) throws IOException, InterruptedException {
  70. if (!"".equals(key.toString())  && !"".equals(value.get())){
  71. //设置输出的value
  72. outValue.set(new String(value.getBytes(), 0, value.getLength()));
  73. //把结果写出去
  74. context.write(key, outValue);
  75. }
  76. }
  77. }
  78. }

3.4 使用CombineFileInputFormat

详细内容见:http://shiyanjun.cn/archives/299.html

时间: 2024-08-08 05:36:33

Hadoop小文件问题及解决方案的相关文章

Hive优化之小文件问题及其解决方案

小文件是如何产生的 1.动态分区插入数据,产生大量的小文件,从而导致map数量剧增. 2.reduce数量越多,小文件也越多(reduce的个数和输出文件是对应的). 3.数据源本身就包含大量的小文件. 小文件问题的影响 1.从Hive的角度看,小文件会开很多map,一个map开一个JVM去执行,所以这些任务的初始化,启动,执行会浪费大量的资源,严重影响性能. 2.在HDFS中,每个小文件对象约占150byte,如果小文件过多会占用大量内存.这样NameNode内存容量严重制约了集群的扩展. 小

hadoop小文件存档

hadoop小文件存档1.HDFS存档小文件弊端 每个文件均按块存储,每个块的元数据存储在NameNode的内存中,因此HDFS存储小文件会非常低效.因为大量的小文件会耗尽NameNode中的大部分内存.但注意,存储小文件所需的磁盘容量和数据块的大小无关.例如,一个1M的文件设置为128M的块存储,实际使用的是1M的磁盘你空间.2.解决存储小文件办法之一 HDFS存文档文件或HAR文件,是一个更高效的文件存档工具,它将文件存入HDFS块,在减少NameNode内存使用的同时,允许对文件进行透明的

[Hadoop]大量小文件问题及解决方案

1. HDFS上的小文件问题 小文件是指文件大小明显小于HDFS上块(block)大小(默认64MB)的文件.如果存储小文件,必定会有大量这样的小文件,否则你也不会使用Hadoop(If you're storing small files, then you probably have lots of them (otherwise you wouldn't turn to Hadoop)),这样的文件给hadoop的扩展性和性能带来严重问题.当一个文件的大小小于HDFS的块大小(默认64MB

大数据-Hadoop小文件问题解决方案

HDFS中小文件是指文件size小于HDFS上block(dfs block size)大小的文件.大量的小文件会给Hadoop的扩展性和性能带来严重的影响.HDFS中小文件是指文件size小于HDFS上block大小的文件.大量的小文件会给Hadoop的扩展性和性能带来严重的影响. 大数据学习群:716581014 小文件是如何产生的? 动态分区插入数据,产生大量的小文件,从而导致map数量剧增 reduce数量越多,小文件也越多,reduce的个数和输出文件个数一致 数据源本身就是大量的小文

Hive之小文件问题及其解决方案

小文件如何产生 1.动态分区插入数据,产生大量小文件,导致map数剧增 2.Reduce数越多,小文件越多 3.数据直接导入小文件 小文件的影响 从hive的角度看,小文件会开很多map,一个map开一个jvm去执行,所以这些任务的初始化,启动,执行浪费大量资源,严重影响集群性能 在HDFS中,每个小文件对象越占150byte,如果小文件过多会占用大量内存.这样namenode内存容量严重制约了集群的扩展. 解决思路 使用sequence file作为表的存储格式,不要用TextFile 减少R

hadoop小文件合并

1.背景 在实际项目中,输入数据往往是由许多小文件组成,这里的小文件是指小于HDFS系统Block大小的文件(默认128M), 然而每一个存储在HDFS中的文件.目录和块都映射为一个对象,存储在NameNode服务器内存中,通常占用150个字节. 如果有1千万个文件,就需要消耗大约3G的内存空间.如果是10亿个文件呢,简直不可想象.所以在项目开始前, 我们选择一种适合的方案来解决本项目的小文件问题 2.介绍 本地 D:\data目录下有 2012-09-17 至 2012-09-23 一共7天的

Hadoop小文件解决之道之一 Hadoop archive

简介 hdfs并不擅长存储小文件,因为每个文件最少一个block,每个block的元数据都会在namenode节点占用内存,如果存在这样大量的小文件,它们会吃掉namenode节点的大量内存. hadoop Archives可以有效的处理以上问题,他可以把多个文件归档成为一个文件,归档成一个文件后还可以透明的访问每一个文件,并且可以做为mapreduce任务的输入. 用法 hadoop Archives可以使用archive工具创建,同上一篇讲的distcp一样,archive也是一个mapre

Hadoop的小文件解决方案

小文件指的是那些size比HDFS的block size(默认64M)小的多的文件.任何一个文件,目录和block,在HDFS中都会被表示为一个object存储在namenode的内存中,每一个object占用150bytes的内存空间.所以,如果有10million(一千万)个文件,每一个文件对应一个block,那么就将要消耗namenode3G的内存来保存这些block的信息,如果规模再大一些,那么将会超出现阶段计算机硬件所能满足的极限. 相同大小下,小文件越多,对namenode造成的内存

Hadoop对小文件的解决方案

小文件指的是那些size比HDFS的block size(默认64M)小的多的文件.任何一个文件,目录和block,在HDFS中都会被表示为一个object存储在namenode的内存中, 每一个object占用150 bytes的内存空间.所以,如果有10million个文件, 每一个文件对应一个block,那么就将要消耗namenode 3G的内存来保存这些block的信息.如果规模再大一些,那么将会超出现阶段计算机硬件所能满足的极限. 控制小文件的方法有: 1.应用程序自己控制 2.arc