hadoop中mapreduce的常用类(二)

云智慧(北京)科技有限公司陈鑫

NullWritable 

不想输出的时候,把它当做key。NullWritable是Writable的一个特殊类,序列化的长度为0,实现方法为空实现,不从数据流中读数据,也不写入数据,只充当占位符,如在MapReduce中,如果你不需要使用键或值,你就可以将键或值声明为NullWritable,NullWritable是一个不可变的单实例类型。

FileInputFormat继承于InputFormat

InputFormat的作用:

验证输入规范;
切分输入文件为InputSpilts;
提供RecordReader来收集InputSplit中的输入记录,给Mapper进行执行。

RecordReader
将面向字节的InputSplit转换为面向记录的视图,供Mapper或者Reducer使用运行。因此假定处理记录的责任界限,为任务呈现key-value。

SequenceFile:

SequenceFile是包含二进制kv的扁平文件(序列化)。它提供Writer、Reader、Sorter来进行写、读、排序功能。基于CompressionType,SequenceFile有三种对于kv的压缩方式:

Writer:不压缩records;

RecordCompressWriter:只压缩values;

BlockCompressWriter:   压缩records,keys和values都被分开压缩在block中,block的大小可以配置;

压缩方式由合适的CompressionCodec指定。推荐使用此类的静态方法createWriter来选择格式。Reader作为桥接可以读取以上任何一种压缩格式。

CompressionCodec

封装了关于流式压缩/解压缩的相关方法。

Mapper

Mapper 将输入的kv对映射成中间数据kv对集合。Maps 将输入记录转变为中间记录,其中被转化后的记录不必和输入记录类型相同。一个给定的输入对可以映射为0或者多个输出对。

在MRJob执行过程中,MapReduce框架根据提前指定的InputFormat(输入格式对象)产生InputSplit(输入分片),而每个InputSplit将会由一个map任务处理。
总起来讲,Mapper实现类通过JobConfigurable.configure(JobConf)方法传入JobConf对象来初始化,然后在每个map任务中调用map(WritableComparable,Writable,OutputCollector,Reporter)方法处理InputSplit的每个kv对。MR应用可以覆盖Closeable.close方法去处理一些必须的清理工作。
输出对不一定和输入对类型相同。一个给定的输入对可能映射成0或者很多的输出对。输出对是框架通过调用OutputCollector.colect(WritableComparable,Writable)得到。

MR应用可以使用Reporter汇报进度,设置应用层级的状态信息,更新计数器或者只是显示应用处于运行状态等。

所有和给定的输出key关联的中间数据都会随后被框架分组处理,并传给Reducer处理以产生最终的输出。用户可以通过JobConf.setOutputKeyComparatorClass(Class)指定一个Comparator控制分组处理过程。

Mapper输出都被排序后根据Reducer数量进行分区,分区数量等于reduce任务数量。用户可以通过实现自定义的Partitioner来控制哪些keys(记录)到哪个Reducer中去。

此外,用户还可以指定一个Combiner,调用JobConf.setCombinerClass(Class)来实现。这个可以来对map输出做本地的聚合,有助于减少从mapper到reducer的数据量。

经过排序的中间输出数据通常以一种简单的格式(key-len,key,value-len,value)存储在SequenceFile中。应用可以决定是否或者怎样被压缩以及压缩格式,可以通过JobConf来指定CompressionCodec.

如果job没有reducer,那么mapper的输出结果会不经过分组排序,直接写进FileSystem.

Map

通常map数由输入数据总大小决定,也就是所有输入文件的blocks数目决定。
每个节点并行的运行的map数正常在10到100个。由于Map任务初始化本身需要一段时间所以map运行时间至少在1分钟为好。

如此,如果有10T的数据文件,每个block大小128M,最大使用为82000map数,除非使用setNumMapTasks(int)(这个方法仅仅对MR框架提供一个建议值)将map数值设置到更高。

Reducer

Reducer根据key将中间数据集合处理合并为更小的数据结果集。
用户可以通过JobConf.setNumReduceTasks(int)设置作业的reducer数目。
整体而言,Reducer实现类通过JobConfigurable.configure(JobConf)方法将JobConf对象传入,并为Job设置和初始化Reducer。MR框架调用 reduce(WritableComparable, Iterator, OutputCollector,Reporter) 来处理以key被分组的输入数据。应用可以覆盖Closeable.close()处理必要的清理操作。

