Yarn下Map数控制

public List<InputSplit> getSplits(JobContext job) throws IOException {
        long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
        long maxSize = getMaxSplitSize(job);

        List splits = new ArrayList();
        List files = listStatus(job);
        for (FileStatus file : files) {
            Path path = file.getPath();
            long length = file.getLen();
            if (length != 0L) {
                FileSystem fs = path.getFileSystem(job.getConfiguration());
                BlockLocation[] blkLocations = fs.getFileBlockLocations(file,
                        0L, length);
                if (isSplitable(job, path)) {
                    long blockSize = file.getBlockSize();
                    long splitSize = computeSplitSize(blockSize, minSize,
                            maxSize);

                    long bytesRemaining = length;
                    while (bytesRemaining / splitSize > 1.1D) {
                        int blkIndex = getBlockIndex(blkLocations, length
                                - bytesRemaining);
                        splits.add(makeSplit(path, length - bytesRemaining,
                                splitSize, blkLocations[blkIndex].getHosts()));

                        bytesRemaining -= splitSize;
                    }

                    if (bytesRemaining != 0L) {
                        int blkIndex = getBlockIndex(blkLocations, length
                                - bytesRemaining);
                        splits.add(makeSplit(path, length - bytesRemaining,
                                bytesRemaining,
                                blkLocations[blkIndex].getHosts()));
                    }
                } else {
                    splits.add(makeSplit(path, 0L, length,
                            blkLocations[0].getHosts()));
                }
            } else {
                splits.add(makeSplit(path, 0L, length, new String[0]));
            }
        }

        job.getConfiguration().setLong(
                "mapreduce.input.fileinputformat.numinputfiles", files.size());
        LOG.debug("Total # of splits: " + splits.size());
        return splits;
    }

Yarn 下好像没了1*下的由用户设置预期的Map数

核心代码

long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));

getFormatMinSplitSize 默认返回1,getMinSplitSize 为用户设置的最小分片数, 如果用户设置的大于1,则为用户设置的最小分片数
long maxSize = getMaxSplitSize(job);

getMaxSplitSize为用户设置的最大分片数,默认最大为9223372036854775807L

long splitSize = computeSplitSize(blockSize, minSize,
                            maxSize);

protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

测试 文件大小 297M(311349250)

块大小128M

测试代码

测试1

FileInputFormat.setMinInputSplitSize(job, 301349250);
   FileInputFormat.setMaxInputSplitSize(job, 10000);

测试后Map个数为1,由上面分片公式算出分片大小为301349250, 比 311349250小, 理论应该为两个map,  再看分片函数

while (bytesRemaining / splitSize > 1.1D) {
                        int blkIndex = getBlockIndex(blkLocations, length
                                - bytesRemaining);
                        splits.add(makeSplit(path, length - bytesRemaining,
                                splitSize, blkLocations[blkIndex].getHosts()));

bytesRemaining -= splitSize;
                    }

只要剩余的文件大小不超过分片大小的1.1倍, 则会分到一个分片中,避免开两个MAP, 其中一个运行数据太小,浪费资源。

测试2

FileInputFormat.setMinInputSplitSize(job, 150*1024*1024);

FileInputFormat.setMaxInputSplitSize(job, 10000);

MAP 数为2

测试3

在原有的输入目录下,添加一个很小的文件,几K,测试是否会合并

FileInputFormat.setMinInputSplitSize(job, 150*1024*1024);
FileInputFormat.setMaxInputSplitSize(job, 10000);

Map数变为了3

看源代码

for (FileStatus file : files) {

..

}

原来输入是按照文件名来分片的,这个按照常理也能知道, 不同的文件内容格式不同

总结,分片过程大概为,先遍历目标文件,过滤部分不符合要求的文件, 然后添加到列表,然后按照文件名来切分分片 (大小为前面计算分片大小的公式, 最后有个文件尾可能合并,其实常写网络程序的都知道), 然后添加到分片列表,然后每个分片读取自身对应的部分给MAP处理

Yarn下Map数控制

时间: 2024-11-08 07:05:06

Yarn下Map数控制的相关文章

在YARN中,如何控制和监控map/reduce的并发数

配置建议: 1.     In MR1, the mapred.tasktracker.map.tasks.maximum and mapred.tasktracker.reduce.tasks.maximum properties dictated how many map and reduce slots each TaskTracker had. These properties no longer exist in YARN. Instead, YARN uses yarn.nodema

