大数据 : Hadoop reduce阶段

Mapreduce中由于sort的存在,MapTask和ReduceTask直接是工作流的架构。而不是数据流的架构。在MapTask尚未结束,其输出结果尚未排序及合并前,ReduceTask是又有数据输入的,因此即使ReduceTask已经创建也只能睡眠等待MapTask完成。从而可以从MapTask节点获取数据。一个MapTask最终的数据输出是一个合并的spill文件,可以通过Web地址访问。所以reduceTask一般在MapTask快要完成的时候才启动。启动早了浪费container资源。

ReduceTask是个线程,这个线程运行在YarnChild的Java虚拟机上,我们从ReduceTask.run开始看Reduce阶段。      获取更多大数据视频资料请加QQ群:947967114

public void run(JobConf job, final TaskUmbilicalProtocol umbilical)

throws IOException, InterruptedException, ClassNotFoundException {

job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());

if (isMapOrReduce()) {

/*添加reduce过程需要经过的几个阶段。以便通知TaskTracker目前运 行的情况*/

copyPhase = getProgress().addPhase("copy");

sortPhase = getProgress().addPhase("sort");

reducePhase = getProgress().addPhase("reduce");

}

// start thread that will handle communication with parent

TaskReporter reporter = startReporter(umbilical);

// 设置并启动reporter进程以便和TaskTracker进行交流

boolean useNewApi = job.getUseNewReducer();

//在job client中初始化job时,默认就是用新的API,详见Job.setUseNewAPI()方法

initialize(job, getJobID(), reporter, useNewApi);

/*用来初始化任务,主要是进行一些和任务输出相关的设置,比如创建commiter,设置工作目录等*/

// check if it is a cleanupJobTask

/*以下4个if语句均是根据任务类型的不同进行相应的操作,这些方 法均是Task类的方法,所以与任务是MapTask还是ReduceTask无关*/

if (jobCleanup) {

runJobCleanupTask(umbilical, reporter);

return;//只是为了JobCleanup,做完就停

}

if () {

runJobSetupTask(umbilical, reporter);

return;

//主要是创建工作目录的FileSystem对象

}

if (taskCleanup) {

runTaskCleanupTask(umbilical, reporter);

return;

//设置任务目前所处的阶段为结束阶段,并且删除工作目录

}

下面才是真正要成为reducer

// Initialize the codec

codec = initCodec();

RawKeyValueIterator rIter = null;

ShuffleConsumerPlugin shuffleConsumerPlugin = null;

Class combinerClass = conf.getCombinerClass();

CombineOutputCollector combineCollector =

(null != combinerClass) ?

new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;

//如果需要就创建combineCollector

Classextends ShuffleConsumerPlugin> clazz =

job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);

//配置文件找mapreduce.job.reduce.shuffle.consumer.plugin.class默认是shuffle.class

shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);

//创建shuffle类对象

LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);

ShuffleConsumerPlugin.Context shuffleContext =

new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,

super.lDirAlloc, reporter, codec,

combinerClass, combineCollector,

spilledRecordsCounter, reduceCombineInputCounter,

shuffledMapsCounter,

reduceShuffleBytes, failedShuffleCounter,

mergedMapOutputsCounter,

taskStatus, copyPhase, sortPhase, this,

mapOutputFile, localMapFiles);

//创建context对象,ShuffleConsumerPlugin.Context

shuffleConsumerPlugin.init(shuffleContext);

//这里调用的起始是shuffle的init函数,重点摘要如下。

this.localMapFiles = context.getLocalMapFiles();

scheduler = new ShuffleSchedulerImpl(jobConf, taskStatus, reduceId,

this, copyPhase, context.getShuffledMapsCounter(),

context.getReduceShuffleBytes(), context.getFailedShuffleCounter());

//创建shuffle所需的调度器

merger = createMergeManager(context);

//创建shuffle内部的merge,createMergeManager里面源码:

return new MergeManagerImpl(reduceId, jobConf, context.getLocalFS(),

context.getLocalDirAllocator(), reporter, context.getCodec(),

context.getCombinerClass(), context.getCombineCollector(),

context.getSpilledRecordsCounter(),

context.getReduceCombineInputCounter(),

context.getMergedMapOutputsCounter(), this, context.getMergePhase(),

context.getMapOutputFile());

//创建MergeMnagerImpl对象和Merge线程

rIter = shuffleConsumerPlugin.run();

//从各个Mapper复制其输出文件,并加以合并排序,等待直到完成为止

// free up the data structures

mapOutputFilesOnDisk.clear();

sortPhase.complete();

//排序阶段完成

setPhase(TaskStatus.Phase.REDUCE);

//进入reduce阶段

statusUpdate(umbilical);

