关于Mapper、Reducer的个人总结(转)

Mapper的处理过程:

1.1. InputFormat 产生 InputSplit,并且调用RecordReader将这些逻辑单元(InputSplit)转化为map task的输入。其中InputSplit是map task处理的最小输入单元的逻辑表示。

1.2. 在客户端代码中调用Job类来设置参数,并执行在hadoop集群的上的MapReduce程序。

1.3. Mapper类在Job中被实例化,并且通过MapContext对象来传递参数设置。可以调用Job.getConfiguration().set(“myKey”, “myVal”)来设置参数。

1.4. Mapper的run()方法调用了自身的setup()来设置参数。

1.5. Mapper的run()方法调用map(KeyInType, ValInType, Context)方法来依次处理InputSplit中输入的key/value数据对。并且可以通过Context参数的Context.write(KeyOutType, ValOutType)方法来发射处理的结果。用户程序还可以用Context来设置状态信息等。同时,用户还可以用Job.setCombinerClass(Class)来设置Combiner,将map产生的中间态数据在Mapper本地进行汇聚,从而减少传递给Reducer的数据。

1.6. Mapper的run()方法调用cleanup()方法。

一些说明:

所有的中间结果都会被MapReduce框架自动的分组,然后传递给Reducer(s)去产生最后的结果。用户可以通过Job.setGroupingComparatorClass(Class)来设置Comparator。如果这个Comparator没有被设置,那么所有有一样的key的数据不会被排序。

Mapper的结果都是被排序过的,并被划分为R个区块(R是Reducer的个数)。用户可以通过实现自定义的Partitioner类来指定哪些数据被划分给哪个Reducer。

中间态的数据往往用(key-len, key, value-len, value)的简单格式存储。用户程序可以指定CompressionCodec来压缩中间数据。

Map的数目由输入数据的总大小决定。一般来说,一个计算节点10-100个map任务有较好的并行性。如果cpu的计算量很小,那么平均每个计算节点300个map任务也是可以的。但是每个任务都是需要时间来初始化的,因此每个任务不能划分的太小,至少也要平均一个任务执行个一分钟。可以通过mapreduce.job.maps参数来设置map的数目。更进一步说,任务的数量是有InputFormat.getSplits()方法来控制的,用户可以重写这个方法。

------------------------------------------------------------------------------------------------------------------------------------------------

Reducer主要分三个步骤:

1.       Shuffle洗牌

这步骤是Reducer从相关的Mapper节点获取中间态的数据。

2.       Sort排序

在洗牌的同时,Reducer对获取的数据进行排序。在这个过程中,可以用Job.setGroupingComparatorClass(Class)来对同一个key的数据进行Secondary Sort二次排序。

3.       Reduce

Reduce的调用顺序和Map差不多,也是通过run()方法调用setup(),reduce(),cleanup()来实现的。Reducer输出的数据是没有经过排序的。

Reduce 的数目可以通过Job.setNumReduceTasks(int)来设置。一般来说,Reduce的数目是节点数的0.95到1.75倍。

Reduce的数目也可以设置为0,那么这样map的输出会直接写到文件系统中。

Reduce中还可以使用Mark-Reset的功能。简而言之就是可以在遍历map产生的中间态的数据的时候可以进行标记,然后在后面适当的时候用reset回到最近标记的位置。当然这是有一点限制的,如下面的例子,必须自己在Reduce方法中对reduce的Iterator重新new 一个MarkableIterator才能使用。

public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
MarkableIterator<IntWritable> mitr = new MarkableIterator<IntWritable>(values.iterator());
// Mark the position
mitr.mark();
while (mitr.hasNext()) {
i = mitr.next();
// Do the necessary processing
}

// Reset
mitr.reset();
// Iterate all over again. Since mark was called before the first
// call to mitr.next() in this example, we will iterate over all
// the values now
while (mitr.hasNext()) {
i = mitr.next();
// Do the necessary processing
}
}

3.     Partitioner

