Task运行过程分析4——Map Task内部实现2

Task运行过程分析3——MapTask内部实现中,我们分析了MapTask的Collect阶段,并且解读了环形缓冲区使得MapTask的Collect阶段和Spill阶段可并行执行。。。接下来分析Spill阶段和Combine阶段。。。

Spill过程分析

Spill过程由SpillThread线程完成,SpillThread线程实际上是缓冲区kvbuffer的消费者

  protected class SpillThread extends Thread {

      @Override
      public void run() {
        spillLock.lock();
        spillThreadRunning = true;
        try {
          while (true) {
            spillDone.signal();
            while (kvstart == kvend) {
              spillReady.await();
            }
            try {
              spillLock.unlock();
              sortAndSpill();//排序,然后将缓冲区kvbuffer中的数据写到磁盘上
            } catch (Exception e) {
              sortSpillException = e;
            } catch (Throwable t) {
              sortSpillException = t;
              String logMsg = "Task " + getTaskID() + " failed : "
                              + StringUtils.stringifyException(t);
              reportFatalError(getTaskID(), t, logMsg);
            } finally {
              spillLock.lock();
              if (bufend < bufindex && bufindex < bufstart) {//重置各个指针,以便为下一次溢写做准备
                bufvoid = kvbuffer.length;
              }
              kvstart = kvend;
              bufstart = bufend;
            }
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          spillLock.unlock();
          spillThreadRunning = false;
        }
      }
    }

调用sortAndSpill()

   private void sortAndSpill() throws IOException, ClassNotFoundException,
                                       InterruptedException {
      //approximate the length of the output file to be the length of the
      //buffer + header lengths for the partitions
      long size = (bufend >= bufstart
          ? bufend - bufstart
          : (bufvoid - bufend) + bufstart) +
                  partitions * APPROX_HEADER_LENGTH;
      FSDataOutputStream out = null;
      try {
        // create spill file
        final SpillRecord spillRec = new SpillRecord(partitions);
        final Path filename =
            mapOutputFile.getSpillFileForWrite(numSpills, size);
        out = rfs.create(filename);

        final int endPosition = (kvend > kvstart)
          ? kvend
          : kvoffsets.length + kvend;
         //按照partition的顺序对buffer中的数据进行排序
        sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
        int spindex = kvstart;
        IndexRecord rec = new IndexRecord();
        InMemValBytes value = new InMemValBytes();
        //依次一个一个parition的写入文件
        for (int i = 0; i < partitions; ++i) {
          IFile.Writer<K, V> writer = null;
          try {
            long segmentStart = out.getPos();
            writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
                                      spilledRecordsCounter);
            //如果combiner为空,则直接写入文件
            if (combinerRunner == null) {
              // spill directly
              DataInputBuffer key = new DataInputBuffer();
              while (spindex < endPosition &&
                  kvindices[kvoffsets[spindex % kvoffsets.length]
                            + PARTITION] == i) {
                final int kvoff = kvoffsets[spindex % kvoffsets.length];
                getVBytesForOffset(kvoff, value);
                key.reset(kvbuffer, kvindices[kvoff + KEYSTART],
                          (kvindices[kvoff + VALSTART] -
                           kvindices[kvoff + KEYSTART]));
                writer.append(key, value);
                ++spindex;
              }
            } else {
              int spstart = spindex;
              while (spindex < endPosition &&
                  kvindices[kvoffsets[spindex % kvoffsets.length]
                            + PARTITION] == i) {
                ++spindex;
              }
              // Note: we would like to avoid the combiner if we‘ve fewer
              // than some threshold of records for a partition
              if (spstart != spindex) {
                combineCollector.setWriter(writer);
                RawKeyValueIterator kvIter =
                  new MRResultIterator(spstart, spindex);
                combinerRunner.combine(kvIter, combineCollector);
              }
            }

            // close the writer
            writer.close();

            // record offsets
            rec.startOffset = segmentStart;
            rec.rawLength = writer.getRawLength();
            rec.partLength = writer.getCompressedLength();
            spillRec.putIndex(rec, i);

            writer = null;
          } finally {
            if (null != writer) writer.close();
          }
        }

        if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
          // create spill index file
          Path indexFilename =
              mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
                  * MAP_OUTPUT_INDEX_RECORD_LENGTH);
          spillRec.writeToFile(indexFilename, job);
        } else {
          indexCacheList.add(spillRec);
          totalIndexCacheMemory +=
            spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
        }
        LOG.info("Finished spill " + numSpills);
        ++numSpills;
      } finally {
        if (out != null) out.close();
      }
    }

