MapReduce高级特性

计数器

因为计数器的查看往往比查看集群日志来的方便快捷

所以有些情况下计数器信息比集群日志更加有效

用户自定义的计数器

关于Hadoop的内置计数器的介绍可以参考Hadoop权威指南第九章MapReduce Features中的Build-in Counts小节

这里限于篇幅不再说明

MapReduce允许用户在程序中使用枚举或者字符串的格式类自定义计数器

一个作业可以定义的计数器不限,使用枚举类型时

枚举类型的名称即为组名,枚举类型的字段即为计数器名

计数器是全局的,会跨越所有Mapper和Reducer进行使用,并在作业结束的时候产生一个结果

例如,现有枚举类型如下:

enum Temperature{
    MISSING,
    MALFORMAT
}

在MapReduce程序中可以这样来使用计数器:

context.getCounter(Temperature.MISSING).increment(1);
context.getCounter(Temperature.MALFORMAT).increment(1);

动态计数器

由于枚举类型在编译的时候就确定了所有字段,但是某些情况下我们可能要根据未知的名称来命名计数器

这个时候就可以使用动态计数器来实现:

context.getCounter("计数器组名","计数器名").increment(1);

这里的计数器名的获得方式可以是任意的,例如动态获取的字段值等

但是大部分情况下,枚举类型可以足够使用了,而且枚举类型阅读性较强,易于使用,而且是类型安全的

所以推荐尽可能的使用枚举类型

在代码中获取计数器的值

除了通过Web UI、CLI和-counter参数获得作业的计数器,用户也可以通过代码在程序中获取计数器的值:

String jobId = args[0];
Cluster cluster = new Cluster(getConf());
Job job = cluster.getJob(JobId.forName(jobId));
if(job == null){
    System.err.println("No job whih ID %s found",jobId);
    return -1;
}
if(!job.isComplete()){
    System.err.println("Job %s is not complete",jobId);
    return -1;
}
Counters counters = job.getCounters();
//关键代码
long missing = conters.findCounter(Temperature.MISSING).getValue();
long total = counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();

排序

部分排序

部分排序是指在map阶段,对每个分区中的数据进行排序的过程

Hadoop提交作业自定义排序和分组中可以看到

MapReduce中控制部分排序的方法不只有一种,控制排序的顺序如下:

1.如果设置了mapreduce.job.output.key.comparator.class属性或者setComparatorClass()方法,则使用设置的类进行部分排序

2.否则,键必须是WritableComparable的子类,并使用针对该键类型的已经注册的comparator

3.否则,使用RawComparator将字节流反序列化为对象,并调用WritableComparable的comparaTo()方法

我们在自定义数据类型的时候继承自WritableComparable,并重写了comparaTo方法,这里的设置是最后才会使用的

如果定义了RawComparator/WritableComparator的具体实现类,那么将会优先使用这个设置,因为其可以直接对比字节流数组

全排序

MapReduce Shuffle阶段的排序只针对各个单独的分区,也就是之前讨论到的部分排序

对于每个分区,其数据是有序的,但是从数据的总体来看,是无序的

如何让MapReduce产生全局有序的数据呢?

最简单的办法是只使用一个分区,但是这就丧失了MapReduce并行计算的特性,必须在单台机器上处理所有数据

事实上,除了使用一个分区,还有另外一种方式既可以实现全局有序,也可以充分利用到MapReduce的并行计算能力

但是这个方法需要做一些额外的工作

思考一下,在部分排序中,每个分区内的数据都是有序的,但是从分区的角度看就是无序的了

如果我们能够确保分区也是有序的呢?,例如分区1保存1-100的数据,分区2保存101-200的数据,一次类推

那么从分区的角度看,各个分区之间是有序的,而分区内部的数据也是自然有序的

从而就做到了数据的全局有序

但是在这个过程中需要注意一个情况:如何确保每个分区的数据量分配是均匀的?

因为在实际场景中,1-100中包含的数据可能有1000个,而101-200的数据只有50个,这就造成了数据倾斜的问题

为了解决这个问题,我们通常需要深入的了解数据的组成特性

但是在海量数据的情况下,不可能对全部数据进行检查

这时我们可以使用采样的方式来进行

采样的核心思想是只查看一小部分的键,获得键的近似分布由此构建分区

Hadoop中已经内置了若干的采样器,接口如下:

public interface Sampler<K,V>{
    K[] getSample(InputFormat<K,V> inf,Job job) throw IOException,InterruptedException;
}

