hadoop输入分片计算(Map Task个数的确定)

  作业从JobClient端的submitJobInternal()方法提交作业的同时,调用InputFormat接口的getSplits()方法来创建split。默认是使用InputFormat的子类FileInputFormat来计算分片,而split的默认实现为FileSplit(其父接口为InputSplit)。这里要注意,split只是逻辑上的概念,并不对文件做实际的切分。一个split记录了一个Map Task要处理的文件区间,所以分片要记录其对应的文件偏移量以及长度等。每个split由一个Map Task来处理,所以有多少split,就有多少Map Task。下面着重分析这个方法:

 1 public List<InputSplit> getSplits(JobContext job
 2                                     ) throws IOException {
 3     //getFormatMinSplitSize():始终返回1
 4     //getMinSplitSize(job):获取” mapred.min.split.size”的值,默认为1
 5     long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
 6
 7     //getMaxSplitSize(job):获取"mapred.max.split.size"的值,
 8     //默认配置文件中并没有这一项,所以其默认值为” Long.MAX_VALUE”,即2^63 – 1
 9     long maxSize = getMaxSplitSize(job);
10
11     // generate splits
12     List<InputSplit> splits = new ArrayList<InputSplit>();
13     List<FileStatus>files = listStatus(job);
14     for (FileStatus file: files) {
15       Path path = file.getPath();
16       FileSystem fs = path.getFileSystem(job.getConfiguration());
17       long length = file.getLen();
18       BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
19       if ((length != 0) && isSplitable(job, path)) {
20         long blockSize = file.getBlockSize();
21         //计算split大小
22         long splitSize = computeSplitSize(blockSize, minSize, maxSize);
23
24         //计算split个数
25         long bytesRemaining = length;    //bytesRemaining表示剩余字节数
26         while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { //SPLIT_SLOP=1.1
27           int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
28           splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
29                                    blkLocations[blkIndex].getHosts()));
30           bytesRemaining -= splitSize;
31         }
32
33         if (bytesRemaining != 0) {
34           splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
35                      blkLocations[blkLocations.length-1].getHosts()));
36         }
37       } else if (length != 0) {
38         splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
39       } else {
40         //Create empty hosts array for zero length files
41         splits.add(new FileSplit(path, 0, length, new String[0]));
42       }
43     }
44
45     // Save the number of input files in the job-conf
46     job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
47
48     LOG.debug("Total # of splits: " + splits.size());
49     return splits;
50   }

  首先计算分片的下限和上限:minSize和maxSize,具体的过程在注释中已经说清楚了。接下来用这两个值再加上blockSize来计算实际的split大小,过程也很简单,具体代码如下:

1 protected long computeSplitSize(long blockSize, long minSize,
2                                   long maxSize) {
3     return Math.max(minSize, Math.min(maxSize, blockSize));
4   }

  接下来就是计算实际的分片个数了。针对每个输入文件,计算input split的个数。while循环的含义如下:

  a)  文件剩余字节数/splitSize>1.1,创建一个split,这个split的字节数=splitSize,文件剩余字节数=文件大小 - splitSize

  b)  文件剩余字节数/splitSize<1.1,剩余的部分全都作为一个split(这主要是考虑到,不用为剩余的很少的字节数一些启动一个Map Task)

  

  我们发现,在默认配置下,split大小和block大小是相同的。这是不是为了防止这种情况:

一个split如果对应的多个block,若这些block大多不在本地,则会降低Map Task的本地性,降低效率。

  到这里split的划分就介绍完了,但是有两个问题需要考虑:

1、如果一个record跨越了两个block该怎么办?

  这个可以看到,在Map Task读取block的时候,每次是读取一行的,如果发现块的开头不是上一个文件的结束,那么抛弃第一条record,因为这个record会被上一个block对应的Map Task来处理。那么,第二个问题来了:

2、上一个block对应的Map Task并没有最后一条完整的record,它又该怎么办?

  一般来说,Map Task在读block的时候都会多读后续的几个block,以处理上面的这种情况。不过这部分的代码我还没有看到,等看到了再补充吧。

  本文基于hadoop1.2.1

  如有错误,还请指正

  参考文章:《Hadoop技术内幕 深入理解MapReduce架构设计与实现原理》 董西成

  转载请注明出处:http://www.cnblogs.com/gwgyk/p/4113929.html