线程SpillThread调用函数sortAndSpill()将环形缓冲区kvbuffer中区间[bufstart,bufend)内的数据写到磁盘上。函数sortAndSpill()内部工作流程如下:

步骤1:利用快速排序算法对缓冲区kvbuffer中区间[bufstart,bufend)内的数据进行排序,排序方式是,先按照分区编号partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有的数据按照key有序

 sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);

调用org.apache.hadoop.util.IndexedSorter.java中的sort方法,IndexedSorter是一个接口,查看它的实现类org.apache.hadoop.util.QuickSort.java中的sort方法

 public void sort(final IndexedSortable s, int p, int r,
      final Progressable rep) {
    sortInternal(s, p, r, rep, getMaxDepth(r - p));
  }

调用org.apache.hadoop.util.QuickSort.java中的sortInternal方法

 private static void sortInternal(final IndexedSortable s, int p, int r,
      final Progressable rep, int depth) {
    if (null != rep) {
      rep.progress();
    }
    while (true) {
    if (r-p < 13) {
      for (int i = p; i < r; ++i) {
        for (int j = i; j > p && s.compare(j-1, j) > 0; --j) {
          s.swap(j, j-1);
        }
      }
      return;
    }
    if (--depth < 0) {
      // give up
      alt.sort(s, p, r, rep);
      return;
    }

    // select, move pivot into first position
    fix(s, (p+r) >>> 1, p);
    fix(s, (p+r) >>> 1, r - 1);
    fix(s, p, r-1);

    // Divide
    int i = p;
    int j = r;
    int ll = p;
    int rr = r;
    int cr;
    while(true) {
      while (++i < j) {
        if ((cr = s.compare(i, p)) > 0) break;
        if (0 == cr && ++ll != i) {
          s.swap(ll, i);
        }
      }
      while (--j > i) {
        if ((cr = s.compare(p, j)) > 0) break;
        if (0 == cr && --rr != j) {
          s.swap(rr, j);
        }
      }
      if (i < j) s.swap(i, j);
      else break;
    }
    j = i;
    // swap pivot- and all eq values- into position
    while (ll >= p) {
      s.swap(ll--, --i);
    }
    while (rr < r) {
      s.swap(rr++, j++);
    }

    // Conquer
    // Recurse on smaller interval first to keep stack shallow
    assert i != j;
    if (i - p < r - j) {
      sortInternal(s, p, i, rep, depth);
      p = j;
    } else {
      sortInternal(s, j, r, rep, depth);
      r = i;
    }
    }
  }

快速排序是应用最广泛的排序算法之一。它的基本思想是,选择序列中的一个元素作为枢轴,将小于枢轴的元素放在左边,将大于枢轴的元素放在右边,针对左右两个子序列重复此过程,直到序列为空或者只剩下一个元素。

在《算法导论》一书中,给出了一个教科书式的快速排序实现算法,它的实现方法是:选择序列的最后一个元素作为枢轴,并使用一个索引由前往后遍历整个序列,将小于枢轴的元素交换到左边,大于枢轴的元素交换到右边,直到序列为空或者只剩下一个元素。

Hadoop实现的快速排序在该快速排序之上进行了一下优化。

(1)枢轴选择

枢轴的选择好坏直接影响到快速排序的性能,而最坏的情况是划分过程中始终产生两个极端不对称的子序列(有一个长度为1,另一个长度为n-1),此时排序算法复杂度将增为O(N*2)。减小出现划分严重不对称的可能性,Hadoop将序列的收尾和中间元素中的中位数作为枢轴。

(2)子序列划分方法

Hadoop使用了两个索引 i 和 j 分别从左右两端进行扫描序列,并让索引 i 扫描到大于等于枢轴的元素停止,索引 j 扫描到小于等于枢轴的元素停止,然后交换两个元素,重复这个过程直到两个索引相遇。

(3)对相同元素的优化

在每次划分子序列时,将与枢轴相同的元素集中存放到中间位置,让它们不再参与后续的递归处理,即将序列划分成三部分:小于枢轴、等于枢轴和大于枢轴。

(4)减少递归次数

当子序列中元素数目小于13时,直接使用插入排序算法,不再继续递归。