但是通常不会直接使用这个getSample接口,而是由InputSampler的writePartitionFile方法调用

目的是创建一个SequenceFile来存储定义分区的键

public static <K,V> writePartitionFile(Job job,Sampler<K,V> sampler) throw IOException,ClassNotFoundException,InterruptedException

该SequenceFile会被TotalOrderPartitioner使用来为作业创建分区:

//设置分区类为TotalOrderPartitioner
job.setPartitionerClass(TotalOrderPartitioner.class);
//使用随机采样器,采样率为0.1,最大样本数和最大分区数为10000何10,任意一个条件满足之后即刻停止采样
InputSampler.Sampler<IntWritable,Text> sampler = new InputSampler.RandomSampler<IntWritable,Text>(0.1,10000,10);
//使用该采样器创建定义分区键的SequenceFile
InputSampler.writePartitionFile(job,sampler);
///获得该SequenceFile并加入分布式缓存中共享
String partitionFile = TotalOrderPartitioner.getPartitionFile(conf);
URI uri = new URI(partitionFile);
jov.addCacheFile(uri);

这个采样器将会运行在客户端,所以会从集群上下载数据,需要注意下载的数据量不要太大不然运行时间很久

使用该方法还可以自由的设置reducer的任务数,即分区数,通过mapreduce.job.reducers来设置最后需要产生多少个均匀的分区

RandomSampler是一种比较通用的采样器,除了它,还有另外一些例如:

  • SplitSampler:只采样一个分片中的前n条记录,没有从全部分片中广泛采样,所以不适合已经排好序的数据
  • IntervalSampler:以一定的间隔从分片中选择键,因此很适合排过序的数据

二次排序

二次排序即为对数据的值进行排序,其实在Hadoop I/O的序列化小节中

就已经讨论过这个问题了,具体案例可以参考:Hadoop提交作业自定义排序和分组

Join连接

使用MapReduce进行连接操作的方式和技巧取决于数据集的规模和结构

如果一个数据集很大,另外一个很小,完全可以使用MapReduce中的DistributedCache

将小数据集分发到各个节点上

如果两个数据集都很大,那么又可以分为Map端的Join和Reduce端的Join

Map端的Join

Map端的Join操作会在数据到达map函数之前执行

为了达到这个目的,Map端的输入数据必须:

1.两个数据集被划分为数量相同的分区

2.两个数据集按照相同的键进行排序

由于Map可以设置之前执行的多个作业的输出为其输入,按照以上条件

此时输入数据应该满足:

1.两个作业有相同的reduce数量

2.键是相同的且不可分割

满足Map端Join操作的要求之后,可以利用org.apache.hadoop.mapreduce.join包中的ComsiteInputFormat类在map函数之前执行join操作

Reduce端的Join

比起Map端,Reduce端的Join对数据的要求没有那么高,利用Shuffle相同键的记录会被输入到同一个reducer(分区)中的特性

Reducer端可以天然进行Join操作,但是由于数据要经过Shuffle过程,所以效率往往比Map端的Join要低

而且在Reduce端的Join中,还可以利用到之前讨论的二次排序

有时候join连接需要一个数据集先于另一个数据集到达reduce函数,这时候我们可以听过二次排序对数据的值做一个标号

先要达到的数据标号设置为0,另外一个数据集设置为1,然后根据这个标号进行排序就可以实现让想要的数据集先一步到达reduce

边数据分布

所谓的边数据(Side Data)可以理解为MapReduce作业执行过程中

所有任务都有可能要使用到的只读的的数据,用以辅助处理主数据

使用JobConfiguration

Configuration类的各种setter方法可以方便的设置一些键值对类型的数据

用户可以通过getConfiguration方法获得配置的信息

这种方式足以应对很多只需要设置一些属性的场合

但是其缺点是:

  • 只适合类似属性设置的小数据
  • 对于很复杂的对象,用户需要自己设置序列化和反序列化
  • 每次读取配置的时候所有设置都将读取内存,不管有没有用到

DistributedCache

分布式缓存机制在作业运行之前将用户设置的数据拷贝到各个节点中以供使用

缓存的容量大小默认为10G,可以通过yarn.nodemanager.localizer.cache.target-size-mb来配置(以字节为单位)

具体的使用方式参考:MapReduce中的DistributedCache

作者:@小黑

时间: 2024-08-30 14:02:09

MapReduce高级特性的相关文章

MapReduce编程实战之“高级特性”