Partitioner 对map输出的中间态数据按照reduce的数目进行分区,一般通过hash来进行划分。默认的实现是HashPartitioner。

4.     Reporting Progress

MapReduce程序可以通过mapper或者reducer的Context来汇报应用程序的状态。

5.     Job

当我们在写MapReduce程序的时候,通常,在main函数里。建立一个Job对象,设置它的JobName,然后配置输入输出路径,设置我们的Mapper类和Reducer类,设置InputFormat和正确的输出类型等等。然后我们会使用job.waitForCompletion()提交到JobTracker,等待job运行并返回,这就是一般的Job设置过程。JobTracker会初始化这个Job,获取输入分片,然后将一个一个的task任务分配给TaskTrackers执行。TaskTracker获取task是通过心跳的返回值得到的,然后TaskTracker就会为收到的task启动一个JVM来运行。

Job其实就是提供配置作业、获取作业配置、以及提交作业的功能,以及跟踪作业进度和控制作业。Job类继承于JobContext类。JobContext提供了获取作业配置的功能,如作业ID,作业的Mapper类,Reducer类,输入格式,输出格式等等,它们除了作业ID之外,都是只读的。 Job类在JobContext的基础上,提供了设置作业配置信息的功能、跟踪进度,以及提交作业的接口和控制作业的方法。

一个Job对象有两种状态,DEFINE和RUNNING,Job对象被创建时的状态时DEFINE,当且仅当Job对象处于DEFINE状态,才可以用来设置作业的一些配置,如Reduce task的数量、InputFormat类、工作的Mapper类,Partitioner类等等,这些设置是通过设置配置信息conf来实现的;当作业通过submit()被提交,就会将这个Job对象的状态设置为RUNNING,这时候作业以及提交了,就不能再设置上面那些参数了,作业处于调度运行阶段。处于RUNNING状态的作业我们可以获取作业、maptask和reduce task的进度,通过代码中的*Progress()获得,这些函数是通过info来获取的,info是RunningJob对象,它是实际在运行的作业的一组获取作业情况的接口,如Progress。

在waitForCompletion()中,首先用submit()提交作业,然后等待info.waitForCompletion()返回作业执行完毕。verbose参数用来决定是否将运行进度等信息输出给用户。submit()首先会检查是否正确使用了new API,这通过setUseNewAPI()检查旧版本的属性是否被设置来实现的,接着就connect()连接JobTracker并提交。实际提交作业的是一个JobClient对象,提交作业后返回一个RunningJob对象,这个对象可以跟踪作业的进度以及含有由JobTracker设置的作业ID。

getCounter()函数是用来返回这个作业的计数器列表的,计数器被用来收集作业的统计信息,比如失败的map task数量,reduce输出的记录数等等。它包括内置计数器和用户定义的计数器,用户自定义的计数器可以用来收集用户需要的特定信息。计数器首先被每个task定期传输到TaskTracker,最后TaskTracker再传到JobTracker收集起来。这就意味着,计数器是全局的。

http://www.aboutyun.com/thread-7066-1-1.html

时间: 2024-10-10 01:21:45

关于Mapper、Reducer的个人总结(转)的相关文章

hadoop2.7之Mapper/reducer源码分析

一切从示例程序开始: 示例程序 Hadoop2.7 提供的示例程序WordCount.java package org.apache.hadoop.examples; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.

Mapper类/Reducer类中的setup方法和cleanup方法以及run方法的介绍

在hadoop的源码中,基类Mapper类和Reducer类中都是只包含四个方法:setup方法,cleanup方法,run方法,map方法.如下所示: 其方法的调用方式是在run方法中,如下所示: 可以看出,在run方法中调用了上面的三个方法:setup方法,map方法,cleanup方法.其中setup方法和cleanup方法默认是不做任何操作,且它们只被执行一次.但是setup方法一般会在map函数之前执行一些准备工作,如作业的一些配置信息等:cleanup方法则是在map方法运行完之后最

hadoop之mapper类妙用