而比较对象大小的方法是在org.apache.hadoop.mapred.MapTask中的内部类MapOutputBuffer覆写的compare方法

 /**
     * Compare logical range, st i, j MOD offset capacity.
     * Compare by partition, then by key.
     * @see IndexedSortable#compare
     */
    public int compare(int i, int j) {
      final int ii = kvoffsets[i % kvoffsets.length];
      final int ij = kvoffsets[j % kvoffsets.length];
      // sort by partition
      if (kvindices[ii + PARTITION] != kvindices[ij + PARTITION]) {
        return kvindices[ii + PARTITION] - kvindices[ij + PARTITION];
      }
      // sort by key
      return comparator.compare(kvbuffer,
          kvindices[ii + KEYSTART],
          kvindices[ii + VALSTART] - kvindices[ii + KEYSTART],
          kvbuffer,
          kvindices[ij + KEYSTART],
          kvindices[ij + VALSTART] - kvindices[ij + KEYSTART]);
    }

    /**
     * Swap logical indices st i, j MOD offset capacity.
     * @see IndexedSortable#swap
     */
    public void swap(int i, int j) {
      i %= kvoffsets.length;
      j %= kvoffsets.length;
      int tmp = kvoffsets[i];
      kvoffsets[i] = kvoffsets[j];
      kvoffsets[j] = tmp;
    }

步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中,如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。

步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存中索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中

Combine过程分析

在org.apache.hadoop.mapred.MapTask.java的runNewMapper方法中,在运行完mapper任务,会调用RecordWriter.close函数

output.close(mapperContext);

其实是调用RecordWrite的实现类org.apache.hadoop.mapred.MapTask的内部类NewOutputCollect的close方法

@Override
    public void close(TaskAttemptContext context
                      ) throws IOException,InterruptedException {
      try {
        collector.flush();
      } catch (ClassNotFoundException cnf) {
        throw new IOException("can‘t find class ", cnf);
      }
      collector.close();
    }

调用MapOutputCollector.flush()方法,查看MapOutputCollector的实现类org.apache.hadoop.mapred.MapTask.java中的内部类MapOutputBuffer的flush方法

   public synchronized void flush() throws IOException, ClassNotFoundException,
                                            InterruptedException {
      LOG.info("Starting flush of map output");
      spillLock.lock();
      try {
        while (kvstart != kvend) {
          reporter.progress();
          spillDone.await();
        }
        if (sortSpillException != null) {
          throw (IOException)new IOException("Spill failed"
              ).initCause(sortSpillException);
        }
        if (kvend != kvindex) {
          kvend = kvindex;
          bufend = bufmark;
          sortAndSpill();
        }
      } catch (InterruptedException e) {
        throw (IOException)new IOException(
            "Buffer interrupted while waiting for the writer"
            ).initCause(e);
      } finally {
        spillLock.unlock();
      }
      assert !spillLock.isHeldByCurrentThread();
      // shut down spill thread and wait for it to exit. Since the preceding
      // ensures that it is finished with its work (and sortAndSpill did not
      // throw), we elect to use an interrupt instead of setting a flag.
      // Spilling simultaneously from this thread while the spill thread
      // finishes its work might be both a useful way to extend this and also
      // sufficient motivation for the latter approach.
      try {
        spillThread.interrupt();
        spillThread.join();
      } catch (InterruptedException e) {
        throw (IOException)new IOException("Spill failed"
            ).initCause(e);
      }
      // release sort buffer before the merge
      kvbuffer = null;
      mergeParts();
      Path outputPath = mapOutputFile.getOutputFile();
      fileOutputByteCounter.increment(rfs.getFileStatus(outputPath).getLen());
    }

