MapReduce源码分析之JobSplitWriter

JobSplitWriter被作业客户端用于写分片相关文件,包括分片数据文件job.split和分片元数据信息文件job.splitmetainfo。它有两个静态成员变量,如下:

  // 分片版本,当前默认为1
  private static final int splitVersion = JobSplit.META_SPLIT_VERSION;
  // 分片文件头部,为UTF-8格式的字符串"SPL"的字节数组"SPL"
  private static final byte[] SPLIT_FILE_HEADER;

并且,提供了一个静态方法,完成SPLIT_FILE_HEADER的初始化,代码如下:

  // 静态方法,加载SPLIT_FILE_HEADER为UTF-8格式的字符串"SPL"的字节数组byte[]
  static {
    try {
      SPLIT_FILE_HEADER = "SPL".getBytes("UTF-8");
    } catch (UnsupportedEncodingException u) {
      throw new RuntimeException(u);
    }
  }

JobSplitWriter实现其功能的为createSplitFiles()方法,它有三种实现,我们先看其中的public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir,Configuration conf, FileSystem fs, T[] splits),代码如下:

  // 创建分片文件
  public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir,
      Configuration conf, FileSystem fs, T[] splits)
  throws IOException, InterruptedException {

	// 调用createFile()方法,创建分片文件,并获取文件系统数据输出流FSDataOutputStream实例out,
	// 对应路径为jobSubmitDir/job.split,jobSubmitDir为参数yarn.app.mapreduce.am.staging-dir指定的路径/作业所属用户user/.staging/作业ID
	FSDataOutputStream out = createFile(fs,
        JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);

	// 调用writeNewSplits()方法,将分片数据写入分片文件,并得到分片元数据信息SplitMetaInfo数组info
    SplitMetaInfo[] info = writeNewSplits(conf, splits, out);

    // 关闭输出流
    out.close();

    // 调用writeJobSplitMetaInfo()方法,将分片元数据信息写入分片元数据文件
    writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir),
        new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion,
        info);
  }

createSplitFiles()方法的逻辑很清晰,大体如下:

1、调用createFile()方法,创建分片文件,并获取文件系统数据输出流FSDataOutputStream实例out,对应路径为jobSubmitDir/job.split,jobSubmitDir为参数yarn.app.mapreduce.am.staging-dir指定的路径/作业所属用户user/.staging/作业ID;

2、调用writeNewSplits()方法,将分片数据写入分片文件,并得到分片元数据信息SplitMetaInfo数组info;

3、关闭输出流out;

4、调用writeJobSplitMetaInfo()方法,将分片元数据信息写入分片元数据文件。

我们先来看下createFile()方法,代码如下:

  private static FSDataOutputStream createFile(FileSystem fs, Path splitFile,
      Configuration job)  throws IOException {

	// 调用HDFS文件系统FileSystem的create()方法,获取文件系统数据输出流FSDataOutputStream实例out,
	// 对应权限为JobSubmissionFiles.JOB_FILE_PERMISSION,即0644,rw-r--r--
    FSDataOutputStream out = FileSystem.create(fs, splitFile,
        new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));

    // 获取副本数replication,取参数mapreduce.client.submit.file.replication,参数未配置默认为10
    int replication = job.getInt(Job.SUBMIT_REPLICATION, 10);

    // 通过文件系统FileSystem实例fs的setReplication()方法,设置splitFile的副本数位10
    fs.setReplication(splitFile, (short)replication);

    // 调用writeSplitHeader()方法写入分片头信息
    writeSplitHeader(out);

    // 返回文件系统数据输出流out
    return out;
  }

首先,调用HDFS文件系统FileSystem的create()方法,获取文件系统数据输出流FSDataOutputStream实例out,对应权限为JobSubmissionFiles.JOB_FILE_PERMISSION,即0644,rw-r--r--;

其次,获取副本数replication,取参数mapreduce.client.submit.file.replication,参数未配置默认为10;

接着,通过文件系统FileSystem实例fs的setReplication()方法,设置splitFile的副本数位10;

然后,调用writeSplitHeader()方法写入分片头信息;

最后,返回文件系统数据输出流out。

writeSplitHeader()方法专门用于将分片头部信息写入分片文件,代码如下:

  private static void writeSplitHeader(FSDataOutputStream out)
  throws IOException {

	// 文件系统数据输出流out写入byte[],内容为UTF-8格式的"SPL"
    out.write(SPLIT_FILE_HEADER);
    // 文件系统数据输出流out写入int,分片版本号,目前为1
    out.writeInt(splitVersion);
  }

很简单,首先文件系统数据输出流out写入byte[],内容为UTF-8格式的"SPL",然后文件系统数据输出流out写入int,分片版本号,目前为1。

