HDFS小文件合并问题的优化:copyMerge的改进

1.问题分析

用fsck命令统计 查看HDFS上在某一天日志的大小,分块情况以及平均的块大小,即

[[email protected] jar]$ hadoop fsck /wcc/da/kafka/report/2015-01-11
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

15/01/13 18:57:23 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Connecting to namenode via http://da-master:50070
FSCK started by hduser (auth:SIMPLE) from /172.21.101.30 for path /wcc/da/kafka/report/2015-01-11 at Tue Jan 13 18:57:24 CST 2015
....................................................................................................
....................................................................................................
........................................Status: HEALTHY
 Total size:	9562516137 B
 Total dirs:	1
 Total files:	240
 Total symlinks:		0
 Total blocks (validated):	264 (avg. block size 36221652 B)
 Minimally replicated blocks:	264 (100.0 %)
 Over-replicated blocks:	0 (0.0 %)
 Under-replicated blocks:	0 (0.0 %)
 Mis-replicated blocks:		0 (0.0 %)
 Default replication factor:	2
 Average block replication:	2.0
 Corrupt blocks:		0
 Missing replicas:		0 (0.0 %)
 Number of data-nodes:		5
 Number of racks:		1
FSCK ended at Tue Jan 13 18:57:24 CST 2015 in 14 milliseconds

The filesystem under path '/wcc/da/kafka/report/2015-01-11' is HEALTHY

用表格整理出来:


Date Time


Total(GB)


Total blocks


AveBlockSize(MB)


2014/12/21


9.39


268


36


2014/12/20


9.5


268


36


2014/12/19


8.89


268


34


2014/11/5


8.6


266


33


2014/10/1


9.31


268


36

分析问题的存在性:从表中可以看出,每天日志量的分块情况:总共大概有268左右的块数,平均块大小为36MB左右,远远不足128MB,这潜在的说明了一个问题。日志产生了很多小文件,大多数都不足128M,严重影响集群的扩展性和性能:首先,在HDFS中,任何block,文件或者目录在内存中均以对象的形式存储,每个对象约占150byte,如果有1000 0000个小文件,每个文件占用一个block,则namenode大约需要2G空间。如果存储1亿个文件,则namenode需要20G空间,这样namenode内存容量严重制约了集群的扩展;
其次,访问大量小文件速度远远小于访问几个大文件;HDFS最初是为流式访问大文件开发的,如果访问大量小文件,需要不断的从一个datanode跳到另一个datanode,严重影响性能;最后,处理大量小文件速度远远小于处理同等大小的大文件的速度,因为每一个小文件要占用一个slot,而task启动将耗费大量时间甚至大部分时间都耗费在启动task和释放task上,累计起来的总时长必然增加。我们采取的策略是先合并小文件,比如整理日志成user_report.tsv,client_report.tsv,AppLog_UserDevice.tsv,
再运行job。

2.解决方案

可以调用API的 FileUtil工具类的方法copyMerge(FileSystem srcFS, Path srcDir, FileSystem dstFS, Path dstFile, boolean deleteSource,Configuration conf, String addString);

但是此方法并不适用,因为某一天日志存在着三种类型的日志,即:

要分别合并成三个文件user_report.tsv,client_report.tsv和AppLog_UserDevice.tsv,故必须重新实现copyMerge函数,先分析copyMerge源码:

  /** Copy all files in a directory to one output file (merge). */
  public static boolean copyMerge(FileSystem srcFS, Path srcDir,
                                  FileSystem dstFS, Path dstFile,
                                  boolean deleteSource,
                                  Configuration conf, String addString) throws IOException {
	//生成合并后的目标文件路径dstFile,文件名为srcDir.getName(),即源路径的目录名,因此这里我们不能自定义生成的日志文件名,非常不方便
    dstFile = checkDest(srcDir.getName(), dstFS, dstFile, false);
	//判断源路径是否为文件目录
    if (!srcFS.getFileStatus(srcDir).isDirectory())
      return false;
   //创建输出流,准备写文件
    OutputStream out = dstFS.create(dstFile);

    try {
	 // 得到每个源路径目录下的每个文件
      FileStatus contents[] = srcFS.listStatus(srcDir);
	  //排序操作
      Arrays.sort(contents);
      for (int i = 0; i < contents.length; i++) {
        if (contents[i].isFile()) {
			//创建输入流,读文件
          InputStream in = srcFS.open(contents[i].getPath());
          try {
		  //执行复制操作,写入到目标文件中
            IOUtils.copyBytes(in, out, conf, false);
            if (addString!=null)
              out.write(addString.getBytes("UTF-8"));

          } finally {
            in.close();
          }
        }
      }
    } finally {
      out.close();
    }

	//若deleteSource为true,删除源路径目录下的每个文件
    if (deleteSource) {
      return srcFS.delete(srcDir, true);
    } else {
      return true;
    }
  }  

