2019/2/19 星期二
MapReduce计算框架高级特性程序运行并发度
所谓的并发度,就是在MapReduce执行程序的过程中有多少个map task进程和reduce task进程,来一起完成程序的处理。
MapReduce就是把业务处理逻辑变成分布式来处理。
reduce task 数量的决定机制 //全局的聚合操作 由业务场景决定
1、业务逻辑需要
2、数据量大小
设置方法:
job.setNumReduceTasks(5)
//reduce task的数量不能够任意的指定,比如:我们在一大堆的英文文件中,我们要去统计有多少个单词,这个时候reduce task在全局 程序执行的过程中只能有1个,为什么呢?因为:需要让所有的map task 把所有的结果给到一个reduce task 这样reduce task 就会把所有的结果统计出来。在这种情况下reduce task就没办法用多个。
比如:在wordcount中,我们统计每一个单词出现的总次数,在这种情况下,reduce task就 可以由任意多个。因为,maptask处理的结果经过shuffle阶段之后,相同的单词只会出现在同一个reduce task中。我们可能会得到5个文件,但这5个文件中统计的单词数量是全局唯一的。
map task 数量的决定机制:
由于map task 之间没有协作关系,每一个map task 都是各自为政,在map task 的处理中没法做“全局”性的聚合操作,所以map task 的数量完全取决于所处理的数据量的大小
决定机制:
对待处理数据进行“切片”
每一个切片分配一个map task 来处理
Mapreduce 框架中默认的切片机制:
TextInputFormat.getSplits()继承自FileInputFormat.getSplits()
数据切片的思考
1:定义一个切片大小:可以通过参数来调节,默认情况下等于“hdfs 中设置的blocksize”,通常是128M //一定程度上会减少网络传输数据,但并不是绝对的。
2:获取输入数据目录下所有待处理文件List
3:遍历文件List,逐个逐个文件进行切片
for(file:List)
对file 从0 偏移量开始切,每到128M 就构成一个切片,比如a.txt(200M),就会被切成两个切片: a.txt: 0-128M, a.txt :128M-256M 再比如b.txt(80M),就会切成一个切片, b.txt :0-80M
如果要处理的数据是大量的小文件,使用上述这种默认切片机制,就会导致大量的切片,从而maptask 进程数特别多,但是每一个切片又非常小,每个maptask 的处理数据量就很小,从而,整体的效率会很低。通用解决方案:就是将多个小文件划分成一个切片;实现办法就是自定义一个Inputformat 子类重写里面的getSplits 方法;
Mapreduce 框架中自带了一个用于此场景的Inputformat 实现类:CombineFileInputformat
原文地址:http://blog.51cto.com/12445535/2351751