接下来,我们再看下writeNewSplits()方法,它将分片数据写入分片文件,并得到分片元数据信息SplitMetaInfo数组info,代码如下:

  @SuppressWarnings("unchecked")
  private static <T extends InputSplit>
  SplitMetaInfo[] writeNewSplits(Configuration conf,
      T[] array, FSDataOutputStream out)
  throws IOException, InterruptedException {

	// 根据array的大小,构造同等大小的分片元数据信息SplitMetaInfo数组info,
	// array其实是传入的分片数组
    SplitMetaInfo[] info = new SplitMetaInfo[array.length];
    if (array.length != 0) {// 如果array中有数据

      // 创建序列化工厂SerializationFactory实例factory
      SerializationFactory factory = new SerializationFactory(conf);
      int i = 0;

      // 获取最大的数据块位置maxBlockLocations,取参数mapreduce.job.max.split.locations,参数未配置默认为10
      int maxBlockLocations = conf.getInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY,
          MRConfig.MAX_BLOCK_LOCATIONS_DEFAULT);

      // 通过输出流out的getPos()方法获取输出流out的当前位置offset
      long offset = out.getPos();

      // 遍历数组array中每个元素split

      for(T split: array) {

    	// 通过输出流out的getPos()方法获取输出流out的当前位置prevCount
        long prevCount = out.getPos();

        // 往输出流out中写入String,内容为split对应的类名
        Text.writeString(out, split.getClass().getName());

        // 获取序列化器Serializer实例serializer
        Serializer<T> serializer =
          factory.getSerializer((Class<T>) split.getClass());

        // 打开serializer,接入输出流out
        serializer.open(out);

        // 将split序列化到输出流out
        serializer.serialize(split);

        // 通过输出流out的getPos()方法获取输出流out的当前位置currCount
        long currCount = out.getPos();

        // 通过split的getLocations()方法,获取位置信息locations
        String[] locations = split.getLocations();
        if (locations.length > maxBlockLocations) {
          LOG.warn("Max block location exceeded for split: "
              + split + " splitsize: " + locations.length +
              " maxsize: " + maxBlockLocations);
          locations = Arrays.copyOf(locations, maxBlockLocations);
        }

        // 构造split对应的元数据信息,并加入info指定位置,
        // offset为当前split在split文件中的起始位置,数据长度为split.getLength(),位置信息为locations
        info[i++] =
          new JobSplit.SplitMetaInfo(
              locations, offset,
              split.getLength());

        // offset增加当前split已写入数据大小
        offset += currCount - prevCount;
      }
    }

    // 返回分片元数据信息SplitMetaInfo数组info
    return info;
  }

writeNewSplits()方法的逻辑比较清晰,大体如下:

1、根据array的大小,构造同等大小的分片元数据信息SplitMetaInfo数组info,array其实是传入的分片数组;

2、如果array中有数据:

2.1、创建序列化工厂SerializationFactory实例factory;

2.2、获取最大的数据块位置maxBlockLocations,取参数mapreduce.job.max.split.locations,参数未配置默认为10;

2.3、通过输出流out的getPos()方法获取输出流out的当前位置offset;

2.4、遍历数组array中每个元素split:

2.4.1、通过输出流out的getPos()方法获取输出流out的当前位置prevCount;

2.4.2、往输出流out中写入String,内容为split对应的类名;

2.4.3、获取序列化器Serializer实例serializer;

2.4.4、打开serializer,接入输出流out;

2.4.5、将split序列化到输出流out;

2.4.6、通过输出流out的getPos()方法获取输出流out的当前位置currCount;

2.4.7、通过split的getLocations()方法,获取位置信息locations;

2.4.8、确保位置信息locations的长度不能超过maxBlockLocations,超过则截断;

2.4.9、构造split对应的元数据信息,并加入info指定位置,offset为当前split在split文件中的起始位置,数据长度为split.getLength(),位置信息为locations;

2.4.10、offset增加当前split已写入数据大小;

3、返回分片元数据信息SplitMetaInfo数组info。

其中,序列化split对象时,我们以FileSplit为例来分析,其write()方法如下:

  @Override
  public void write(DataOutput out) throws IOException {
	// 写入文件路径全名
    Text.writeString(out, file.toString());
    // 写入分片在文件中的起始位置
    out.writeLong(start);
    // 写入分片在文件中的长度
    out.writeLong(length);
  }

比较简单,分别写入文件路径全名、分片在文件中的起始位置、分片在文件中的长度三个信息。

综上所述,分片文件job.split文件的内容为:

1、文件头:"SPL"+int类型版本号1;

2、分片类信息:String类型split对应类名;

3、分片数据信息:String类型文件路径全名+Long类型分片在文件中的起始位置+Long类型分片在文件中的长度。

而在最后,构造分片元数据信息时,产生的是JobSplit的静态内部类SplitMetaInfo对象,包括分片位置信息locations、split在split文件中的起始位置offset、分片长度split.getLength()。