Class keyClass = job.getMapOutputKeyClass();

Class valueClass = job.getMapOutputValueClass();

RawComparator comparator = job.getOutputValueGroupingComparator();

//3.Reduce 1.Reduce任务的最后一个阶段。它会准备好Map的 keyClass("mapred.output.key.class""mapred.mapoutput.key.class"),valueClass("mapred.mapoutput.value.class"或"mapred.output.value.class")和 Comparator (“mapred.output.value.groupfn.class”或“mapred.output.key.comparator.class”)

if (useNewApi) {

//2.根据参数useNewAPI判断执行runNewReduce还是runOldReduce。分析润runNewReduce

runNewReducer(job, umbilical, reporter, rIter, comparator,

keyClass, valueClass);

//0.像报告进程书写一些信息,1.获得一个TaskAttemptContext对象。通过这个对象创建reduce、output及用于跟踪的统计output的RecordWrit、最后创建用于收集reduce结果的Context,2.reducer.run(reducerContext)开始执行reduce

} else {//老API

runOldReducer(job, umbilical, reporter, rIter, comparator,

keyClass, valueClass);

}

shuffleConsumerPlugin.close();

done(umbilical, reporter);

}

(1)reduce分为三个阶段(copy就是远程拷贝Map的输出数据、sort就是对所有的数据做排序、reduce做聚集就是我们自己写的reducer),为这三个阶段分别设置Progress,用来和TaskTracker通信报道状态。

(2)上面代码的15-40行和MapReduce的MapTask任务的运行源码级分析中对应部分基本相同,可参考之;

(3)codec = initCodec()这句是检查map的输出是否是压缩的,压缩的则返回压缩codec实例,否则返回null,这里讨论不压缩的;

(4)我们讨论完全分布式的hadoop,即isLocal==false,然后构造一个ReduceCopier对象reduceCopier,并调用reduceCopier.fetchOutputs()方法拷贝各个Mapper的输出,到本地;

(5)然后copy阶段完成,设置接下来的阶段是sort阶段,更新状态信息;

(6)根据isLocal来选择KV迭代器,完全分布式的会使用reduceCopier.createKVIterator(job, rfs, reporter)作为KV迭代器;

(7)sort阶段完成,设置接下来的阶段是reduce阶段,更新状态信息;

(8)然后获取一些配置信息,并根据是否使用新API选择不同的处理方式,这里是新的API,调用runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass)会执行reducer;

(9)done(umbilical, reporter)这个方法用于做结束任务的一些清理工作:更新计数器updateCounters();如果任务需要提交,设置Taks状态为COMMIT_PENDING,并利用TaskUmbilicalProtocol,汇报Task完成,等待提交,然后调用commit提交任务;设置任务结束标志位;结束Reporter通信线程;发送最后一次统计报告(通过sendLastUpdate方法);利用TaskUmbilicalProtocol报告结束状态(通过sendDone方法)。

有些人将Reduce Task分为了5个阶段:一、shuffle阶段:也称为Copy阶段,就是从各个MapTask上远程拷贝一片数据,如果大小超过一定阈值就写到磁盘,否则放入内存;二、Merge阶段:在远程拷贝数据的同时,Reduce Task启动了两个后台线程对内存和磁盘上的文件进行合并,防止内存使用过多和磁盘文件过多;三、sort阶段:用户编写的reduce方法的输入数据是按key进行聚集的,需要对copy过来的数据排序,这里用的是归并排序,因为Map Task的结果是有序的;四、Reduce阶段:将每组数据依次交给用户编写的Reduce方法处理;五、write阶段:就是将结果写入HDFS。

上面的5个阶段分的比较细了,代码里分为3个阶段copy、sort、reduce,我们在eclipse运行MR程序时,控制台看到的reduce阶段的百分比就分为3个阶段各占33.3%。

这里的shuffleConsumerPlugin是实现了ShuffleConsumerPlugin的某个类对象。具体可以通过配置文件mapreduce.job.reduce.shuffle.consumer.plugin.class选项设置,默认情况下是使用shuffle。我们在代码中分析过完成shuffleConsumerPlugin.run,通常是shuffle.run,因为有了这个过程Mapper的合成的spill文件才能通过HTTP协议传输到Reducer端。有了数据才能进行runNewReducer或者runOldReducer。可以说shuffle对象就是MapTask的搬运工。而且shuffle的搬运方式不是一遍搬运一遍Reducer处理,而是要把MapTask所有的数据都搬运过来,并且进行合并排序之后才开始提供给对应的Reducer。