改进后:(这种方式只需要打开关闭输出流out 三次)

	/** Copy corresponding files in a directory to related output file (merge). */
	@SuppressWarnings("unchecked")
	public static boolean merge(FileSystem hdfs, Path srcDir, Path dstFile,
			boolean deleteSource, Configuration conf) throws IOException {
		if (!hdfs.getFileStatus(srcDir).isDirectory())
			return false;
		// 得到每个源目录下的每个文件;
		FileStatus[] fileStatus = hdfs.listStatus(srcDir);
		// 三种不同类型的文件各自的文件路径存入不同的list;
		ArrayList<Path> userPaths = new ArrayList<Path>();
		ArrayList<Path> clientPaths = new ArrayList<Path>();
		ArrayList<Path> appPaths = new ArrayList<Path>();
		for (FileStatus fileStatu : fileStatus) {
			Path filePath = fileStatu.getPath();
			if (filePath.getName().startsWith("user_report")) {
				userPaths.add(filePath);
			} else if (filePath.getName().startsWith("client_report")) {
				clientPaths.add(filePath);
			} else if (filePath.getName().startsWith("AppLog_UserDevice")) {
				appPaths.add(filePath);
			}
		}
		// 分别写入到目标文件:user_report.tsv中
		if (userPaths.size() > 0) {
			Path userDstFile = new Path(dstFile.toString() + "/user_report.tsv");
			OutputStream out = hdfs.create(userDstFile);
			Collections.sort(userPaths);
			try {
				Iterator<Path> iterator = userPaths.iterator();
				while (iterator.hasNext()) {
					InputStream in = hdfs.open(iterator.next());
					try {
						IOUtils.copyBytes(in, out, conf, false);
					} finally {
						in.close();
					}
				}
			} finally {
				out.close();
			}
		}
		// 分别写入到目标文件:client_report.tsv中
		if (clientPaths.size() > 0) {
			Path clientDstFile = new Path(dstFile.toString()
					+ "/client_report.tsv");
			OutputStream out = hdfs.create(clientDstFile);
			Collections.sort(clientPaths);
			try {
				Iterator<Path> iterator = clientPaths.iterator();
				while (iterator.hasNext()) {
					InputStream in = hdfs.open(iterator.next());
					try {
						IOUtils.copyBytes(in, out, conf, false);
					} finally {
						in.close();
					}
				}
			} finally {
				out.close();
			}
		}
		// 分别写入到目标文件:AppLog_UserDevice.tsv中
		if (appPaths.size() > 0) {
			Path appDstFile = new Path(dstFile.toString()
					+ "/AppLog_UserDevice.tsv");
			OutputStream out = hdfs.create(appDstFile);
			Collections.sort(appPaths);
			try {
				Iterator<Path> iterator = appPaths.iterator();
				while (iterator.hasNext()) {
					InputStream in = hdfs.open(iterator.next());
					try {
						IOUtils.copyBytes(in, out, conf, false);
					} finally {
						in.close();
					}
				}
			} finally {
				out.close();
			}
		}
		if (deleteSource) {
			return hdfs.delete(srcDir, true);
		}
		return true;
	}

当然你也可以这样:

	public static boolean mergeFiles(FileSystem hdfs, Path srcDir,
			Path dstFile, boolean deleteSource, Configuration conf)
			throws IOException {
		if (!hdfs.getFileStatus(srcDir).isDirectory())
			return false;
		// 得到每个源目录下的每个文件;
		FileStatus[] fileStatus = hdfs.listStatus(srcDir);
		// 三种不同类型的文件各自合并

		for (FileStatus fileStatu : fileStatus) {
			Path filePath = fileStatu.getPath();
			Path dstPath = new Path("");
			if (filePath.getName().startsWith("user_report")) {
				dstPath = new Path(dstFile.toString() + "/user_report.tsv");
			} else if (filePath.getName().startsWith("client_report")) {
				dstPath = new Path(dstFile.toString() + "/client_report.tsv");
			} else if (filePath.getName().startsWith("AppLog_UserDevice")) {
				dstPath = new Path(dstFile.toString() + "/client_report.tsv");
			}else{
				dstPath=new Path( "/error.tsv");
			}

			OutputStream out = hdfs.create(dstPath);
			try {
				InputStream in = hdfs.open(filePath);
				try {
					IOUtils.copyBytes(in, out, conf, false);
				} finally {
					in.close();
				}

			} finally {
				out.close();
			}

		}
		if (deleteSource) {
			return hdfs.delete(srcDir, true);
		}
		return true;
	}