调用org.apache.hadoop.mapred.MapTask.java中的内部类MapOutputBuffer的mergeParts方法

  private void mergeParts() throws IOException, InterruptedException,
                                     ClassNotFoundException {
      // get the approximate size of the final output/index files
      long finalOutFileSize = 0;
      long finalIndexFileSize = 0;
      final Path[] filename = new Path[numSpills];
      final TaskAttemptID mapId = getTaskID();

      for(int i = 0; i < numSpills; i++) {
      //通过spill文件的编号获取到指定的spill文件路径
        filename[i] = mapOutputFile.getSpillFile(i);
        finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
      }
      //合并输出有俩文件一个是output/file.out,一个是output/file.out.index
      if (numSpills == 1) { //the spill is the final output
        rfs.rename(filename[0],
            new Path(filename[0].getParent(), "file.out"));
        if (indexCacheList.size() == 0) {
          rfs.rename(mapOutputFile.getSpillIndexFile(0),
              new Path(filename[0].getParent(),"file.out.index"));
        } else {//写入文件
          indexCacheList.get(0).writeToFile(
                new Path(filename[0].getParent(),"file.out.index"), job);
        }
        return;
      }

      // read in paged indices
      for (int i = indexCacheList.size(); i < numSpills; ++i) {
        Path indexFileName = mapOutputFile.getSpillIndexFile(i);
        indexCacheList.add(new SpillRecord(indexFileName, job, null));
      }

      //make correction in the length to include the sequence file header
      //lengths for each partition
      finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
      finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
      Path finalOutputFile =
          mapOutputFile.getOutputFileForWrite(finalOutFileSize);//output/file.out
      Path finalIndexFile =
          mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);//output/file.out.index

      //The output stream for the final single output file
      FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);

      if (numSpills == 0) {
        //create dummy files
        IndexRecord rec = new IndexRecord();
        SpillRecord sr = new SpillRecord(partitions);
        try {
          for (int i = 0; i < partitions; i++) {
            long segmentStart = finalOut.getPos();
            Writer<K, V> writer =
              new Writer<K, V>(job, finalOut, keyClass, valClass, codec, null);
            writer.close();
            rec.startOffset = segmentStart;
            rec.rawLength = writer.getRawLength();
            rec.partLength = writer.getCompressedLength();
            sr.putIndex(rec, i);
          }
          sr.writeToFile(finalIndexFile, job);
        } finally {
          finalOut.close();
        }
        return;
      }
      {
        IndexRecord rec = new IndexRecord();
        final SpillRecord spillRec = new SpillRecord(partitions);
         //对于每一个partition。finalOut最终输出文件。循环分区获得所有spill文件的该分区数据,合并写入finalOut
        for (int parts = 0; parts < partitions; parts++) {
          //create the segments to be merged
          List<Segment<K,V>> segmentList =
            new ArrayList<Segment<K, V>>(numSpills);
      //依次从各个spill文件中收集属于当前partition的段
          for(int i = 0; i < numSpills; i++) {
            IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);

            Segment<K,V> s =
              new Segment<K,V>(job, rfs, filename[i], indexRecord.startOffset,
                               indexRecord.partLength, codec, true);
            segmentList.add(i, s);

            if (LOG.isDebugEnabled()) {
              LOG.debug("MapId=" + mapId + " Reducer=" + parts +
                  "Spill =" + i + "(" + indexRecord.startOffset + "," +
                  indexRecord.rawLength + ", " + indexRecord.partLength + ")");
            }
          }

          //merge
          @SuppressWarnings("unchecked")
         //将属于同一个partition的段merge到一起
          RawKeyValueIterator kvIter = Merger.merge(job, rfs,
                         keyClass, valClass, codec,
                         segmentList, job.getInt("io.sort.factor", 100),
                         new Path(mapId.toString()),
                         job.getOutputKeyComparator(), reporter,
                         null, spilledRecordsCounter);

          //write merged output to disk
          //写入合并后的段到文件
          long segmentStart = finalOut.getPos();
          Writer<K, V> writer =
              new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
                               spilledRecordsCounter);
          if (combinerRunner == null || numSpills < minSpillsForCombine) {
            Merger.writeFile(kvIter, writer, reporter, job);
          } else {
            combineCollector.setWriter(writer);
         //其实写入数据的还是这里的writer类的append方法,这的输出是output/file.out文件,是合并后的文件
            combinerRunner.combine(kvIter, combineCollector);
          }

          //close
          writer.close();

          // record offsets
          rec.startOffset = segmentStart;
          rec.rawLength = writer.getRawLength();
          rec.partLength = writer.getCompressedLength();
          spillRec.putIndex(rec, parts);
        }
        spillRec.writeToFile(finalIndexFile, job);//写入索引文件
        finalOut.close();//合并后的输出文件
        for(int i = 0; i < numSpills; i++) {
          rfs.delete(filename[i],true);
        }
      }
    }

当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index。

在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式:每轮合并io.sort.factor(默认为100)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。

让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。。。

接下来分析ReduceTask内部实现。。。

时间: 2024-10-04 07:43:36

Task运行过程分析4——Map Task内部实现2的相关文章

Task运行过程分析3——Map Task内部实现