一般而言,MapTask和ReduceTask是多对多的关系,假如有M个Mapper有N个Reducer。我们知道N个Reducer对应着N个partition,所以每个Mapper都会被划分成N个Partition,每个Reducer承担着一个Partition部分的操作。这样每一个Reducer从每个不同的Mapper内拿来属于自己的那部分数据,这样每个Reducer就有M份不同Mapper的数据,把M份数据合并在一起就是一个最终完整的Partition,有必要还会进行排序,这时候才成为了Reducer的具体输入数据。这个数据搬运和重组的过程被叫做shuffle过程。shuffle这个过程开销颇大,会占用较大的网络流量,因为涉及到大量数据的传输,shuffle过程也会有延迟,因为M个Mapper的计算有快有慢,但是shuffle要所有的Mapper完成才能开始,Reduce又必须等shuffle完成才能开始,当然这种延迟不是shuffle造成的,如果Reducer不需要全部Partition数据到位并排序,就不用与最慢的Mapper同步,这是排序付出的代价。

所以shuffle在MapReduce框架中起着非常重要的作用。我们先看shuffle的摘要: 获取更多大数据视频资料请加QQ群:947967114

public class Shuffle implements ShuffleConsumerPlugin, ExceptionReporter

private ShuffleConsumerPlugin.Context context;

private TaskAttemptID reduceId;

private JobConf jobConf;

private TaskUmbilicalProtocol umbilical;

private ShuffleSchedulerImpl scheduler;

private MergeManager merger;

private Task reduceTask; //Used for status updates

private Map localMapFiles;

public void init(ShuffleConsumerPlugin.Context context)

public RawKeyValueIterator run() throws IOException, InterruptedException

在ReduceTask.run中看到调用了shuffle.init,在run理创建了ShuffleSchedulerImpl和MergeManagerImpl对象。后面会讲解就是是做什么用的。

之后就是对shuffle.run的调用,shuffle虽然有一个run但是并非是一个线程,只是用了这个名字而已。

我们看:ReduceTask.run->Shuffle.run

public RawKeyValueIterator run() throws IOException, InterruptedException {

int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,

MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());

int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);

// Start the map-completion events fetcher thread

final EventFetcher eventFetcher =

new EventFetcher(reduceId, umbilical, scheduler, this,

maxEventsToFetch);

eventFetcher.start();

//通过查看EventFetcher我们看到他继承了Thread,所以他是一个线程

// Start the map-output fetcher threads

boolean isLocal = localMapFiles != null;

final int numFetchers = isLocal ? 1 :

jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);

Fetcher[] fetchers = new Fetcher[numFetchers];

//创建了一个线程池

if (isLocal) {

//如果Mapper和Reducer在同一台机器上,就在本地fetche

fetchers[0] = new LocalFetcher(jobConf, reduceId, scheduler,

merger, reporter, metrics, this, reduceTask.getShuffleSecret(),

localMapFiles);

//LocalFetcher是对Fetcher的扩展,也是线程。

fetchers[0].start();//本地Fecher只有一个

} else {

//Mapper集合Reducer不在同一个机器上,需要跨多个节点Fecher

for (int i=0; i < numFetchers; ++i) {

//启动所有的Fecher

fetchers[i] = new Fetcher(jobConf, reduceId, scheduler, merger,

reporter, metrics, this,

reduceTask.getShuffleSecret());

//创建Fecher线程

fetchers[i].start();

//跨节点的Fecher需要好多个,都需要开启

}

}

// Wait for shuffle to complete successfully

while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {

reporter.progress();

//等待所有的Fecher都完成,如果有超时情况就报告进度

synchronized (this) {

if (throwable != null) {

throw new ShuffleError("error in shuffle in " + throwingThreadName,

throwable);

}

}

}

// Stop the event-fetcher thread

eventFetcher.shutDown();

//关闭eventFetcher,代表shuffle操作完成,所有的MapTask的数据都拷贝过来了

// Stop the map-output fetcher threads

for (Fetcher fetcher : fetchers) {

fetcher.shutDown();//关闭所有的fetcher。

}

// stop the scheduler

scheduler.close();

//也不需要shuffle的调度,所以关闭

copyPhase.complete(); // copy is already complete

//文件复制阶段结束

以下就是Reduce阶段的MergeSort了

taskStatus.setPhase(TaskStatus.Phase.SORT);

//完成排序

reduceTask.statusUpdate(umbilical);

//通过umbilical向MRAppMaster汇报,更新状态

// Finish the on-going merges...

RawKeyValueIterator kvIter = null;

try {

kvIter = merger.close();

//合并和排序,完成后返回一个队列kvIter 。

} catch (Throwable e) {

throw new ShuffleError("Error while doing final merge " , e);

}

// Sanity check

synchronized (this) {

if (throwable != null) {

throw new ShuffleError("error in shuffle in " + throwingThreadName,

throwable);

}

}