Reducer由三个主要阶段组成:shuffle,sort,reduce。

  shuffle

输入到Reducer的输入数据是Mapper已经排过序的数据.在shuffle阶段,根据partition算法获取相关的mapper地址,并通过Http协议将mapper的相应输出数据由reducer拉取到reducer机器上处理。

  sort

框架在这个阶段会根据key对reducer的输入进行分组(因为不同的mapper输出的数据中可能含有相同的key)。
shuffle和sort是同时进行的,同时reducer仍然在拉取map的输出。

  Secondary Sort

如果对中间数据key进行分组的规则和在处理化简阶段前对key分组规则不一致时,可以通过JobConf.setOutputValueGroupingComparator(Class)设置一个Comparator。因为中间数据的分组策略是通过JobConf.setOutputKeyComparatorClass(Class) 设置的,可以控制中间数据根据哪些key进行分组。而JobConf.setOutputValueGroupingComparator(Class)则可用于在数据连接情况下对value进行二次排序。

Reduce(化简)

这个阶段框架循环调用 reduce(WritableComparable, Iterator, OutputCollector,Reporter) 方法处理被分组的每个kv对。

reduce 任务一般通过OutputCollector.collect(WritableComparable, Writable)将输出数据写入文件系统FileSystem。应用可以使用Reporter汇报作业执行进度、设置应用层级的状态信息并更新计数器(Counter),或者只是提示作业在运行。
注意,Reducer的输出不会再进行排序。
Reducer数目

合适的reducer数目可以这样估算:(节点数目mapred.tasktracker.reduce.tasks.maximum)乘以0.95 或乘以1.75。因子为0.95时,当所有map任务完成时所有reducer可以立即启动,并开始从map机器上拉取数据。因子为1.75时,最快的一些节点将完成第一轮reduce处理,此时框架开始启动第二轮reduce任务,这样可以达到比较好的作业负载均衡。提高reduce数目会增加框架的运行负担,但有利于提升作业的负载均衡并降低失败的成本。上述的因子使用最好在作业执行时框架仍然有reduce槽为前提,毕竟框架还需要对作业进行可能的推测执行和失败任务的处理。

不使用Reducer
如果不需要进行化简处理,可以将reduce数目设为0。这种情况下,map的输出会直接写入到文件系统。输出路径通过setOutputPath(Path)指定。框架在写入数据到文件系统之前不再对map结果进行排序。

Partitioner