下面,我们再看下分片的元数据信息文件是如何产生的,让我们来研究下writeJobSplitMetaInfo()方法,代码如下:

  // 写入作业分片元数据信息
  private static void writeJobSplitMetaInfo(FileSystem fs, Path filename,
      FsPermission p, int splitMetaInfoVersion,
      JobSplit.SplitMetaInfo[] allSplitMetaInfo)
  throws IOException {
    // write the splits meta-info to a file for the job tracker
	// 调用HDFS文件系统FileSystem的create()方法,生成分片元数据信息文件,并获取文件系统数据输出流FSDataOutputStream实例out,
	// 对应文件路径为jobSubmitDir/job.splitmetainfo,jobSubmitDir为参数yarn.app.mapreduce.am.staging-dir指定的路径/作业所属用户user/.staging/作业ID
	// 对应权限为JobSubmissionFiles.JOB_FILE_PERMISSION,即0644,rw-r--r--
    FSDataOutputStream out =
      FileSystem.create(fs, filename, p);

    // 写入分片元数据头部信息UTF-8格式的字符串"META-SPL"的字节数组byte[]
    out.write(JobSplit.META_SPLIT_FILE_HEADER);

    // 写入分片元数据版本号splitMetaInfoVersion,当前为1
    WritableUtils.writeVInt(out, splitMetaInfoVersion);
    // 写入分片元数据个数,为分片元数据信息SplitMetaInfo数组个数allSplitMetaInfo.length
    WritableUtils.writeVInt(out, allSplitMetaInfo.length);

    // 遍历分片元数据信息SplitMetaInfo数组allSplitMetaInfo中每个splitMetaInfo,挨个写入输出流
    for (JobSplit.SplitMetaInfo splitMetaInfo : allSplitMetaInfo) {
      splitMetaInfo.write(out);
    }

    // 关闭输出流out
    out.close();
  }

writeJobSplitMetaInfo()方法的主体逻辑也十分清晰,大体如下:

1、调用HDFS文件系统FileSystem的create()方法,生成分片元数据信息文件,并获取文件系统数据输出流FSDataOutputStream实例out,对应文件路径为jobSubmitDir/job.splitmetainfo,jobSubmitDir为参数yarn.app.mapreduce.am.staging-dir指定的路径/作业所属用户user/.staging/作业ID,对应权限为JobSubmissionFiles.JOB_FILE_PERMISSION,即0644,rw-r--r--;

2、写入分片元数据头部信息UTF-8格式的字符串"META-SPL"的字节数组byte[];

3、写入分片元数据版本号splitMetaInfoVersion,当前为1;

4、写入分片元数据个数,为分片元数据信息SplitMetaInfo数组个数allSplitMetaInfo.length;

5、遍历分片元数据信息SplitMetaInfo数组allSplitMetaInfo中每个splitMetaInfo,挨个写入输出流;

6、关闭输出流out。

我们看下如何序列化JobSplit.SplitMetaInfo,将其写入文件,JobSplit.SplitMetaInfo的write()如下:

    public void write(DataOutput out) throws IOException {

      // 将分片位置个数写入分片元数据信息文件
      WritableUtils.writeVInt(out, locations.length);
      // 遍历位置信息,写入分片元数据信息文件
      for (int i = 0; i < locations.length; i++) {
        Text.writeString(out, locations[i]);
      }
      // 写入分片元数据信息的起始位置
      WritableUtils.writeVLong(out, startOffset);
      // 写入分片大小
      WritableUtils.writeVLong(out, inputDataLength);
    }

每个分片的元数据信息,包括分片位置个数、分片文件位置、分片元数据信息的起始位置、分片大小等内容。

总结

JobSplitWriter被作业客户端用于写分片相关文件,包括分片数据文件job.split和分片元数据信息文件job.splitmetainfo。分片数据文件job.split存储的主要是每个分片对应的HDFS文件路径,和其在HDFS文件中的起始位置、长度等信息,而分片元数据信息文件job.splitmetainfo存储的则是每个分片在分片数据文件job.split中的起始位置、分片大小等信息。

job.split文件内容:文件头 + 分片 + 分片 + ... + 分片

文件头:"SPL" + 版本号1

分片:分片类 + 分片数据,分片类=String类型split对应类名,分片数据=String类型HDFS文件路径全名+Long类型分片在HDFS文件中的起始位置+Long类型分片在HDFS文件中的长度

job.splitmetainfo文件内容:文件头 + 分片元数据个数 + 分片元数据 + 分片元数据 + ... + 分片元数据

文件头:"META-SPL" + 版本号1

分片元数据个数:分片元数据的个数

分片元数据:分片位置个数+分片位置+在分片文件job.split中的起始位置+分片大小

时间: 2024-08-15 10:24:36