1. Mapper类 首先 Mapper类有四个方法: (1) protected void setup(Context context) (2) Protected void map(KEYIN key,VALUEIN value,Context context) (3) protected void cleanup(Context context) (4) public void run(Context context) setup()方法一般用来加载一些初始化的工作,像全局文件\建立数据库

python操作mongodb之二聚合查询

#聚合查询 from pymongo import MongoClient db = MongoClient('mongodb://10.0.0.9:27017/').aggregation_example #准备数据 result = db.things.insert_many([{"x": 1, "tags": ["dog", "cat"]}, {"x": 2, "tags": [&

MapReduce源码分析总结

转自:http://www.cnblogs.com/forfuture1978/archive/2010/11/19/1882279.html 转者注:本来想在Hadoop学习总结系列详细解析HDFS以及Map-Reduce的,然而查找资料的时候,发现了这篇文章,并且发现caibinbupt已经对Hadoop的源代码已经进行了详细的分析,推荐大家阅读. 转自http://blog.csdn.net/HEYUTAO007/archive/2010/07/10/5725379.aspx 参考: 1

Lucene + Hadoop 分布式搜索运行框架 Nut 1.0a9转自http://www.linuxidc.com/Linux/2012-02/53113.htm

1.概述 不管程序性能有多高,机器处理能力有多强,都会有其极限.能够快速方便的横向与纵向扩展是Nut设计最重要的原则,以此原则形成以分布式并行计算为核心的架构设计.以分布式并行计算为核心的架构设计是Nut区别于Solr.Katta的地方. Nut是一个Lucene+Hadoop分布式并行计算搜索框架,能对千G以上索引提供7*24小时搜索服务.在服务器资源足够的情况下能达到每秒处理100万次的搜索请求. Nut开发环境:jdk1.6.0.23+lucene3.0.3+eclipse3.6.1+ha

使用Storm实现实时大数据分析

摘要:随着数据体积的越来越大,实时处理成为了许多机构需要面对的首要挑战.Shruthi Kumar和Siddharth Patankar在Dr.Dobb’s上结合了汽车超速监视,为我们演示了使用Storm进行实时大数据分析.CSDN在此编译.整理. 简单和明了,Storm让大数据分析变得轻松加愉快. 当今世界,公司的日常运营经常会生成TB级别的数据.数据来源囊括了互联网装置可以捕获的任何类型数据,网站.社交媒体.交易型商业数据以及其它商业环境中创建的数据.考虑到数据的生成量,实时处理成为了许多机

大数据学习笔记3--HDFS扩展和mapreduce工作过程

HDFS配置: 客户端中的配置参数可以覆盖服务端的参数. 例如:副本数,切块大小 HDFS文件存储: 服务端存储block的实际大小,但是不适合存储小文件,小文件会占用namenode的元数据空间. 对于小文件数据的优化,可以在上传之前先合并再上传. 例如:压缩.文本文件合并 HDFS扩展: hdfs支持rest API,与平台无关 jetty 容器 hdfs支持rest command 分布式任务传统方式: 任务资源分发 jar配置文件...硬件资源的分配 任务在各个任务节点上设置运行环境,启

基于mapreducer的图算法

作者系阿里巴巴集团1688技术部普通码农 引言 周末看到一篇不错的文章"Graph Twiddling in a MapReduce world" ,介绍MapReduce下一些图算法的实现.文章语言质朴,介绍很多实用图优化技巧.文章2009年发表,至今已经被引用183次,足以证明这篇文章价值.目前这篇文章网上已经有人对这篇文章做了介绍,但仅介绍了其中最简单的两个算法,对其中的所做优化,并没有做分析.为了加深对文章算法的理解,我重新对这篇文章的算法做了翻译,同时加了自己的理解,以及算法

Hadoop Streaming 编程

1.概述 Hadoop Streaming是Hadoop提供的一个编程工具,它允许用户使用任何可执行文件或者脚本文件作为Mapper和Reducer,例如: 采用shell脚本语言中的一些命令作为mapper和reducer(cat作为mapper,wc作为reducer) $HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar \ -input myInputDirs \ -outpu