本篇介绍MapReduce的一些高级特性,如计数器.数据集的排序和连接.计数器是一种收集作业统计信息的有效手段,排序是MapReduce的核心技术,MapReduce也能够执行大型数据集间的""连接(join)操作. 计数器 计数器是一种收集作业统计信息的有效手段,用于质量控制或应用级统计.计数器还可用于辅助诊断系统故障.对于大型分布式系统来说,获取计数器比分析日志文件容易的多. 示例一:气温缺失及不规则数据计数器 import java.io.IOException; import

MapReduce计算框架高级特性程序运行并发度

2019/2/19 星期二 MapReduce计算框架高级特性程序运行并发度 所谓的并发度,就是在MapReduce执行程序的过程中有多少个map task进程和reduce task进程,来一起完成程序的处理. MapReduce就是把业务处理逻辑变成分布式来处理. reduce task 数量的决定机制 //全局的聚合操作 由业务场景决定1.业务逻辑需要2.数据量大小设置方法:job.setNumReduceTasks(5) //reduce task的数量不能够任意的指定,比如:我们在一大

Python的一些高级特性

内容基本上来自于廖雪峰老师的blog相当于自己手打了一遍,加强加强理解吧. http://www.liaoxuefeng.com/wiki/001374738125095c955c1e6d8bb493182103fac9270762a000 Python的一些高级特性 Slot python是动态语言,所谓动态,就是可以先创建类的实例,之后再动态绑定属性或方法,比如下边这个例子: class Student(object) pass s=Student() s.name="asd" 注

Spark Streaming高级特性在NDCG计算实践

从storm到spark streaming,再到flink,流式计算得到长足发展, 依托于spark平台的spark streaming走出了一条自己的路,其借鉴了spark批处理架构,通过批处理方式实现了实时处理框架.为进一步了解spark streaming的相关内容,飞马网于3月20日晚邀请到历任百度大数据的高级工程师-王富平,在线上直播中,王老师针对spark streaming高级特性以及ndcg计算实践进行了分享. 以下是本次直播的主要内容: 一.Spark Streaming简介

javascript高级特性

01_javascript相关内容02_函数_Arguments对象03_函数_变量的作用域04_函数_特殊函数05_闭包_作用域链&闭包06_闭包_循环中的闭包07_对象_定义普通对象08_对象_定义函数对象09_对象_内建对象10_原型_为函数对象增加属性或方法11_原型_利用函数对象本身重写原型12_继承_函数对象之间的继承13_继承_普通对象之间的继承 javascript高级特性(面向对象): * 面向对象:   * 面向对象和面向过程的区别:     * 面向对象:人就是对象,年龄\

python之高级特性

掌握了Python的数据类型.语句和函数,基本上就可以编写出很多有用的程序了. 比如构造一个1, 3, 5, 7, ..., 99的列表,可以通过循环实现: L = [] n = 1 while n <= 99: L.append(n) n = n + 2 取list的前一半的元素,也可以通过循环实现. 但是在Python中,代码不是越多越好,而是越少越好.代码不是越复杂越好,而是越简单越好. 基于这一思想,我们来介绍Python中非常有用的高级特性,1行代码能实现的功能,决不写5行代码.请始终

Python_高级特性

Python高级特性 author:lxy 切片.迭代.列表生成式.生成器 切片 Python中 代码越少越简单越好, 我们要取一个list中的某一部分的元素的我们可以使用判断+循环实现,在Python提供了专门的方法--切片 slice切片,用来获取list中某一段元素 tuple.str等都看做是一种list只是使用切片获取的片段还是他们原来相应的类型 例1.对list进行切片 >>> n = [1,3,2,5,6,8] >>> n[0:3]           

JSP简明教程(五):高级特性

JSP过滤器 过滤器的作用是给web请求增加额外的逻辑,每个页面可以被多个过滤器进行处理.过滤器需要在web.xml文件中进行定义,语法如下.过滤器的执行顺序与filter-mapping的定义顺序相同. <filter> <filter-name>FilterName</filter-name> <filter-class>TestFilter</filter-name> <init-param> <param-name>

Day-5: Python高级特性

python的理念是:简单.优雅.所以,在Python中集成了许多经常要使用的高级特性,以此来简化代码. 切片: 对于一个list或者tuple,取其中一段的元素,称为切片(Slice). L[start:end]表示取L中从索引号为start到end的元素,其中如果顺着取,则索引号范围为0~len(L)-1:反着取,则索引号范围为-1~-len(L). 迭代: Python中迭代用for...in来完成.对于list或者tuple,就是for name in names之类:而对于dict,就