MapReduce源码分析之JobSplitWriter的相关文章

MapReduce源码分析之JobSubmitter(一)

JobSubmitter,顾名思义,它是MapReduce中作业提交者,而实际上JobSubmitter除了构造方法外,对外提供的唯一一个非private成员变量或方法就是submitJobInternal()方法,它是提交Job的内部方法,实现了提交Job的所有业务逻辑.本文,我们将深入研究MapReduce中用于提交Job的组件JobSubmitter. 首先,我们先看下JobSubmitter的类成员变量,如下: // 文件系统FileSystem实例 private FileSystem

MapReduce源码分析之MapTask分析(二)

SpillThread分析 为什么需要Spill 内存大小总是有效,因此在Mapper在处理过程中,数据持续输出到内存中时,必然需要有机制能将内存中的数据换出,合理的刷出到磁盘上.SpillThread就是用来完成这部分工作. SpillThread的线程处理函数只是做一层封装,当索引表中的kvstart和kvend指向一样的索引位置时,会持续处于等待过程,等待外部通知需要触发spill动作,当有spill请求时,会触发StartSpill来唤醒SpillThread线程,进入到sortAndS

MapReduce源码分析之MapTask分析

前言 MapReduce的源码分析是基于Hadoop1.2.1基础上进行的代码分析. 该章节会分析在MapTask端的详细处理流程以及MapOutputCollector是如何处理map之后的collect输出的数据. map端的主要处理流程 图1 MapTask处理流程 图1所示为MapTask的主要代码执行流程,在MapTask启动后会进入入口run函数,根据是否使用新的api来决定选择运行新的mapper还是旧的mapper,最后完成执行向外汇报. 在这,我们选择分析旧的api,也就是ru

MapReduce源码分析之LocatedFileStatusFetcher

LocatedFileStatusFetcher是MapReduce中一个针对给定输入路径数组,使用配置的线程数目来获取数据块位置的实用类.它的主要作用就是利用多线程技术,每个线程对应一个任务,每个任务针对给定输入路径数组Path[],解析出文件状态列表队列BlockingQueue<List<FileStatus>>.其中,输入数据输入路径只不过是一个Path,而输出数据则是文件状态列表队列BlockingQueue<List<FileStatus>>,文

MapReduce源码分析总结

转自:http://www.cnblogs.com/forfuture1978/archive/2010/11/19/1882279.html 转者注:本来想在Hadoop学习总结系列详细解析HDFS以及Map-Reduce的,然而查找资料的时候,发现了这篇文章,并且发现caibinbupt已经对Hadoop的源代码已经进行了详细的分析,推荐大家阅读. 转自http://blog.csdn.net/HEYUTAO007/archive/2010/07/10/5725379.aspx 参考: 1

MapReduce源码分析之Task中关于对应TaskAttempt存储Map方案的一些思考

我们知道,MapReduce有三层调度模型,即Job-->Task-->TaskAttempt,并且: 1.通常一个Job存在多个Task,这些Task总共有Map Task和Redcue Task两种大的类型(为简化描述,Map-Only作业.JobSetup Task等复杂的情况这里不做考虑): 2.每个Task可以尝试运行1-n此,而且通常很多情况下都是1次,只有当开启了推测执行原理且存在拖后腿Task,或者Task之前执行失败时,Task才执行多次. 而TaskImpl中存在一个成员变

MapReduce源码分析:Mapper和Reducer类

一:Mapper类 在Hadoop的mapper类中,有4个主要的函数,分别是:setup,clearup,map,run.代码如下: protected void setup(Context context) throws IOException, InterruptedException { // NOTHING } protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, Interr

Yarn源码分析之MRAppMaster上MapReduce作业处理总流程(一)

我们知道,如果想要在Yarn上运行MapReduce作业,仅需实现一个ApplicationMaster组件即可,而MRAppMaster正是MapReduce在Yarn上ApplicationMaster的实现,由其控制MR作业在Yarn上的执行.如此,随之而来的一个问题就是,MRAppMaster是如何控制MapReduce作业在Yarn上运行的,换句话说,MRAppMaster上MapReduce作业处理总流程是什么?这就是本文要研究的重点. 通过MRAppMaster类的定义我们就能看出

MapReduce阶段源码分析以及shuffle过程详解

MapReducer工作流程图: 1. MapReduce阶段源码分析 1)客户端提交源码分析 解释:   - 判断是否打印日志   - 判断是否使用新的API,检查连接   - 在检查连接时,检查输入输出路径,计算切片,将jar.配置文件复制到HDFS   - 计算切片时,计算最小切片数(默认为1,可自定义)和最大切片数(默认是long的最大值,可以自定义)   - 查看给定的是否是文件,如果是否目录计算目录下所有文件的切片   - 通过block大小和最小切片数.最大切片数计算出切片大小