Hive参数层面优化之一控制Map数

1.Map个数的决定因素 通常情况下,作业会通过input文件产生一个或者多个map数: Map数主要的决定因素有: input总的文件个数,input文件的大小和集群中设置的block的大小(在hive中可以通过set dfs.block.size命令查看,该参数不能自定义修改): 文件块数拆分原则:如果文件大于块大小(128M),那么拆分:如果小于,则把该文件当成一个块. 举例一: 假设input目录下有1个文件a,大小为780M,那么hadoop会将该文件a分隔成7个块(6个128m的块和

hive优化之------控制hive任务中的map数和reduce数

.    控制hive任务中的map数:  1.    通常情况下,作业会通过input的目录产生一个或者多个map任务. 主要的决定因素有: input的文件总个数,input的文件大小,集群设置的文件块大小(目前为128M, 可在hive中通过set dfs.block.size;命令查看到,该参数不能自定义修改): 2.    举例: a)    假设input目录下有1个文件a,大小为780M,那么Hadoop会将该文件a分隔成7个块(6个128m的块和1个12m的块),从而产生7个ma

hive优化----控制hive中的map数

1. 通常情况下,作业会通过input的目录产生一个或者多个map任务. 主要的决定因素有: input的文件总个数,input的文件大小,集群设置的文件块大小(目前为128M, 可在hive中通过set dfs.block.size;命令查看到,该参数不能自定义修改): 2. 举例:a) 假设input目录下有1个文件a,大小为780M,那么hadoop会将该文件a分隔成7个块(6个128m的块和1个12m的块),从而产生7个map数b) 假设input目录下有3个文件a,b,c,大小分别为1

【转】hive优化之--控制hive任务中的map数和reduce数

一.    控制hive任务中的map数:  1.    通常情况下,作业会通过input的目录产生一个或者多个map任务. 主要的决定因素有: input的文件总个数,input的文件大小,集群设置的文件块大小(目前为128M, 可在hive中通过set dfs.block.size;命令查看到,该参数不能自定义修改): 2.    举例: a)    假设input目录下有1个文件a,大小为780M,那么hadoop会将该文件a分隔成7个块(6个128m的块和1个12m的块),从而产生7个m

hive优化之——控制hive任务中的map数和reduce数

一.    控制hive任务中的map数: 1.    通常情况下,作业会通过input的目录产生一个或者多个map任务.主要的决定因素有: input的文件总个数,input的文件大小,集群设置的文件块大小(目前为128M, 可在hive中通过set dfs.block.size;命令查看到,该参数不能自定义修改): 2.    举例:a)    假设input目录下有1个文件a,大小为780M,那么hadoop会将该文件a分隔成7个块(6个128m的块和1个12m的块),从而产生7个map数

Linux Shell多进程并发以及并发数控制

1. 基础知识准备 1.1. linux后台进程 Unix是一个多任务系统,允许多用户同时运行多个程序.shell的元字符&提供了在后台运行不需要键盘输入的程序的方法.输入命令后,其后紧跟&字符,该命令就会被送往到linux后台执行,而终端又可以继续输入下一个命令了. 比如: sh a.sh & sh b.sh & sh c.sh & 这三个命令就会被同时送往linux后台执行,在这个程度上,认为这三个命令并发执行了. 1.2. linux文件描述符 文件描述符(缩

Hadoop中map数的计算

转载▼ Hadoop中在计算一个JOB需要的map数之前首先要计算分片的大小.计算分片大小的公式是: goalSize = totalSize / mapred.map.tasks minSize = max {mapred.min.split.size, minSplitSize} splitSize = max (minSize, min(goalSize, dfs.block.size)) totalSize是一个JOB的所有map总的输入大小,即Map input bytes.参数map

关于Mapreduce On Yarn中Map数量的设置

同事最近对MR on Yarn中Map数量的一个小的研究,描述如下: 在新版MapReduce 中即 MR on yarn中,不支持设置Map数量. Map的数量是由MinInputSplitSize决定的,公式: Map的数量 = TotalSize / MinInputSplitSize 要想控制Map的数量,可以通过控制MinInputSplitSize大小来控制Map数量. 如果设置的MinInputSplitSize大于BlockSize,MinInputSplitSize即为设置的值