Partitioner对数据按照key进行分区,从而控制map的输出传输到哪个reducer上。默认的Partitioner算法是hash(哈希。分区数目由作业的reducer数目决定。HashPartitioner是默认的Partitioner。

Reporter

Reporter为MR应用提供了进度报告、应用状态信息设置,和计数器(Counter)更新等功能.

Mapper和Reducer实现可以使用Reporter汇报进度或者提示作业在正常运行。在一些场景下,应用在处理一些特殊的kv对时耗费了过多时间,这个可能会因为框架假定任务超时而强制停止了这些作业。为避免该情况,可以设置mapred.task.timeout为一个比较高的值或者将其设置为0以避免超时发生。
应用也可以使用Reporter来更新计数(Counter)。

OutputCollector

OutputCollector是MR框架提供的通用工具来收集Mapper或者Reducer输出数据(中间数据或者最终结果数据)。
HadoopMapReduce提供了一些经常使用的mapper、reducer和partioner的实现类供我们进行学习。

以上有关configuration和job的部分在新的API中有所改变,简单说就是在Mapper和Reducer中引入了MapContext和ReduceContext,它们封装了configuration和outputcollector,以及reporter。

时间: 2024-10-10 21:23:20

hadoop中mapreduce的常用类(二)的相关文章

hadoop中mapreduce的常用类(一)

云智慧(北京)科技有限公司陈鑫 写这个文章的时候才意识到新旧API是同时存在于1.1.2的hadoop中的.以前还一直纳闷儿为什么有时候是jobClient提交任务,有时是Job...不管API是否更新,下面这些类也还是存在于API中的,经过自己跟踪源码,发现原理还是这些.只不过进行了重新组织,进行了一些封装,使得扩展性更好.所以还是把这些东西从记事本贴进来吧. 关于这些类的介绍以及使用,有的是在自己debug中看到的,多数为纯翻译API的注释,但是翻译的过程受益良多. GenericOptio

浅谈hadoop中mapreduce的文件分发

最近在做数据分析的时候,需要在mapreduce中调用c语言写的接口,此时就需要把动态链接库so文件分发到hadoop的各个节点上,原来想自己来做这个分发,大概过程就是把so文件放在hdfs上面,然后做mapreduce的时候把so文件从hdfs下载到本地,但查询资料后发现hadoop有相应的组件来帮助我们完成这个操作,这个组件就是DistributedCache,分布式缓存,运用这个东西可以做到第三方文件的分发和缓存功能,下面详解: 如果我们需要在map之间共享一些数据,如果信息量不大,我们可

J2SE基础:8.系统常用类二

1:基础数据与封装类型之间的转型 A:基础数据类型--->封装类型(对象类型) Boolean boolean_1 = new Boolean(true); byte ---->Byte short---->Short char---->Character int--->Integer long-->Long float-->Float double-->Double B:封装类型--->基础类型 Integer.intValue--->int

Java 中必的常用类(很实用)

Java中必须了解的常用类 一.包装类 相信各位小伙伴们对基本数据类型都非常熟悉,例如 int.float.double.boolean.char 等.基本数据类型是不具备对象的特性的,比如基本类型不能调用方法.功能简单...,为了让基本数据类型也具备对象的特性, Java 为每个基本数据类型都提供了一个包装类,这样我们就可以像操作对象那样来操作基本数据类型. 基本类型和包装类之间的对应关系: 注意:有两个包装类的名称比较特殊一个是Integer,另一个是Character,其他都是基本数据类首

Hadoop中MapReduce多种join实现实例分析

一.概述 对于RDBMS中的join操作大伙一定非常熟悉,写sql的时候要十分注意细节,稍有差池就会耗时巨久造成很大的性能瓶颈,而在Hadoop中使用MapReduce框架进行join的操作时同样耗时,但是由于hadoop的分布式设计理念的特殊性,因此对于这种join操作同样也具备了一定的特殊性.本文主要对MapReduce框架对表之间的join操作的几种实现方式进行详细分析,并且根据我在实际开发过程中遇到的实际例子来进行进一步的说明. 二.实现原理 1.在Reudce端进行连接. 在Reudc

MyEclipse中如何查看常用类的源代码?

MyEclipse是一款功能强大的java开发软件,里面提供了很多友好的快捷的使用方法.对于java初学者而言,一般都会遇到查询常用类的源代码,或是从中学习编程技巧:或是从中学习类的使用方法,或是琢磨一些细节问题.那么,该如何查看源代码呢? 首先,打开MyEclipse软件,建一个类,在代码编辑窗口中,写出想要打开的常用类的类名,以“String”类为例进行说明.下面讲解如何打开源代码的三种方法. 方法一:在MyEclipse代码编辑窗口中,把光标放在类名的前面.中间或者后边都可以,具体位置可以

hadoop中实现定制Writable类

Hadoop中有一套Writable实现可以满足大部分需求,但是在有些情况下,我们需要根据自己的需要构造一个新的实现,有了定制的Writable,我们就可以完全控制二进制表示和排序顺序. 为了演示如何新建一个定制的writable类型,我们需要写一个表示一对字符串的实现: blic class TextPair implements WritableComparable<TextPair> { private Text first; private Text second; public Te

android/util中的一些常用类

1.SparseArrays  sparseArrays是映射Integer To Objects,可参照HashMap的作用.它的目的是更省内存,内存效率更高,因为它避免了keys的自动装箱,而且它的结构也不用依赖每一个entry-set.它内部是用二分查找去查询,所以不适合key很多的情况,大概在几百个元素的情况下,比hashmap的速度慢50%以下. 类似的有SparseBooleanArray,SparseIntArray,SparseLongArray,分别是映射Integer To

Java常用类(二) Scanner类和大数类

二.Scanner类 有C系语言基础的可能都比较熟悉scanf("%d",&a);和cin>>a;这种代码,也打开了程序交互的第一道门.因此,这些程序员开始学Java时都会先找输入输出(指标准输入输出),Java的输出就非常常见,任何一个Java教程基本都是以输出开始的,然而输入却在很后面提到,因为Java的输入不似输出那么简单.现在我们就来介绍一下实现输入的Scanner类. 1.Scanner基本使用方法和next()系列方法 一个从键盘输入的基本示例: imp