计数器
因为计数器的查看往往比查看集群日志来的方便快捷
所以有些情况下计数器信息比集群日志更加有效
用户自定义的计数器
关于Hadoop的内置计数器的介绍可以参考Hadoop权威指南第九章MapReduce Features中的Build-in Counts小节
这里限于篇幅不再说明
MapReduce允许用户在程序中使用枚举或者字符串的格式类自定义计数器
一个作业可以定义的计数器不限,使用枚举类型时
枚举类型的名称即为组名,枚举类型的字段即为计数器名
计数器是全局的,会跨越所有Mapper和Reducer进行使用,并在作业结束的时候产生一个结果
例如,现有枚举类型如下:
enum Temperature{
MISSING,
MALFORMAT
}
在MapReduce程序中可以这样来使用计数器:
context.getCounter(Temperature.MISSING).increment(1);
context.getCounter(Temperature.MALFORMAT).increment(1);
动态计数器
由于枚举类型在编译的时候就确定了所有字段,但是某些情况下我们可能要根据未知的名称来命名计数器
这个时候就可以使用动态计数器来实现:
context.getCounter("计数器组名","计数器名").increment(1);
这里的计数器名的获得方式可以是任意的,例如动态获取的字段值等
但是大部分情况下,枚举类型可以足够使用了,而且枚举类型阅读性较强,易于使用,而且是类型安全的
所以推荐尽可能的使用枚举类型
在代码中获取计数器的值
除了通过Web UI、CLI和-counter参数获得作业的计数器,用户也可以通过代码在程序中获取计数器的值:
String jobId = args[0];
Cluster cluster = new Cluster(getConf());
Job job = cluster.getJob(JobId.forName(jobId));
if(job == null){
System.err.println("No job whih ID %s found",jobId);
return -1;
}
if(!job.isComplete()){
System.err.println("Job %s is not complete",jobId);
return -1;
}
Counters counters = job.getCounters();
//关键代码
long missing = conters.findCounter(Temperature.MISSING).getValue();
long total = counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
排序
部分排序
部分排序是指在map阶段,对每个分区中的数据进行排序的过程
在Hadoop提交作业自定义排序和分组中可以看到
MapReduce中控制部分排序的方法不只有一种,控制排序的顺序如下:
1.如果设置了mapreduce.job.output.key.comparator.class属性或者setComparatorClass()方法,则使用设置的类进行部分排序
2.否则,键必须是WritableComparable的子类,并使用针对该键类型的已经注册的comparator
3.否则,使用RawComparator将字节流反序列化为对象,并调用WritableComparable的comparaTo()方法
我们在自定义数据类型的时候继承自WritableComparable,并重写了comparaTo方法,这里的设置是最后才会使用的
如果定义了RawComparator/WritableComparator的具体实现类,那么将会优先使用这个设置,因为其可以直接对比字节流数组
全排序
MapReduce Shuffle阶段的排序只针对各个单独的分区,也就是之前讨论到的部分排序
对于每个分区,其数据是有序的,但是从数据的总体来看,是无序的
如何让MapReduce产生全局有序的数据呢?
最简单的办法是只使用一个分区,但是这就丧失了MapReduce并行计算的特性,必须在单台机器上处理所有数据
事实上,除了使用一个分区,还有另外一种方式既可以实现全局有序,也可以充分利用到MapReduce的并行计算能力
但是这个方法需要做一些额外的工作
思考一下,在部分排序中,每个分区内的数据都是有序的,但是从分区的角度看就是无序的了
如果我们能够确保分区也是有序的呢?,例如分区1保存1-100的数据,分区2保存101-200的数据,一次类推
那么从分区的角度看,各个分区之间是有序的,而分区内部的数据也是自然有序的
从而就做到了数据的全局有序
但是在这个过程中需要注意一个情况:如何确保每个分区的数据量分配是均匀的?
因为在实际场景中,1-100中包含的数据可能有1000个,而101-200的数据只有50个,这就造成了数据倾斜的问题
为了解决这个问题,我们通常需要深入的了解数据的组成特性
但是在海量数据的情况下,不可能对全部数据进行检查
这时我们可以使用采样的方式来进行
采样的核心思想是只查看一小部分的键,获得键的近似分布由此构建分区
Hadoop中已经内置了若干的采样器,接口如下:
public interface Sampler<K,V>{
K[] getSample(InputFormat<K,V> inf,Job job) throw IOException,InterruptedException;
}
但是通常不会直接使用这个getSample接口,而是由InputSampler的writePartitionFile方法调用
目的是创建一个SequenceFile来存储定义分区的键
public static <K,V> writePartitionFile(Job job,Sampler<K,V> sampler) throw IOException,ClassNotFoundException,InterruptedException
该SequenceFile会被TotalOrderPartitioner使用来为作业创建分区:
//设置分区类为TotalOrderPartitioner
job.setPartitionerClass(TotalOrderPartitioner.class);
//使用随机采样器,采样率为0.1,最大样本数和最大分区数为10000何10,任意一个条件满足之后即刻停止采样
InputSampler.Sampler<IntWritable,Text> sampler = new InputSampler.RandomSampler<IntWritable,Text>(0.1,10000,10);
//使用该采样器创建定义分区键的SequenceFile
InputSampler.writePartitionFile(job,sampler);
///获得该SequenceFile并加入分布式缓存中共享
String partitionFile = TotalOrderPartitioner.getPartitionFile(conf);
URI uri = new URI(partitionFile);
jov.addCacheFile(uri);
这个采样器将会运行在客户端,所以会从集群上下载数据,需要注意下载的数据量不要太大不然运行时间很久
使用该方法还可以自由的设置reducer的任务数,即分区数,通过mapreduce.job.reducers来设置最后需要产生多少个均匀的分区
RandomSampler是一种比较通用的采样器,除了它,还有另外一些例如:
- SplitSampler:只采样一个分片中的前n条记录,没有从全部分片中广泛采样,所以不适合已经排好序的数据
- IntervalSampler:以一定的间隔从分片中选择键,因此很适合排过序的数据
二次排序
二次排序即为对数据的值进行排序,其实在Hadoop I/O的序列化小节中
就已经讨论过这个问题了,具体案例可以参考:Hadoop提交作业自定义排序和分组
Join连接
使用MapReduce进行连接操作的方式和技巧取决于数据集的规模和结构
如果一个数据集很大,另外一个很小,完全可以使用MapReduce中的DistributedCache
将小数据集分发到各个节点上
如果两个数据集都很大,那么又可以分为Map端的Join和Reduce端的Join
Map端的Join
Map端的Join操作会在数据到达map函数之前执行
为了达到这个目的,Map端的输入数据必须:
1.两个数据集被划分为数量相同的分区
2.两个数据集按照相同的键进行排序
由于Map可以设置之前执行的多个作业的输出为其输入,按照以上条件
此时输入数据应该满足:
1.两个作业有相同的reduce数量
2.键是相同的且不可分割
满足Map端Join操作的要求之后,可以利用org.apache.hadoop.mapreduce.join包中的ComsiteInputFormat类在map函数之前执行join操作
Reduce端的Join
比起Map端,Reduce端的Join对数据的要求没有那么高,利用Shuffle相同键的记录会被输入到同一个reducer(分区)中的特性
Reducer端可以天然进行Join操作,但是由于数据要经过Shuffle过程,所以效率往往比Map端的Join要低
而且在Reduce端的Join中,还可以利用到之前讨论的二次排序
有时候join连接需要一个数据集先于另一个数据集到达reduce函数,这时候我们可以听过二次排序对数据的值做一个标号
先要达到的数据标号设置为0,另外一个数据集设置为1,然后根据这个标号进行排序就可以实现让想要的数据集先一步到达reduce
边数据分布
所谓的边数据(Side Data)可以理解为MapReduce作业执行过程中
所有任务都有可能要使用到的只读的的数据,用以辅助处理主数据
使用JobConfiguration
Configuration类的各种setter方法可以方便的设置一些键值对类型的数据
用户可以通过getConfiguration方法获得配置的信息
这种方式足以应对很多只需要设置一些属性的场合
但是其缺点是:
- 只适合类似属性设置的小数据
- 对于很复杂的对象,用户需要自己设置序列化和反序列化
- 每次读取配置的时候所有设置都将读取内存,不管有没有用到
DistributedCache
分布式缓存机制在作业运行之前将用户设置的数据拷贝到各个节点中以供使用
缓存的容量大小默认为10G,可以通过yarn.nodemanager.localizer.cache.target-size-mb来配置(以字节为单位)
具体的使用方式参考:MapReduce中的DistributedCache
作者:@小黑