时间: 2024-10-07 06:32:38

hadoop输入分片计算(Map Task个数的确定)的相关文章

hadoop 分片与分块,map task和reduce task的理解

分块:Block HDFS存储系统中,引入了文件系统的分块概念(block),块是存储的最小单位,HDFS定义其大小为64MB.与单磁盘文件系统相似,存储在 HDFS上的文件均存储为多个块,不同的是,如果某文件大小没有到达64MB,该文件也不会占据整个块空间.在分布式的HDFS集群上,Hadoop系统保证一个块存储在一个datanode上. 把File划分成Block,这个是物理上真真实实的进行了划分,数据文件上传到HDFS里的时候,需要划分成一块一块,每块的大小由hadoop-default.

如何在hadoop中控制map的个数

hadooop提供了一个设置map个数的参数mapred.map.tasks,我们可以通过这个参数来控制map的个数.但是通过这种方式设置map 的个数,并不是每次都有效的.原因是mapred.map.tasks只是一个hadoop的参考数值,最终map的个数,还取决于其他的因素. 为了方便介绍,先来看几个名词: block_size : hdfs的文件块大小,默认为64M,可以通过参数dfs.block.size设置 total_size : 输入文件整体的大小 input_file_num

(转) 通过input分片的大小来设置map的个数

摘要 通过input分片的大小来设置map的个数 map inputsplit hadoop 前言:在具体执行Hadoop程序的时候,我们要根据不同的情况来设置Map的个数.除了设置固定的每个节点上可运行的最大map个数外,我们还需要控制真正执行Map操作的任务个数. 1.如何控制实际运行的map任务个数 我们知道,文件在上传到Hdfs文件系统的时候,被切分成不同的Block块(默认大小为64MB).但是每个Map处理的分块有时候并不是系统的物理Block块大小.实际处理的输入分块的大小是根据I

字符拆分存入Map计算单词的个数

///计算从命令行输入单词的种类与个数//Map<key,Value>Key-->单词:Value-->数量

51.从键盘上输入任意两个数和一个运算符(+、-、*、/),根据输入的运算符对两个数计算,并输出结果

?#include<iostream> using namespace std; int main() { int x,y; char a; cout<<"please input two numbers: "<<endl; cin>>x>>y; cout<<"please input an operational character:"<<endl; cin>>a; s

Task运行过程分析3——Map Task内部实现

Map Task内部实现 在Task运行过程分析2中提到,MapTask分为4种,分别是Job-setup Task.Job-cleanup Task.Task-cleanup Task和Map Task.其中,Job-setup Task和Job-cleanup Task分别是作业运行时启动的第一个任务和最后一个任务,主要工作分别是进行一些作业初始化和收尾工作,比如创建和删除作业临时输出目录:而Task-cleanup Task则是任务失败或者被杀死后,用于清理已写入临时目录中数据的任务.本文

mapreduce map 的个数

在map阶段读取数据前,FileInputFormat会将输入文件分割成split.split的个数决定了map的个数.影响map个数(split个数)的主要因素有: 1) 文件的大小.当块(dfs.block.size)为128m时,如果输入文件为128m,会被划分为1个split:当块为256m,会被划分为2个split. 2) 文件的个数.FileInputFormat按照文件分割split,并且只会分割大文件,即那些大小超过HDFS块的大小的文件.如果HDFS中dfs.block.siz

8.2.1输入分片InputSplit和输入处理格式FileInputFormat

1.1.1         输入分片和记录 (1)输入分片InputSplit接口 输入分片一般是文件,也可以数据库中的若干行.记录对应一行数据.输入分片在java表示为InputSplit接口,getlength函数返回大小,用于分片排序,大的先处理.Getlocation函数返回分片位置,让map任务尽量本地化.分片并不包含数据本身,而是指向数据的索引. public abstract class InputSplit { /** * Get the size of the split, s

Map Task内部实现分析

转自:http://blog.csdn.net/androidlushangderen/article/details/41142795 上篇我刚刚学习完,Spilt的过程,还算比较简单的了,接下来学习的就是Map操作的过程了,Map和Reduce一样,是整个MapReduce的重要内容,所以,这一篇,我会好好的讲讲里面的内部实现过程.首先要说,MapTask,分为4种,可能这一点上有人就可能知道了,分别是Job-setup Task,Job-cleanup Task,Task-cleanup和