Map Task内部实现 在Task运行过程分析2中提到,MapTask分为4种,分别是Job-setup Task.Job-cleanup Task.Task-cleanup Task和Map Task.其中,Job-setup Task和Job-cleanup Task分别是作业运行时启动的第一个任务和最后一个任务,主要工作分别是进行一些作业初始化和收尾工作,比如创建和删除作业临时输出目录:而Task-cleanup Task则是任务失败或者被杀死后,用于清理已写入临时目录中数据的任务.本文

Task运行过程分析1

1.Task运行过程概述 在MapReduce计算框架中,一个应用程序被划分成Map和Reduce两个计算阶段,它们分别由一个或者多个Map Task和Reduce Task组成.其中,每个Map Task处理输入数据集合中的一片数据(InputSplit),并将产生的若干个数据片段写到本地磁盘上,而Reduce Task则从每个Map Task上远程拷贝相应的数据片段,经分组聚集和归约后,将结果写到HDFS上作为最终结果,如下图所示: 总体上看,Map Task与Reduce Task之间的数

Task运行过程分析2

上篇文章Task运行过程1讲到脚本会运行org.apache.hadoop.mapred.Child类... Child类包含一个入口主方法main,在运行的时候需要传递对应的参数,来运行MapTask和ReduceTask,通过命令行输入如下5个参数: host:表示TaskTracker节点的主机名称 port:表示TaskTracker节点RPc端口号 taskID:表示启动的Task对应的TaskAttemptID,标识一个Task的一个运行实例 log location:表示该Task

Map Task内部实现分析

转自:http://blog.csdn.net/androidlushangderen/article/details/41142795 上篇我刚刚学习完,Spilt的过程,还算比较简单的了,接下来学习的就是Map操作的过程了,Map和Reduce一样,是整个MapReduce的重要内容,所以,这一篇,我会好好的讲讲里面的内部实现过程.首先要说,MapTask,分为4种,可能这一点上有人就可能知道了,分别是Job-setup Task,Job-cleanup Task,Task-cleanup和

Job和Task运行时信息的维护

JobTracker最重要的功能之一是状态监控,包括TaskTracker.Job和Task等运行时状态的监控,其中TaskTracker状态监控比较简单,只要记录其最近心跳汇报时间和健康状况(由TaskTracker端的监控脚本检测,并通过心跳将结果发送给JobTracker)即可. 作业描述模型 如下图所示 JobTracker在其内部以"三层多叉树"的方式描述和跟踪每个作业的运行状态.JobTracker为每个作业创建一个JobInProgress对象以跟踪和监控其运行状态.该对

Spark源码分析之八:Task运行(二)

在<Spark源码分析之七:Task运行(一)>一文中,我们详细叙述了Task运行的整体流程,最终Task被传输到Executor上,启动一个对应的TaskRunner线程,并且在线程池中被调度执行.继而,我们对TaskRunner的run()方法进行了详细的分析,总结出了其内Task执行的三个主要步骤: Step1:Task及其运行时需要的辅助对象构造,主要包括: 1.当前线程设置上下文类加载器: 2.获取序列化器ser: 3.更新任务状态TaskState: 4.计算垃圾回收时间: 5.反

TaskScheduler内幕天机:Spark shell案例,TaskScheduler和SchedulerBackend、FIFO与FAIR、Task运行时本地性算法详解

TaskSchedulerBackend与SchedulerBackend FIFO与FAIR两种调度模式 Task数据本地性资源的分配 一.TaskScheduler运行过程(Spark-shell角度) 1.启动Spark-shell 当我们spark-shell本身的时候命令终端返回来的主要是ClientEndpoint和SparkDeploySchedulerBakcend.这是因为此时还没有任何应用程序Job的触发,这是启动Application本身而已,所以主要就是实例化SparkC

第三十六课 Spark之TaskScheduler Spark shell案例运行日志详解、TaskScheduler和SchedulerBackend、FIFO与FAIR、Task运行时本地性算法详

</pre></h2><div><p>本节课内容:</p><p>1.     TaskSchedulerBackend与SchedulerBackend</p><p>2.     FIFO与FAIR两种调度模式</p><p>3.     Task数据本地性资源的分配</p></div><h3>一.Scheduler运行过程(Spark-shell角度)

MapReduce作业的map task和reduce task调度参数

MapReduce作业可以细分为map task和reduce task,而MRAppMaster又将map task和reduce task分为四种状态: 1.pending:刚启动但尚未向resourcemanager发送资源请求: 2.scheduled:已经向resourceManager发送资源请求,但尚未分配到资源: 3.assigned:已经分配到了资源且正在运行: 4.completed:已经运行完成. map task的生命周期为:scheduled -> assigned -