return kvIter;

}

数据从MapTask转移到ReduceTask就两种方式,一MapTask送,二ReduceTask取,hadoop采用的是第二种方式,就是文件的复制。在Shuffle进入run之前,RduceTask.run调用过他的init函数shuffleConsumerPlugin.init(shuffleContext),在init里创建了scheduler和用于合并排序的merge,进入run后又创建了EventFetcher线程和若干个Fetcher线程。Fetcher的作用就是拿取,向MapTask节点提取数据。但是我们要清楚EventFetcher虽然也是Fetcher,但是提取的是event,不是数据本身。我们可以认为它只是对Fetcher过程的一个事件的控制。

Fetcher线程的数量也不一定,Uber模式下,MapTask和ReduceTask在同一个节点上,并且只有一个MapTask,所以只有一个Fetcher就能够完成,而且这个Fetcher是localFetcher。如果不是Uber模式可能会有很多MapTask并且一般和ReduceTask不在同一个节点。这时Fetcher的数量可以进行配置,默认有5个。数组fetchers就相当于Fetcher的线程池。

创建了EventFetcher和Fetcher线程池后,进入了while循环,但是while循环什么都不做,一直等待,所以实际的操作都是在线程完成的,也就是通过EventFetcher和若干的Fetcher完成。EventFetcher起到了非常关键的枢纽的作用。

我们查看EventFetcher的源代码摘要,我们提取关键的东西:

class EventFetcher extends Thread {

private final TaskAttemptID reduce;

private final TaskUmbilicalProtocol umbilical;

private final ShuffleScheduler scheduler;

private final int maxEventsToFetch;

public void run() {

int failures = 0;

LOG.info(reduce + " Thread started: " + getName());

try {

while (!stopped && !Thread.currentThread().isInterrupted()) {//线程没有被打断

try {

int numNewMaps = getMapCompletionEvents();

//获取Map的完成的事件,接着我们看getMapCompletionEvents源代码:

protected int getMapCompletionEvents()

throws IOException, InterruptedException {

int numNewMaps = 0;

TaskCompletionEvent events[] = null;

do {

MapTaskCompletionEventsUpdate update =

umbilical.getMapCompletionEvents(

(org.apache.hadoop.mapred.JobID)reduce.getJobID(),

fromEventIdx,

maxEventsToFetch,

(org.apache.hadoop.mapred.TaskAttemptID)reduce);

//汇报umbilical从MRAppMaster获取Map完成的时间的报告

events = update.getMapTaskCompletionEvents();

//获取有关具体的MapTask结束运行的情况

LOG.debug("Got " + events.length + " map completion events from " +

fromEventIdx);

assert !update.shouldReset() : "Unexpected legacy state";

//做了一个断言 获取更多大数据视频资料请加QQ群:947967114

// Update the last seen event ID

fromEventIdx += events.length;

// Process the TaskCompletionEvents:

// 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.

// 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop

// fetching from those maps.

// 3. Remove TIPFAILED maps from neededOutputs since we don‘t need their

// outputs at all.

for (TaskCompletionEvent event : events) {

//对于获取的每个事件的报告

scheduler.resolve(event);

//这里使用了ShuffleSchedullerImpl.resolve函数,源代码如下:

public void resolve(TaskCompletionEvent event) {

switch (event.getTaskStatus()) {

case SUCCEEDED://如果成功

URI u = getBaseURI(reduceId, event.getTaskTrackerHttp());//获取其URI

addKnownMapOutput(u.getHost() + ":" + u.getPort(),

u.toString(),

event.getTaskAttemptId());

//记录这个MapTask的节点主机记录下来,供Fetcher使用,getBaseURI的源代码:

static URI getBaseURI(TaskAttemptID reduceId, String url) {

StringBuffer baseUrl = new StringBuffer(url);

if (!url.endsWith("/")) {

baseUrl.append("/");

}

baseUrl.append("mapOutput?job=");

baseUrl.append(reduceId.getJobID());

baseUrl.append("&reduce=");

baseUrl.append(reduceId.getTaskID().getId());

baseUrl.append("&map=");

URI u = URI.create(baseUrl.toString());

return u;

获取各种信息,然后添加都URI对象中。

}

回到源代码

maxMapRuntime = Math.max(maxMapRuntime, event.getTaskRunTime());

//最大的尝试时间

break;

case FAILED:

case KILLED:

case OBSOLETE://如果MapTask运行失败

obsoleteMapOutput(event.getTaskAttemptId());//获取TaskId

LOG.info("Ignoring obsolete output of " + event.getTaskStatus() +

" map-task: ‘" + event.getTaskAttemptId() + "‘");//写日志

break;

case TIPFAILED://如果失败

tipFailed(event.getTaskAttemptId().getTaskID());

LOG.info("Ignoring output of failed map TIP: ‘" +

event.getTaskAttemptId() + "‘");//写日志

break;

}

}

回到源代码

if (TaskCompletionEvent.Status.SUCCEEDED == event.getTaskStatus()) {//如果事件成功

++numNewMaps;//增加map数量

}

}

} while (events.length == maxEventsToFetch);

return numNewMaps;

}

回到源代码

failures = 0;

if (numNewMaps > 0) {

LOG.info(reduce + ": " + "Got " + numNewMaps + " new map-outputs");

}

LOG.debug("GetMapEventsThread about to sleep for " + SLEEP_TIME);

if (!Thread.currentThread().isInterrupted()) {

Thread.sleep(SLEEP_TIME);

}

} catch (InterruptedException e) {

LOG.info("EventFetcher is interrupted.. Returning");

return;

} catch (IOException ie) {

LOG.info("Exception in getting events", ie);

// check to see whether to abort

if (++failures >= MAX_RETRIES) {

throw new IOException("too many failures downloading events", ie);//失败数量大于重试的数量

}

// sleep for a bit

if (!Thread.currentThread().isInterrupted()) {

Thread.sleep(RETRY_PERIOD);

}

}

}

} catch (InterruptedException e) {

return;

} catch (Throwable t) {

exceptionReporter.reportException(t);

return;

}

}

MapTask和ReduceTask没有直接的关系,MapTask不知道ReduceTask在哪些节点上,它只是把进度的时间报告给MRAppMaster。ReduceTask通过“脐带”执行getMapCompletionEvents操作想MRAppMaster获取MapTask结束运行的时间报告。有个别的MapTask可能会失败,但是绝大多数都会成功,只要成功的就通过Fetcher去索取输出数据,这个信息就是通过shcheduler完成的也就是ShuffleSchedulerImpl对象,ShuffleSchedulerImpl对象并不多,只是个普通的对象。

fetchers就像线程池,里面有若干线程(默认有5个),这些线程等待EventFetcher的通知,一旦有MapTask完成就前往提取数据。

获取更多大数据视频资料请加QQ群:947967114

我们看Fetcher线程类的run方法:

public void run() {

try {

while (!stopped && !Thread.currentThread().isInterrupted()) {

MapHost host = null;

try {

// If merge is on, block

merger.waitForResource();

// Get a host to shuffle from

host = scheduler.getHost();

//从scheduler获取一个已经成功完成的MapTask的节点。

metrics.threadBusy();

//线程变成繁忙状态

// Shuffle

copyFromHost(host);

//开始复制这个节点的数据

} finally {

if (host != null) {//maphost还有运行中的

scheduler.freeHost(host);

//状态设置成空闲状态,等待其完成。

metrics.threadFree();

}

}

}

} catch (InterruptedException ie) {

return;

} catch (Throwable t) {

exceptionReporter.reportException(t);

}

}

这里的重点是copyFromHost获取数据的函数。

protected void copyFromHost(MapHost host) throws IOException {

// reset retryStartTime for a new host

//这是在ReduceTask的节点上运行的

retryStartTime = 0;

// Get completed maps on ‘host‘

List<TaskAttemptID> maps = scheduler.getMapsForHost(host);

//获取目标节点上的MapTask集合。

// Sanity check to catch hosts with only ‘OBSOLETE‘ maps,

// especially at the tail of large jobs

if (maps.size() == 0) {

return;//没有完成的直接返回

}

if(LOG.isDebugEnabled()) {

LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: "

+ maps);

}

// List of maps to be fetched yet

Set remaining = new HashSet(maps);

//已经完成、等待shuffle的MapTask集合。

// Construct the url and connect

DataInputStream input = null;

URL url = getMapOutputURL(host, maps);

//生成MapTask所在节点的URL,下面要看getMapOutputURL源码:

private URL getMapOutputURL(MapHost host, Collection maps

) throws MalformedURLException {

// Get the base url

StringBuffer url = new StringBuffer(host.getBaseUrl());

boolean first = true;

for (TaskAttemptID mapId : maps) {

if (!first) {

url.append(",");

}

url.append(mapId);//在URL后面加上mapid

first = false;

}

LOG.debug("MapOutput URL for " + host + " -> " + url.toString());

//写日志

return new URL(url.toString());

//返回URL

}

回到主代码:

try {

setupConnectionsWithRetry(host, remaining, url);

//和对方主机建立HTTP连接,setupConnectionsWithRetry使用了openConnectionWithRetry函数打开链接。

openConnectionWithRetry(host, remaining, url);

这段源代码有使用了openConnection(url);方式,继续查看。

如下是链接的主要过程:

protected synchronized void openConnection(URL url)

throws IOException {

HttpURLConnection conn = (HttpURLConnection) url.openConnection();

//使用的是HTTPURL进行连接

if (sslShuffle) {//如果是有信任证书的

HttpsURLConnection httpsConn = (HttpsURLConnection) conn;

//强转conn类型

try {

httpsConn.setSSLSocketFactory(sslFactory.createSSLSocketFactory());//添加一个证书socket的工厂

} catch (GeneralSecurityException ex) {

throw new IOException(ex);

}

httpsConn.setHostnameVerifier(sslFactory.getHostnameVerifier());

}

connection = conn;

}

在setupConnectionsWithRetry中继续写到:

setupShuffleConnection(encHash);

//建立了Shuffle链接

connect(connection, connectionTimeout);

// verify that the thread wasn‘t stopped during calls to connect

if (stopped) {

return;

}

verifyConnection(url, msgToEncode, encHash);

}

//至此连接通过。

if (stopped) {

abortConnect(host, remaining);

//这里边是关闭连接,可以点进去看一下,满足列表和等待的两个条件

return;

}

} catch (IOException ie) {

boolean connectExcpt = ie instanceof ConnectException;

ioErrs.increment(1);

LOG.warn("Failed to connect to " + host + " with " + remaining.size() +

" map outputs", ie);

回到主代码

input = new DataInputStream(connection.getInputStream());

//实例一个输入流对象。

try {

// Loop through available map-outputs and fetch them

// On any error, faildTasks is not null and we exit

// after putting back the remaining maps to the

// yet_to_be_fetched list and marking the failed tasks.

TaskAttemptID[] failedTasks = null;

while (!remaining.isEmpty() && failedTasks == null) {

//如果需要fetcher的列表不空,并且失败的task数量没有

try {

failedTasks = copyMapOutput(host, input, remaining, fetchRetryEnabled);

//复制数据出来copyMapOutput的源代码如下:

try {

ShuffleHeader header = new ShuffleHeader();

header.readFields(input);

mapId = TaskAttemptID.forName(header.mapId);

//获取mapID

compressedLength = header.compressedLength;

decompressedLength = header.uncompressedLength;

forReduce = header.forReduce;

} catch (IllegalArgumentException e) {

badIdErrs.increment(1);

LOG.warn("Invalid map id ", e);

//Don‘t know which one was bad, so consider all of them as bad

return remaining.toArray(new TaskAttemptID[remaining.size()]);

}

InputStream is = input;

is = CryptoUtils.wrapIfNecessary(jobConf, is, compressedLength);

compressedLength -= CryptoUtils.cryptoPadding(jobConf);

decompressedLength -= CryptoUtils.cryptoPadding(jobConf);

//如果需要解压或解密

// Do some basic sanity verification

if (!verifySanity(compressedLength, decompressedLength, forReduce,

remaining, mapId)) {

return new TaskAttemptID[] {mapId};

}

if(LOG.isDebugEnabled()) {

LOG.debug("header: " + mapId + ", len: " + compressedLength +

", decomp len: " + decompressedLength);

}

try {

mapOutput = merger.reserve(mapId, decompressedLength, id);

//为merge预留一个MapOutput:是内存还是磁盘上。

} catch (IOException ioe) {

// kill this reduce attempt

ioErrs.increment(1);

scheduler.reportLocalError(ioe);

//报告错误

return EMPTY_ATTEMPT_ID_ARRAY;

}

// Check if we can shuffle *now* ...

if (mapOutput == null) {

LOG.info("fetcher#" + id + " - MergeManager returned status WAIT ...");

//Not an error but wait to process data.

return EMPTY_ATTEMPT_ID_ARRAY;

}

// The codec for lz0,lz4,snappy,bz2,etc. throw java.lang.InternalError

// on decompression failures. Catching and re-throwing as IOException

// to allow fetch failure logic to be processed

try {

// Go!

LOG.info("fetcher#" + id + " about to shuffle output of map "

+ mapOutput.getMapId() + " decomp: " + decompressedLength

+ " len: " + compressedLength + " to " + mapOutput.getDescription());

mapOutput.shuffle(host, is, compressedLength, decompressedLength,

metrics, reporter);

//跨节点把Mapper的文件内容拷贝到reduce的内存或者磁盘上。

} catch (java.lang.InternalError e) {

LOG.warn("Failed to shuffle for fetcher#"+id, e);

throw new IOException(e);

}

// Inform the shuffle scheduler

long endTime = Time.monotonicNow();

// Reset retryStartTime as map task make progress if retried before.

retryStartTime = 0;

scheduler.copySucceeded(mapId, host, compressedLength,

startTime, endTime, mapOutput);

//告诉调度器完成了一个节点的Map输出的文件拷贝。

remaining.remove(mapId);

//这个MapTask的输出已经shuffle完毕

metrics.successFetch();

return null;后面的异常失败信息我们不管。

这里的mapOutput是用来容纳MapTask输出文件的存储空间,根据输出文件的内容大小和内存的情况,可以是内存的Output也可以是DiskOutput。 如果是内存需要预约,因为不止一个Fetcher。我们以InMemoryMapOutput为例。

代码结构;

Fetcher.run-->copyFromHost-->copyMapOutput-->merger.reserve(MergeManagerImpl.reserve)-->InmemoryMapOutput.shuffle

public void shuffle(MapHost host, InputStream input,

long compressedLength, long decompressedLength,

ShuffleClientMetrics metrics,

Reporter reporter) throws IOException {

//跨节点从Mapper拷贝spill文件

IFileInputStream checksumIn =

new IFileInputStream(input, compressedLength, conf);

//校验和的输入流

input = checksumIn;

// Are map-outputs compressed?

if (codec != null) {

//如果涉及到了压缩

decompressor.reset();

//重启解压器

input = codec.createInputStream(input, decompressor);

//加了解压器的输入流

}

try {

IOUtils.readFully(input, memory, 0, memory.length);

//从Mapper方把特定的Partition数据读入Reducer的内存缓冲区。

metrics.inputBytes(memory.length);

reporter.progress();//汇报进度

LOG.info("Read " + memory.length + " bytes from map-output for " +

getMapId());

/**

* We‘ve gotten the amount of data we were expecting. Verify the

* decompressor has nothing more to offer. This action also forces the

* decompressor to read any trailing bytes that weren‘t critical

* for decompression, which is necessary to keep the stream

* in sync.

*/

if (input.read() >= 0 ) {

throw new IOException("Unexpected extra bytes from input stream for " +

getMapId());

}

} catch (IOException ioe) {

// Close the streams

IOUtils.cleanup(LOG, input);

// Re-throw

throw ioe;

} finally {

CodecPool.returnDecompressor(decompressor);

//释放解压器

}

}

从对方把spill文件中属于本partition数据复制过来,回到copyFromHost中,通过scheduler.copySuccessed告知scheduler,并把这个MapTask的ID从remaining集合中删除,进入下一个循环,复制下一个MapTask数据。直到把所有的属于本Partition的数据都复制过来。

以上是Reducer端Fetcher的过程,它向Mapper端发送HTTP GET请求,下载文件。在MapTask就有一个与之对应的Server,这个网络协议的源代码不做深究,课下有兴趣自己研究。 获取更多大数据视频资料请加QQ群:947967114

原文地址:https://www.cnblogs.com/aimakj/p/10006735.html

时间: 2024-10-03 14:57:11

大数据 : Hadoop reduce阶段的相关文章

王家林的云计算分布式大数据Hadoop企业级开发动手实践

一:课程简介: Hadoop是云计算分布式大数据的事实标准软件框架,Hadoop中的架构实现是整个云计算产业技术的基础,作为与Google三大核心技术DFS.MapReduce.BigTable相对的HDFS.MapReduce.和HBase也是整个Hadoop生态系统的核心的技术,本课程致力于帮您掌握这三大技术的同时掌握云计算的数据仓库挖掘技术Hive,助您在云计算技术时代自由翱翔. 二:课程特色 1,      深入浅出中动手实作: 2,      掌握Hadoop三大核心:HDFS.Map

王家林的云计算分布式大数据Hadoop征服之旅:HDFS&amp;MapReduce&amp;HBase&amp;Hive&amp;集群管理

一:课程简介: 作为云计算实现规范和实施标准的Hadoop恰逢其时的应运而生,使用Hadoop用户可以在不了解分布式底层细节的情况下开发出分布式程序,从而可以使用众多廉价的计算设备的集群的威力来高速的运算和存储,而且Hadoop的运算和存储是可靠的.高效,的.可伸缩的,能够使用普通的社区服务器出来PB级别的数据,是分布式大数据处理的存储的理想选择. 本课程会助你深入浅出的掌握Hadoop开发(包括HDFS.MapReduce.HBase.Hive等),并且在此基础上掌握Hadoop集群的配置.维

14周事情总结-机器人-大数据hadoop

14周随着考试的进行,其他该准备的事情也在并行的处理着,考试内容这里不赘述了 首先说下,关于机器人大赛的事情,受益颇多,机器人的制作需要机械和电控两方面 昨天参与舵机的测试,遇到的问题:舵机不动 排查顺序:1.程序(不过程序比较简单)不是这上面的问题 2.检查电路问题(电路设计,线比较脆弱,焊接的也不是很好,最主要不知道板子的完整性) 3.加了块电源可以动舵机了,但之后实验出现莫名的问题 4.最终解决办法:改变线,将舵机的线又连接杜邦线(12) -------------------------

自学it18大数据笔记-第一阶段Java-day16-day17-day18-day19--day20-day21-day22——会持续更新

转行大数据领域,没报班,自学试试,能坚持下来就以后好好做这行,不能就--!准备从现有这套it18的视屏残本开始--自学是痛苦的,发博客,算是监督自己,督促自己坚持学下去. (教学视屏是it18做活动送的,虽不全,但徐培成老师讲的真心不错,特此感谢it18掌--帮你们打打广告) 笔记为自学时记录,如有错误,欢迎指正,不胜感激! 笔记分享:自学it18大数据笔记-第一阶段Java-day16-day17-day18-day19--day20-day21-day22--会持续更新-- 第一阶段Java

成都大数据Hadoop与Spark技术培训班

成都大数据Hadoop与Spark技术培训班 中国信息化培训中心特推出了大数据技术架构及应用实战课程培训班,通过专业的大数据Hadoop与Spark技术架构体系与业界真实案例来全面提升大数据工程师.开发设计人员的工作水平,旨在培养专业的大数据Hadoop与Spark技术架构专家,更好地服务于各个行业的大数据项目开发和落地实施. 2015年近期公开课安排:(全国巡回开班) 08月21日——08月23日大连 09月23日——09月25日北京 10月16日——10月18日成都 11月27日——11月2

自学it18大数据笔记-第一阶段Java-day09-day10-day11-day12-day13-day14-day15

转行大数据领域,没报班,自学试试,能坚持下来就以后好好做这行,不能就--!准备从现有这套it18的视屏残本开始--自学是痛苦的,发博客,算是监督自己,督促自己坚持学下去. (教学视屏是it18做活动送的,虽不全,但徐培成老师讲的真心不错,特此感谢it18掌--帮你们打打广告) 笔记为自学时记录,如有错误,欢迎指正,不胜感激! 笔记分享:自学it18大数据笔记-第一阶段Java-day09-day10-day11-day12--day13-day14-day15--会持续更新-- 第一阶段Java

大数据Hadoop最佳实践(V3)

一:课程简介: Hadoop是当下云计算大数据的王者. Hadoop不仅是一个大数据的计算框架,同时也是大数据的存储平台. 使用Hadoop,用户可以在不了解分布式底层细节的情况下开发出分布式程序,从而可以使用众多廉价的计算设备的集群的威力来高速的运算和存储,而且Hadoop的运算和存储是可靠的.高效的.可伸缩的,能够使用普通的社区服务器出来PB级别的数据,是分布式大数据处理的存储的理想选择 使用Hadoop可以主要完成: 1,构建离线处理平台,完成海量离线数据的存储分析,相对于传统的关系型数据

自学it18大数据笔记-第一阶段Java-day05-day06-day07-day08

转行大数据领域,没报班,自学试试,能坚持下来就以后好好做这行,不能就--!准备从现有这套it18的视屏残本开始--自学是痛苦的,发博客,算是监督自己,督促自己坚持学下去. (教学视屏是it18做活动送的,虽不全,但徐培成老师讲的真心不错,特此感谢it18掌--帮你们打打广告) 笔记为自学时记录,如有错误,欢迎指正,不胜感激! 笔记分享:自学it18大数据笔记-第一阶段Java-day05-day06-day07-day08--会持续更新-- 第一阶段Java-day03-day04见:http:

自学it18大数据笔记-第一阶段Java-day03-day04

转行大数据领域,没报班,自学试试,能坚持下来就以后好好做这行,不能就--!准备从现有这套it18的视屏残本开始--自学是痛苦的,发博客,算是监督自己,督促自己坚持学下去. (教学视屏是it18做活动送的,虽不全,但徐培成老师讲的真心不错,特此感谢it18掌--帮你们打打广告) 笔记为自学时记录,如有错误,欢迎指正,不胜感激! 笔记分享:自学it18大数据笔记-第一阶段Java-day03-day04--会持续更新-- day03笔记见下图: day04笔记见下图:

搭建大数据hadoop完全分布式环境遇到的坑

搭建大数据hadoop完全分布式环境,遇到很多问题,这里记录一部分,以备以后查看. 1.在安装配置完hadoop以后,需要格式化namenode,输入指令:hadoop namenode -format,报错误信息:hadoop: command not found 本文网址:https://www.cnblogs.com/SH170706/p/10312667.html 2. 原文地址:https://www.cnblogs.com/SH170706/p/10312678.html