In Task Execution Analysis 3: MapTask Internals we analyzed MapTask's Collect phase and saw how the circular buffer allows the Collect and Spill phases to run in parallel. We now turn to the Spill and Combine phases.
Analysis of the Spill Phase
The spill is carried out by the SpillThread thread, which is in effect the consumer of the buffer kvbuffer:
protected class SpillThread extends Thread {
@Override
public void run() {
spillLock.lock();
spillThreadRunning = true;
try {
while (true) {
spillDone.signal();
while (kvstart == kvend) {
spillReady.await();
}
try {
spillLock.unlock();
sortAndSpill(); // sort the buffered records, then write them from kvbuffer to disk
} catch (Exception e) {
sortSpillException = e;
} catch (Throwable t) {
sortSpillException = t;
String logMsg = "Task " + getTaskID() + " failed : "
+ StringUtils.stringifyException(t);
reportFatalError(getTaskID(), t, logMsg);
} finally {
spillLock.lock();
if (bufend < bufindex && bufindex < bufstart) { // reset the pointers to prepare for the next spill
bufvoid = kvbuffer.length;
}
kvstart = kvend;
bufstart = bufend;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
spillLock.unlock();
spillThreadRunning = false;
}
}
}
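Stripped of the buffer bookkeeping, the hand-off between collect() (the producer) and SpillThread (the consumer) is a lock-plus-two-conditions pattern. The following simplified model is my own sketch, not Hadoop code; the class and method names are illustrative, and the flag regionReady stands in for the kvstart/kvend comparison:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class SpillHandOffSketch {
  private final ReentrantLock spillLock = new ReentrantLock();
  private final Condition spillReady = spillLock.newCondition();
  private final Condition spillDone = spillLock.newCondition();
  private boolean regionReady = false;            // stands in for kvstart != kvend

  // Producer side: what collect() does when the buffer's soft limit is reached.
  void startSpill() {
    spillLock.lock();
    try {
      regionReady = true;
      spillReady.signal();                        // wake the spill thread
    } finally {
      spillLock.unlock();
    }
  }

  // Consumer side: the shape of SpillThread.run().
  void spillLoop() throws InterruptedException {
    spillLock.lock();
    try {
      while (true) {
        spillDone.signal();                       // tell waiting writers the last spill finished
        while (!regionReady) {
          spillReady.await();                     // block until collect() hands over a region
        }
        spillLock.unlock();
        try {
          // the equivalent of sortAndSpill() runs here, outside the lock,
          // so collect() can keep appending to the free part of the buffer
        } finally {
          spillLock.lock();
          regionReady = false;                    // mirrors kvstart = kvend; bufstart = bufend
        }
      }
    } finally {
      spillLock.unlock();
    }
  }
}
The key point is that the sorting and writing happen with the lock released, which is exactly what lets collection and spilling overlap.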
This calls sortAndSpill():
private void sortAndSpill() throws IOException, ClassNotFoundException,
InterruptedException {
//approximate the length of the output file to be the length of the
//buffer + header lengths for the partitions
long size = (bufend >= bufstart
? bufend - bufstart
: (bufvoid - bufend) + bufstart) +
partitions * APPROX_HEADER_LENGTH;
FSDataOutputStream out = null;
try {
// create spill file
final SpillRecord spillRec = new SpillRecord(partitions);
final Path filename =
mapOutputFile.getSpillFileForWrite(numSpills, size);
out = rfs.create(filename);
final int endPosition = (kvend > kvstart)
? kvend
: kvoffsets.length + kvend;
//sort the buffered data in partition order (and by key within each partition)
sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
int spindex = kvstart;
IndexRecord rec = new IndexRecord();
InMemValBytes value = new InMemValBytes();
//write the data to the output file one partition at a time
for (int i = 0; i < partitions; ++i) {
IFile.Writer<K, V> writer = null;
try {
long segmentStart = out.getPos();
writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
spilledRecordsCounter);
//if no combiner is configured, write the records straight to the file
if (combinerRunner == null) {
// spill directly
DataInputBuffer key = new DataInputBuffer();
while (spindex < endPosition &&
kvindices[kvoffsets[spindex % kvoffsets.length]
+ PARTITION] == i) {
final int kvoff = kvoffsets[spindex % kvoffsets.length];
getVBytesForOffset(kvoff, value);
key.reset(kvbuffer, kvindices[kvoff + KEYSTART],
(kvindices[kvoff + VALSTART] -
kvindices[kvoff + KEYSTART]));
writer.append(key, value);
++spindex;
}
} else {
int spstart = spindex;
while (spindex < endPosition &&
kvindices[kvoffsets[spindex % kvoffsets.length]
+ PARTITION] == i) {
++spindex;
}
// Note: we would like to avoid the combiner if we've fewer
// than some threshold of records for a partition
if (spstart != spindex) {
combineCollector.setWriter(writer);
RawKeyValueIterator kvIter =
new MRResultIterator(spstart, spindex);
combinerRunner.combine(kvIter, combineCollector);
}
}
// close the writer
writer.close();
// record offsets
rec.startOffset = segmentStart;
rec.rawLength = writer.getRawLength();
rec.partLength = writer.getCompressedLength();
spillRec.putIndex(rec, i);
writer = null;
} finally {
if (null != writer) writer.close();
}
}
if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
// create spill index file
Path indexFilename =
mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
* MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillRec.writeToFile(indexFilename, job);
} else {
indexCacheList.add(spillRec);
totalIndexCacheMemory +=
spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
}
LOG.info("Finished spill " + numSpills);
++numSpills;
} finally {
if (out != null) out.close();
}
}
The SpillThread thread uses sortAndSpill() to write the data in the interval [bufstart, bufend) of the circular buffer kvbuffer to disk. Internally, sortAndSpill() proceeds as follows:
Step 1: Sort the data in the interval [bufstart, bufend) of kvbuffer with quicksort. Records are ordered first by partition number and then by key, so after sorting the data is clustered by partition and, within each partition, ordered by key:
sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
This goes through the sort method of org.apache.hadoop.util.IndexedSorter, which is an interface; its implementation is the sort method of org.apache.hadoop.util.QuickSort:
public void sort(final IndexedSortable s, int p, int r,
final Progressable rep) {
sortInternal(s, p, r, rep, getMaxDepth(r - p));
}
which in turn calls sortInternal in org.apache.hadoop.util.QuickSort:
private static void sortInternal(final IndexedSortable s, int p, int r,
final Progressable rep, int depth) {
if (null != rep) {
rep.progress();
}
while (true) {
if (r-p < 13) {
for (int i = p; i < r; ++i) {
for (int j = i; j > p && s.compare(j-1, j) > 0; --j) {
s.swap(j, j-1);
}
}
return;
}
if (--depth < 0) {
// give up
alt.sort(s, p, r, rep);
return;
}
// select, move pivot into first position
fix(s, (p+r) >>> 1, p);
fix(s, (p+r) >>> 1, r - 1);
fix(s, p, r-1);
// Divide
int i = p;
int j = r;
int ll = p;
int rr = r;
int cr;
while(true) {
while (++i < j) {
if ((cr = s.compare(i, p)) > 0) break;
if (0 == cr && ++ll != i) {
s.swap(ll, i);
}
}
while (--j > i) {
if ((cr = s.compare(p, j)) > 0) break;
if (0 == cr && --rr != j) {
s.swap(rr, j);
}
}
if (i < j) s.swap(i, j);
else break;
}
j = i;
// swap pivot- and all eq values- into position
while (ll >= p) {
s.swap(ll--, --i);
}
while (rr < r) {
s.swap(rr++, j++);
}
// Conquer
// Recurse on smaller interval first to keep stack shallow
assert i != j;
if (i - p < r - j) {
sortInternal(s, p, i, rep, depth);
p = j;
} else {
sortInternal(s, j, r, rep, depth);
r = i;
}
}
}
Quicksort is one of the most widely used sorting algorithms. Its basic idea is to pick one element of the sequence as the pivot, move elements smaller than the pivot to its left and elements larger than the pivot to its right, and then repeat the process on the left and right subsequences until each subsequence is empty or contains a single element.
Introduction to Algorithms gives a textbook implementation: choose the last element of the sequence as the pivot, scan the sequence from front to back with a single index, swap elements smaller than the pivot to the left and larger ones to the right, and recurse until each subsequence is empty or has a single element.
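As a reference point, a minimal version of that textbook scheme looks like the sketch below (my own illustration on a plain int[], not Hadoop code):
// CLRS-style quicksort: last element as pivot, single left-to-right scan (Lomuto partition)
static void textbookQuicksort(int[] a, int lo, int hi) {
  if (lo >= hi) return;
  int pivot = a[hi];
  int i = lo;                                 // a[lo..i-1] holds the elements smaller than the pivot
  for (int j = lo; j < hi; j++) {
    if (a[j] < pivot) {
      int t = a[i]; a[i] = a[j]; a[j] = t;
      i++;
    }
  }
  int t = a[i]; a[i] = a[hi]; a[hi] = t;      // move the pivot into its final position
  textbookQuicksort(a, lo, i - 1);
  textbookQuicksort(a, i + 1, hi);
}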
Hadoop's quicksort adds the following optimizations on top of that textbook version (a combined sketch follows the list).
(1) Pivot selection
The choice of pivot directly affects quicksort's performance. The worst case is a partition step that keeps producing two extremely unbalanced subsequences (one of length 1, the other of length n-1), in which case the complexity degrades to O(n^2). To reduce the chance of such badly skewed partitions, Hadoop uses the median of the first, last, and middle elements of the sequence as the pivot.
(2) Subsequence partitioning
Hadoop scans the sequence with two indices, i from the left end and j from the right end: i stops at an element greater than or equal to the pivot, j stops at an element less than or equal to the pivot, the two elements are swapped, and the process repeats until the two indices meet.
(3) Handling of elements equal to the pivot
During each partition step, elements equal to the pivot are gathered into the middle so that they take no part in further recursion; in effect the sequence is split into three parts: less than the pivot, equal to the pivot, and greater than the pivot.
(4) Fewer recursive calls
When a subsequence contains fewer than 13 elements, insertion sort is used directly instead of recursing further.
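Put together, the optimizations look roughly like the sketch below. This is my own simplified version on an int[], not Hadoop's code: it uses a median-of-three pivot, a Dijkstra-style three-way partition instead of Hadoop's two-ended fat partition, an insertion-sort cutoff at 13 elements, and, like sortInternal, recursion on the smaller interval only to keep the stack shallow:
static void optimizedSort(int[] a, int lo, int hi) {           // sorts a[lo..hi] inclusive
  while (hi - lo + 1 >= 13) {
    // (1) pivot = median of the first, middle and last element
    int mid = (lo + hi) >>> 1;
    if (a[mid] < a[lo]) swap(a, mid, lo);
    if (a[hi] < a[lo]) swap(a, hi, lo);
    if (a[hi] < a[mid]) swap(a, hi, mid);
    int pivot = a[mid];
    // (2) + (3) three-way partition: < pivot | == pivot | > pivot,
    // so the block of equal elements never takes part in further recursion
    int lt = lo, i = lo, gt = hi;
    while (i <= gt) {
      if (a[i] < pivot) swap(a, lt++, i++);
      else if (a[i] > pivot) swap(a, i, gt--);
      else i++;
    }
    // recurse on the smaller side first, loop on the larger one to keep the stack shallow
    if (lt - lo < hi - gt) { optimizedSort(a, lo, lt - 1); lo = gt + 1; }
    else { optimizedSort(a, gt + 1, hi); hi = lt - 1; }
  }
  // (4) fewer than 13 elements: fall back to insertion sort
  for (int i = lo + 1; i <= hi; i++) {
    for (int j = i; j > lo && a[j - 1] > a[j]; j--) swap(a, j, j - 1);
  }
}

static void swap(int[] a, int i, int j) { int t = a[i]; a[i] = a[j]; a[j] = t; }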
The comparison and exchange of elements are defined by the compare and swap methods that the inner class MapOutputBuffer of org.apache.hadoop.mapred.MapTask overrides:
/**
* Compare logical range, st i, j MOD offset capacity.
* Compare by partition, then by key.
* @see IndexedSortable#compare
*/
public int compare(int i, int j) {
final int ii = kvoffsets[i % kvoffsets.length];
final int ij = kvoffsets[j % kvoffsets.length];
// sort by partition
if (kvindices[ii + PARTITION] != kvindices[ij + PARTITION]) {
return kvindices[ii + PARTITION] - kvindices[ij + PARTITION];
}
// sort by key
return comparator.compare(kvbuffer,
kvindices[ii + KEYSTART],
kvindices[ii + VALSTART] - kvindices[ii + KEYSTART],
kvbuffer,
kvindices[ij + KEYSTART],
kvindices[ij + VALSTART] - kvindices[ij + KEYSTART]);
}
/**
* Swap logical indices st i, j MOD offset capacity.
* @see IndexedSortable#swap
*/
public void swap(int i, int j) {
i %= kvoffsets.length;
j %= kvoffsets.length;
int tmp = kvoffsets[i];
kvoffsets[i] = kvoffsets[j];
kvoffsets[j] = tmp;
}
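Note that compare() and swap() never move the serialized records in kvbuffer: comparisons read the key bytes in place, and a swap only exchanges two entries of kvoffsets. The toy example below (hypothetical names, not Hadoop code; strings stand in for raw key bytes) shows why this keeps sorting cheap even when records are large:
import java.util.Arrays;

// Only the small int[] of record indices is permuted; the record payload never moves.
public class IndexedSortSketch {
  static int[] partition = {1, 0, 1, 0};              // per-record partition number
  static String[] key    = {"b", "z", "a", "c"};      // per-record key (stands in for raw key bytes)
  static int[] order     = {0, 1, 2, 3};              // the only array that gets rearranged

  // compare record order[i] with record order[j]: first by partition, then by key
  static int compare(int i, int j) {
    int pi = partition[order[i]], pj = partition[order[j]];
    if (pi != pj) return pi - pj;
    return key[order[i]].compareTo(key[order[j]]);
  }

  static void swap(int i, int j) { int t = order[i]; order[i] = order[j]; order[j] = t; }

  public static void main(String[] args) {
    // a simple insertion sort driven purely by compare()/swap()
    for (int i = 1; i < order.length; i++)
      for (int j = i; j > 0 && compare(j - 1, j) > 0; j--)
        swap(j, j - 1);
    System.out.println(Arrays.toString(order));       // [3, 1, 2, 0]: grouped by partition, keys ordered within each
  }
}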
Step 2: Write each partition's data, in increasing order of partition number, into the temporary file output/spillN.out under the task's working directory (N is the current spill count). If the user has configured a Combiner, an aggregation pass is run over each partition's data before it is written (see the sketch after step 3).
Step 3: Record each partition's metadata in the in-memory index structure SpillRecord; the metadata for a partition consists of its offset in the temporary file, its data size before compression, and its data size after compression. If the in-memory index grows beyond 1MB, it is written out to the file output/spillN.out.index.
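Regarding the Combiner mentioned in step 2: it is ordinary user job code, typically a Reducer reused as a combiner. A representative word-count combiner might look like the hypothetical sketch below; when combinerRunner invokes it inside sortAndSpill(), repeated keys within a partition are collapsed before the records ever reach spillN.out:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
  private final IntWritable sum = new IntWritable();

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int total = 0;
    for (IntWritable v : values) {
      total += v.get();                  // e.g. ("the",1),("the",1),("the",1) -> ("the",3)
    }
    sum.set(total);
    context.write(key, sum);
  }
}
// enabled in the job driver with job.setCombinerClass(WordCountCombiner.class);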
Analysis of the Combine Phase
In the runNewMapper method of org.apache.hadoop.mapred.MapTask, once the mapper has finished running, RecordWriter.close is called:
output.close(mapperContext);
This actually invokes the close method of NewOutputCollector, the RecordWriter implementation defined as an inner class of org.apache.hadoop.mapred.MapTask:
@Override
public void close(TaskAttemptContext context
) throws IOException,InterruptedException {
try {
collector.flush();
} catch (ClassNotFoundException cnf) {
throw new IOException("can‘t find class ", cnf);
}
collector.close();
}
close() first calls MapOutputCollector.flush(); its implementation is the flush method of the inner class MapOutputBuffer in org.apache.hadoop.mapred.MapTask:
public synchronized void flush() throws IOException, ClassNotFoundException,
InterruptedException {
LOG.info("Starting flush of map output");
spillLock.lock();
try {
while (kvstart != kvend) {
reporter.progress();
spillDone.await();
}
if (sortSpillException != null) {
throw (IOException)new IOException("Spill failed"
).initCause(sortSpillException);
}
if (kvend != kvindex) {
kvend = kvindex;
bufend = bufmark;
sortAndSpill();
}
} catch (InterruptedException e) {
throw (IOException)new IOException(
"Buffer interrupted while waiting for the writer"
).initCause(e);
} finally {
spillLock.unlock();
}
assert !spillLock.isHeldByCurrentThread();
// shut down spill thread and wait for it to exit. Since the preceding
// ensures that it is finished with its work (and sortAndSpill did not
// throw), we elect to use an interrupt instead of setting a flag.
// Spilling simultaneously from this thread while the spill thread
// finishes its work might be both a useful way to extend this and also
// sufficient motivation for the latter approach.
try {
spillThread.interrupt();
spillThread.join();
} catch (InterruptedException e) {
throw (IOException)new IOException("Spill failed"
).initCause(e);
}
// release sort buffer before the merge
kvbuffer = null;
mergeParts();
Path outputPath = mapOutputFile.getOutputFile();
fileOutputByteCounter.increment(rfs.getFileStatus(outputPath).getLen());
}
flush() then calls the mergeParts method of the inner class MapOutputBuffer in org.apache.hadoop.mapred.MapTask:
private void mergeParts() throws IOException, InterruptedException,
ClassNotFoundException {
// get the approximate size of the final output/index files
long finalOutFileSize = 0;
long finalIndexFileSize = 0;
final Path[] filename = new Path[numSpills];
final TaskAttemptID mapId = getTaskID();
for(int i = 0; i < numSpills; i++) {
//look up the path of the spill file with the given spill number
filename[i] = mapOutputFile.getSpillFile(i);
finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
}
//the merged output consists of two files: output/file.out and output/file.out.index
if (numSpills == 1) { //the spill is the final output
rfs.rename(filename[0],
new Path(filename[0].getParent(), "file.out"));
if (indexCacheList.size() == 0) {
rfs.rename(mapOutputFile.getSpillIndexFile(0),
new Path(filename[0].getParent(),"file.out.index"));
} else {//write the cached index to the index file
indexCacheList.get(0).writeToFile(
new Path(filename[0].getParent(),"file.out.index"), job);
}
return;
}
// read in paged indices
for (int i = indexCacheList.size(); i < numSpills; ++i) {
Path indexFileName = mapOutputFile.getSpillIndexFile(i);
indexCacheList.add(new SpillRecord(indexFileName, job, null));
}
//make correction in the length to include the sequence file header
//lengths for each partition
finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
Path finalOutputFile =
mapOutputFile.getOutputFileForWrite(finalOutFileSize);//output/file.out
Path finalIndexFile =
mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);//output/file.out.index
//The output stream for the final single output file
FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
if (numSpills == 0) {
//create dummy files
IndexRecord rec = new IndexRecord();
SpillRecord sr = new SpillRecord(partitions);
try {
for (int i = 0; i < partitions; i++) {
long segmentStart = finalOut.getPos();
Writer<K, V> writer =
new Writer<K, V>(job, finalOut, keyClass, valClass, codec, null);
writer.close();
rec.startOffset = segmentStart;
rec.rawLength = writer.getRawLength();
rec.partLength = writer.getCompressedLength();
sr.putIndex(rec, i);
}
sr.writeToFile(finalIndexFile, job);
} finally {
finalOut.close();
}
return;
}
{
IndexRecord rec = new IndexRecord();
final SpillRecord spillRec = new SpillRecord(partitions);
//for each partition: gather that partition's data from every spill file and merge it into the final output finalOut
for (int parts = 0; parts < partitions; parts++) {
//create the segments to be merged
List<Segment<K,V>> segmentList =
new ArrayList<Segment<K, V>>(numSpills);
//collect the segment belonging to the current partition from each spill file in turn
for(int i = 0; i < numSpills; i++) {
IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
Segment<K,V> s =
new Segment<K,V>(job, rfs, filename[i], indexRecord.startOffset,
indexRecord.partLength, codec, true);
segmentList.add(i, s);
if (LOG.isDebugEnabled()) {
LOG.debug("MapId=" + mapId + " Reducer=" + parts +
"Spill =" + i + "(" + indexRecord.startOffset + "," +
indexRecord.rawLength + ", " + indexRecord.partLength + ")");
}
}
//merge
@SuppressWarnings("unchecked")
//merge the segments that belong to the same partition
RawKeyValueIterator kvIter = Merger.merge(job, rfs,
keyClass, valClass, codec,
segmentList, job.getInt("io.sort.factor", 100),
new Path(mapId.toString()),
job.getOutputKeyComparator(), reporter,
null, spilledRecordsCounter);
//write merged output to disk
//write the merged segment to the file
long segmentStart = finalOut.getPos();
Writer<K, V> writer =
new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
spilledRecordsCounter);
if (combinerRunner == null || numSpills < minSpillsForCombine) {
Merger.writeFile(kvIter, writer, reporter, job);
} else {
combineCollector.setWriter(writer);
//the data is still written through this writer's append method; the output is output/file.out, the merged file
combinerRunner.combine(kvIter, combineCollector);
}
//close
writer.close();
// record offsets
rec.startOffset = segmentStart;
rec.rawLength = writer.getRawLength();
rec.partLength = writer.getCompressedLength();
spillRec.putIndex(rec, parts);
}
spillRec.writeToFile(finalIndexFile, job);//write the index file
finalOut.close();//close the merged output file
for(int i = 0; i < numSpills; i++) {
rfs.delete(filename[i],true);
}
}
}
After all the data has been processed, MapTask merges all of the temporary files into one large file, saved as output/file.out, and generates the corresponding index file output/file.out.index.
The merge is performed partition by partition. For a given partition, MapTask merges in multiple recursive rounds: each round merges io.sort.factor (100 by default here) files, adds the resulting file back to the list of files awaiting merge, sorts the files, and repeats until a single large file remains (see the sketch below).
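The number of rounds this implies can be estimated with a few lines. This is a rough model of the description above, not Hadoop code; Hadoop's Merger actually sizes the early rounds more carefully:
public class MergeRounds {
  public static void main(String[] args) {
    int segments = 2500;                    // e.g. 2500 spill segments for one partition (assumed)
    int factor = 100;                       // io.sort.factor
    int rounds = 0;
    while (segments > 1) {
      int merged = Math.min(factor, segments);
      segments = segments - merged + 1;     // 'merged' inputs collapse into one output segment
      rounds++;
    }
    System.out.println(rounds);             // prints 26 for this example
  }
}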
Having each MapTask produce a single output file avoids the overhead of opening many files at once and the random reads incurred when fetching many small files.
Next we will analyze the internals of ReduceTask.