3.总结

根据不同业务逻辑的需求,你可以自定义实现API接口函数。对于解决小文件合并问题,如果你有更好的策略,欢迎交流!

时间: 2024-12-26 18:27:47

HDFS小文件合并问题的优化:copyMerge的改进的相关文章

大数据技术之_05_Hadoop学习_04_MapReduce_Hadoop企业优化(重中之重)+HDFS小文件优化方法+MapReduce扩展案例+倒排索引案例(多job串联)+TopN案例+找博客共同粉丝案例+常见错误及解决方案

第6章 Hadoop企业优化(重中之重)6.1 MapReduce 跑的慢的原因6.2 MapReduce优化方法6.2.1 数据输入6.2.2 Map阶段6.2.3 Reduce阶段6.2.4 I/O传输6.2.5 数据倾斜问题6.2.6 常用的调优参数6.3 HDFS小文件优化方法6.3.1 HDFS小文件弊端6.3.2 HDFS小文件解决方案第7章 MapReduce扩展案例7.1 倒排索引案例(多job串联)7.2 TopN案例7.3 找博客共同粉丝案例第8章 常见错误及解决方案 第6章

HDFS操作及小文件合并

小文件合并是针对文件上传到HDFS之前 这些文件夹里面都是小文件 参考代码 package com.gong.hadoop2; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import or

hive小文件合并设置参数

Hive的后端存储是HDFS,它对大文件的处理是非常高效的,如果合理配置文件系统的块大小,NameNode可以支持很大的数据量.但是在数据仓库中,越是上层的表其汇总程度就越高,数据量也就越小.而且这些表通常会按日期进行分区,随着时间的推移,HDFS的文件数目就会逐渐增加. 小文件带来的问题 关于这个问题的阐述可以读一读Cloudera的这篇文章.简单来说,HDFS的文件元信息,包括位置.大小.分块信息等,都是保存在NameNode的内存中的.每个对象大约占用150个字节,因此一千万个文件及分块就

MR案例:小文件合并SequeceFile

SequeceFile是Hadoop API提供的一种二进制文件支持.这种二进制文件直接将<key, value>对序列化到文件中.可以使用这种文件对小文件合并,即将文件名作为key,文件内容作为value序列化到大文件中.这种文件格式有以下好处: 1). 支持压缩,且可定制为基于Record或Block压缩(Block级压缩性能较优)2). 本地化任务支持:因为文件可以被切分,因此MapReduce任务时数据的本地化情况应该是非常好的.3). 难度低:因为是Hadoop框架提供的API,业务

将小文件合并大文件上传

自定义方法将本地多个小文件合并成一个大文件上传到HDFS上. package test; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; impo

Hive merge(小文件合并)

当Hive的输入由很多个小文件组成时,如果不涉及文件合并的话,那么每个小文件都会启动一个map task. 如果文件过小,以至于map任务启动和初始化的时间大于逻辑处理的时间,会造成资源浪费,甚至发生OutOfMemoryError错误. 因此,当我们启动一个任务时,如果发现输入数据量小但任务数量多时,需要注意在Map前端进行输入小文件合并操作. 同理,向一个表写数据时,注意观察reduce数量,注意输出文件大小. 1. Map输入小文件合并 #每个Map处理的最大输入文件大小(256MB) s

hive压缩之小文件合并

Hive压缩之二 小文件合并 调研背景 当Hive输入由很多个小文件组成,由于每个小文件都会启动一个map任务,如果文件过小,以至于map任务启动和初始化的时间大于逻辑处理的时间,会造成资源浪费,甚至OOM.为此,当我们启动一个任务,发现输入数据量小但任务数量多时,需要注意在Map前端进行输入合并.当然,在我们向一个表写数据时,也需要注意输出文件大小. 输入合并 合并输入小文件,减少map数? 主要的决定因素有: input的文件总个数,input的文件大小,集群设置的文件块大小. 举例: a)

清理hdfs小文件shell脚本

清理hdfs小文件shell脚本 #!/bin/bash echo "--------------------------------------------------------------------------------------------------------------------" echo "BASH_VERSION: $BASH_VERSION" echo "参数说明: " echo "从外部只传递零个参数时,

hadoop 将HDFS上多个小文件合并到SequenceFile里

背景:hdfs上的文件最好和hdfs的块大小的N倍.如果文件太小,浪费namnode的元数据存储空间以及内存,如果文件分块不合理也会影响mapreduce中map的效率. 本例中将小文件的文件名作为key,其内容作为value生成SequenceFile 1.生成文件 //将目标目录的所有文件以文件名为key,内容为value放入SequenceFile中 //第一个参数是需要打包的目录,第二个参数生成的文件路径和名称 private static void combineToSequenceF