13. MapReduce: Source Code Analysis of the Output Stage

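As a reduce task runs, it writes its result key/value (KV) pairs to the configured output path. The sections below trace that output process through the source code.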

1. Start with ReduceTask.run(), the entry point

//--------------------------ReduceTask.java
public void run(JobConf job, TaskUmbilicalProtocol umbilical) throws IOException, InterruptedException, ClassNotFoundException {
    job.setBoolean("mapreduce.job.skiprecords", this.isSkipping());
    if (this.isMapOrReduce()) {
        this.copyPhase = this.getProgress().addPhase("copy");
        this.sortPhase = this.getProgress().addPhase("sort");
        this.reducePhase = this.getProgress().addPhase("reduce");
    }

    TaskReporter reporter = this.startReporter(umbilical);
    boolean useNewApi = job.getUseNewReducer();
    //initialize the reduce task
    this.initialize(job, this.getJobID(), reporter, useNewApi);
    if (this.jobCleanup) {
        this.runJobCleanupTask(umbilical, reporter);
    } else if (this.jobSetup) {
        this.runJobSetupTask(umbilical, reporter);
    } else if (this.taskCleanup) {
        this.runTaskCleanupTask(umbilical, reporter);
    } else {
        this.codec = this.initCodec();
        RawKeyValueIterator rIter = null;
        ShuffleConsumerPlugin shuffleConsumerPlugin = null;
        Class combinerClass = this.conf.getCombinerClass();
        CombineOutputCollector combineCollector = null != combinerClass ? new CombineOutputCollector(this.reduceCombineOutputCounter, reporter, this.conf) : null;
        Class<? extends ShuffleConsumerPlugin> clazz = job.getClass("mapreduce.job.reduce.shuffle.consumer.plugin.class", Shuffle.class, ShuffleConsumerPlugin.class);
        shuffleConsumerPlugin = (ShuffleConsumerPlugin)ReflectionUtils.newInstance(clazz, job);
        LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
        Context shuffleContext = new Context(this.getTaskID(), job, FileSystem.getLocal(job), umbilical, super.lDirAlloc, reporter, this.codec, combinerClass, combineCollector, this.spilledRecordsCounter, this.reduceCombineInputCounter, this.shuffledMapsCounter, this.reduceShuffleBytes, this.failedShuffleCounter, this.mergedMapOutputsCounter, this.taskStatus, this.copyPhase, this.sortPhase, this, this.mapOutputFile, this.localMapFiles);
        shuffleConsumerPlugin.init(shuffleContext);
        rIter = shuffleConsumerPlugin.run();
        this.mapOutputFilesOnDisk.clear();
        this.sortPhase.complete();
        this.setPhase(Phase.REDUCE);
        this.statusUpdate(umbilical);
        Class keyClass = job.getMapOutputKeyClass();
        Class valueClass = job.getMapOutputValueClass();
        RawComparator comparator = job.getOutputValueGroupingComparator();
        //start running the reduce task
        if (useNewApi) {
            this.runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);
        } else {
            this.runOldReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);
        }

        shuffleConsumerPlugin.close();
        this.done(umbilical, reporter);
    }
}

As with MapTask, the two methods that matter here are this.initialize() and this.runNewReducer(): the first performs initialization and the second actually runs the task.

2. this.initialize()

//----------------------------------------ReduceTask.java
public void initialize(JobConf job, JobID id, Reporter reporter, boolean useNewApi) throws IOException, ClassNotFoundException, InterruptedException {

    //create the context objects
    this.jobContext = new JobContextImpl(job, id, reporter);
    this.taskContext = new TaskAttemptContextImpl(job, this.taskId, reporter);
    //move the reduce task's state from UNASSIGNED to RUNNING
    if (this.getState() == org.apache.hadoop.mapred.TaskStatus.State.UNASSIGNED) {
        this.setState(org.apache.hadoop.mapred.TaskStatus.State.RUNNING);
    }

    if (useNewApi) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("using new api for output committer");
        }

        //Instantiate the OutputFormat via reflection. getOutputFormatClass() is implemented in JobContextImpl.
        //The default is TextOutputFormat.class
        this.outputFormat = (OutputFormat)ReflectionUtils.newInstance(this.taskContext.getOutputFormatClass(), job);
        this.committer = this.outputFormat.getOutputCommitter(this.taskContext);
    } else {
        this.committer = this.conf.getOutputCommitter();
    }

    //get the output path
    Path outputPath = FileOutputFormat.getOutputPath(this.conf);
    if (outputPath != null) {
        if (this.committer instanceof FileOutputCommitter) {
            FileOutputFormat.setWorkOutputPath(this.conf, ((FileOutputCommitter)this.committer).getTaskAttemptPath(this.taskContext));
        } else {
            FileOutputFormat.setWorkOutputPath(this.conf, outputPath);
        }
    }

    this.committer.setupTask(this.taskContext);
    Class<? extends ResourceCalculatorProcessTree> clazz = this.conf.getClass("mapreduce.job.process-tree.class", (Class)null, ResourceCalculatorProcessTree.class);
    this.pTree = ResourceCalculatorProcessTree.getResourceCalculatorProcessTree((String)System.getenv().get("JVM_PID"), clazz, this.conf);
    LOG.info(" Using ResourceCalculatorProcessTree : " + this.pTree);
    if (this.pTree != null) {
        this.pTree.updateProcessTree();
        this.initCpuCumulativeTime = this.pTree.getCumulativeCpuTime();
    }

}

In short, this method initializes the context objects and obtains the OutputFormat object (and, from it, the OutputCommitter).
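
The reflective lookup in initialize() can be illustrated without Hadoop. The following is a minimal sketch, not Hadoop's implementation: `SketchOutputFormat`, its two implementations, and the configuration key `sketch.outputformat.class` are all hypothetical names invented for this example, and a plain `Map` stands in for the job configuration. It only shows the pattern of reading a class name from configuration, falling back to a default, and instantiating it via reflection.

```java
import java.util.HashMap;
import java.util.Map;

final class ReflectiveOutputFormatSketch {
    // Made-up configuration key, used only in this example.
    static final String KEY = "sketch.outputformat.class";

    // Resolve the configured class name (or the default) and instantiate it reflectively.
    static SketchOutputFormat create(Map<String, String> conf) {
        String className = conf.getOrDefault(KEY, SketchTextOutputFormat.class.getName());
        try {
            Class<? extends SketchOutputFormat> clazz =
                    Class.forName(className).asSubclass(SketchOutputFormat.class);
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(create(conf).name());
        conf.put(KEY, SketchSequenceOutputFormat.class.getName());
        System.out.println(create(conf).name());
    }
}

// Hypothetical stand-in for OutputFormat; not a Hadoop type.
interface SketchOutputFormat {
    String name();
}

// Plays the role of the default format (TextOutputFormat in the article).
class SketchTextOutputFormat implements SketchOutputFormat {
    public String name() {
        return "text";
    }
}

// An alternative that a job could configure instead of the default.
class SketchSequenceOutputFormat implements SketchOutputFormat {
    public String name() {
        return "sequence";
    }
}
```

With an empty configuration the default class is used; once the key is set, the configured class is instantiated instead, and the calling code does not change.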

3. this.runNewReducer()

//-----------------------------------------------ReduceTask.java
private <INKEY, INVALUE, OUTKEY, OUTVALUE> void runNewReducer(JobConf job, TaskUmbilicalProtocol umbilical, final TaskReporter reporter, RawKeyValueIterator rIter, RawComparator<INKEY> comparator, Class<INKEY> keyClass, Class<INVALUE> valueClass) throws IOException, InterruptedException, ClassNotFoundException {
    //keep a final reference to the original iterator so the anonymous class can delegate to it
    final RawKeyValueIterator rawIter = rIter;
    //anonymous inner class that wraps the KV iterator and reports progress on every next()
    rIter = new RawKeyValueIterator() {
        public void close() throws IOException {
            rawIter.close();
        }

        public DataInputBuffer getKey() throws IOException {
            return rawIter.getKey();
        }

        public Progress getProgress() {
            return rawIter.getProgress();
        }

        public DataInputBuffer getValue() throws IOException {
            return rawIter.getValue();
        }

        public boolean next() throws IOException {
            boolean ret = rawIter.next();
            reporter.setProgress(rawIter.getProgress().getProgress());
            return ret;
        }
    };
    TaskAttemptContext taskContext = new TaskAttemptContextImpl(job, this.getTaskID(), reporter);
    //instantiate the Reducer via reflection
    org.apache.hadoop.mapreduce.Reducer<INKEY, INVALUE, OUTKEY, OUTVALUE> reducer = (org.apache.hadoop.mapreduce.Reducer)ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
    //create the RecordWriter that writes the results to the output file
    org.apache.hadoop.mapreduce.RecordWriter<OUTKEY, OUTVALUE> trackedRW = new ReduceTask.NewTrackingRecordWriter(this, taskContext);
    job.setBoolean("mapred.skip.on", this.isSkipping());
    job.setBoolean("mapreduce.job.skiprecords", this.isSkipping());
    //create the reducer context used by the reduce task
    org.apache.hadoop.mapreduce.Reducer.Context reducerContext = createReduceContext(reducer, job, this.getTaskID(), rIter, this.reduceInputKeyCounter, this.reduceInputValueCounter, trackedRW, this.committer, reporter, comparator, keyClass, valueClass);

    //run the reducer
    try {
        reducer.run(reducerContext);
    } finally {
        //close the output stream
        trackedRW.close(reducerContext);
    }

}

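As shown above, the method does the following:
1) Obtains the Reducer object whose run() method will be called, which in turn calls reduce()
2) Creates the RecordWriter object
3) Creates the reducer context
4) Calls run() on the reducer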
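
Before those four steps, runNewReducer() wraps the incoming iterator in an anonymous RawKeyValueIterator that forwards every call and additionally reports progress inside next(). That is a plain decorator. The following is a minimal sketch of the same pattern, assuming `java.util.Iterator` in place of RawKeyValueIterator and a hypothetical `IntConsumer` callback in place of the TaskReporter; it is not Hadoop code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.IntConsumer;

final class ProgressIteratorSketch {
    // Wrap an iterator so every next() also reports how many items have been consumed so far.
    static <T> Iterator<T> withProgress(final Iterator<T> delegate, final IntConsumer onProgress) {
        return new Iterator<T>() {
            private int consumed = 0;

            @Override
            public boolean hasNext() {
                return delegate.hasNext(); // pure delegation, no side effect
            }

            @Override
            public T next() {
                T item = delegate.next();
                consumed++;
                onProgress.accept(consumed); // the extra behavior added by the wrapper
                return item;
            }
        };
    }

    // Drain a three-element iterator and return the progress values that were reported.
    static List<Integer> demo() {
        List<Integer> reports = new ArrayList<>();
        Iterator<String> it = withProgress(Arrays.asList("a", "b", "c").iterator(), n -> reports.add(n));
        while (it.hasNext()) {
            it.next();
        }
        return reports;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The consumer of the iterator is unaware of the wrapper, which is why the reducer needs no progress-reporting code of its own.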

4. ReduceTask.NewTrackingRecordWriter()

//--------------------------------------ReduceTask.java (nested class NewTrackingRecordWriter)
static class NewTrackingRecordWriter<K, V> extends org.apache.hadoop.mapreduce.RecordWriter<K, V> {
    private final org.apache.hadoop.mapreduce.RecordWriter<K, V> real;
    private final org.apache.hadoop.mapreduce.Counter outputRecordCounter;
    private final org.apache.hadoop.mapreduce.Counter fileOutputByteCounter;
    private final List<Statistics> fsStats;

    NewTrackingRecordWriter(ReduceTask reduce, TaskAttemptContext taskContext) throws InterruptedException, IOException {
        this.outputRecordCounter = reduce.reduceOutputCounter;
        this.fileOutputByteCounter = reduce.fileOutputByteCounter;
        List<Statistics> matchedStats = null;
        if (reduce.outputFormat instanceof FileOutputFormat) {
            matchedStats = Task.getFsStatistics(FileOutputFormat.getOutputPath(taskContext), taskContext.getConfiguration());
        }

        this.fsStats = matchedStats;
        long bytesOutPrev = this.getOutputBytes(this.fsStats);
        //create the real RecordWriter through the OutputFormat
        this.real = reduce.outputFormat.getRecordWriter(taskContext);
        long bytesOutCurr = this.getOutputBytes(this.fsStats);
        this.fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
    }
    .....................
}

The key step is that the RecordWriter is created through outputFormat.getRecordWriter().
As noted above, outputFormat defaults to TextOutputFormat, so the next thing to look at is
TextOutputFormat.getRecordWriter()
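
As a side note, NewTrackingRecordWriter itself is a decorator around the `real` writer. The listing above elides its remaining methods, so the sketch below is only an assumption about the general shape suggested by the field names (a wrapped writer plus counters): it forwards write() and close() and counts records. `Writer`, `TrackingWriter`, and `ListWriter` are hypothetical names, not Hadoop types, and byte counting is left out.

```java
import java.util.ArrayList;
import java.util.List;

final class TrackingWriterSketch {
    // Hypothetical stand-in for RecordWriter; not a Hadoop type.
    interface Writer<K, V> {
        void write(K key, V value);

        void close();
    }

    // Decorator: forwards every call to the real writer and counts the records passing through.
    static final class TrackingWriter<K, V> implements Writer<K, V> {
        private final Writer<K, V> real;
        private long records = 0;

        TrackingWriter(Writer<K, V> real) {
            this.real = real;
        }

        @Override
        public void write(K key, V value) {
            real.write(key, value);
            records++;
        }

        @Override
        public void close() {
            real.close();
        }

        long recordCount() {
            return records;
        }
    }

    // A trivial "real" writer that appends formatted lines to a list.
    static final class ListWriter implements Writer<String, Integer> {
        final List<String> lines = new ArrayList<>();
        boolean closed = false;

        @Override
        public void write(String key, Integer value) {
            lines.add(key + "\t" + value);
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    public static void main(String[] args) {
        ListWriter real = new ListWriter();
        TrackingWriter<String, Integer> tracked = new TrackingWriter<>(real);
        tracked.write("a", 1);
        tracked.write("b", 2);
        tracked.close();
        System.out.println(tracked.recordCount() + " records, lines=" + real.lines);
    }
}
```

Because the wrapper implements the same interface as the writer it wraps, the rest of the task can treat it as an ordinary writer.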

5. TextOutputFormat.getRecordWriter()

public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
    public TextOutputFormat() {
    }

    //returns an instance of the static nested class TextOutputFormat.LineRecordWriter
    public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException {
        boolean isCompressed = getCompressOutput(job);
        //separator between key and value; defaults to \t
        String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator", "\t");
        //two branches: uncompressed and compressed output
        if (!isCompressed) {
            //get the output path
            Path file = FileOutputFormat.getTaskOutputPath(job, name);
            FileSystem fs = file.getFileSystem(job);
            //create the output stream
            FSDataOutputStream fileOut = fs.create(file, progress);
            return new TextOutputFormat.LineRecordWriter(fileOut, keyValueSeparator);
        } else {
            Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
            CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, job);
            Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension());
            FileSystem fs = file.getFileSystem(job);
            FSDataOutputStream fileOut = fs.create(file, progress);
            //return the LineRecordWriter object
            return new TextOutputFormat.LineRecordWriter(new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator);
        }
    }

    //the LineRecordWriter class
    protected static class LineRecordWriter<K, V> implements RecordWriter<K, V> {
        private static final byte[] NEWLINE;
        protected DataOutputStream out;
        private final byte[] keyValueSeparator;

        public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
            this.out = out;
            this.keyValueSeparator = keyValueSeparator.getBytes(StandardCharsets.UTF_8);
        }

        public LineRecordWriter(DataOutputStream out) {
            this(out, "\t");
        }

        private void writeObject(Object o) throws IOException {
            if (o instanceof Text) {
                Text to = (Text)o;
                this.out.write(to.getBytes(), 0, to.getLength());
            } else {
                this.out.write(o.toString().getBytes(StandardCharsets.UTF_8));
            }

        }

        //write one KV pair
        public synchronized void write(K key, V value) throws IOException {
            boolean nullKey = key == null || key instanceof NullWritable;
            boolean nullValue = value == null || value instanceof NullWritable;
            if (!nullKey || !nullValue) {
                //write the key first
                if (!nullKey) {
                    this.writeObject(key);
                }

                //then the separator between key and value
                if (!nullKey && !nullValue) {
                    this.out.write(this.keyValueSeparator);
                }

                //then the value
                if (!nullValue) {
                    this.writeObject(value);
                }

                //finally terminate the line with a newline
                this.out.write(NEWLINE);
            }
        }

        public synchronized void close(Reporter reporter) throws IOException {
            this.out.close();
        }

        static {
            NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
        }
    }
}

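As shown, the RecordWriter that is ultimately returned is a LineRecordWriter. Note that this listing is the old-API class org.apache.hadoop.mapred.TextOutputFormat (its getRecordWriter() takes a JobConf); the new-API call in step 4, getRecordWriter(taskContext), goes to org.apache.hadoop.mapreduce.lib.output.TextOutputFormat, whose LineRecordWriter follows the same write logic.
Returning to step 3, the next thing to look at is the class behind the reduceContext object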
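
Before moving on, the write() rules from step 5 can be exercised in isolation. The following is a minimal stand-alone sketch, not the Hadoop class: `LineWriterSketch` is a hypothetical name, `null` plays the role of both null and NullWritable, and every object is written via toString() (the Text fast path is omitted). It writes to an in-memory buffer so that the resulting bytes can be inspected.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class LineWriterSketch {
    private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
    private final DataOutputStream out;
    private final byte[] separator;

    LineWriterSketch(DataOutputStream out, String separator) {
        this.out = out;
        this.separator = separator.getBytes(StandardCharsets.UTF_8);
    }

    // Same ordering as in the listing: key, separator (only if both are present), value, newline.
    void write(Object key, Object value) throws IOException {
        boolean nullKey = key == null;
        boolean nullValue = value == null;
        if (nullKey && nullValue) {
            return; // nothing to write, not even an empty line
        }
        if (!nullKey) {
            out.write(key.toString().getBytes(StandardCharsets.UTF_8));
        }
        if (!nullKey && !nullValue) {
            out.write(separator);
        }
        if (!nullValue) {
            out.write(value.toString().getBytes(StandardCharsets.UTF_8));
        }
        out.write(NEWLINE);
    }

    // Write all records (each a {key, value} pair) and return the produced text.
    static String render(String separator, Object[][] records) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            LineWriterSketch writer = new LineWriterSketch(out, separator);
            for (Object[] record : records) {
                writer.write(record[0], record[1]);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Object[][] records = {{"apple", 3}, {null, "only-value"}, {"only-key", null}, {null, null}};
        System.out.print(render("\t", records));
    }
}
```

A record with only a key or only a value produces a line without a separator, and a record where both are absent produces no line at all.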

6. reduceContext = ReduceTask.createReduceContext()

protected static <INKEY, INVALUE, OUTKEY, OUTVALUE> Reducer<INKEY, INVALUE, OUTKEY, OUTVALUE>.Context createReduceContext(Reducer<INKEY, INVALUE, OUTKEY, OUTVALUE> reducer, Configuration job, org.apache.hadoop.mapreduce.TaskAttemptID taskId, RawKeyValueIterator rIter, org.apache.hadoop.mapreduce.Counter inputKeyCounter, org.apache.hadoop.mapreduce.Counter inputValueCounter, RecordWriter<OUTKEY, OUTVALUE> output, OutputCommitter committer, StatusReporter reporter, RawComparator<INKEY> comparator, Class<INKEY> keyClass, Class<INVALUE> valueClass) throws IOException, InterruptedException {
    ReduceContext<INKEY, INVALUE, OUTKEY, OUTVALUE> reduceContext = new ReduceContextImpl(job, taskId, rIter, inputKeyCounter, inputValueCounter, output, committer, reporter, comparator, keyClass, valueClass);
    Reducer<INKEY, INVALUE, OUTKEY, OUTVALUE>.Context reducerContext = (new WrappedReducer()).getReducerContext(reduceContext);
    return reducerContext;
}

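Note that the returned reducerContext is a WrappedReducer.Context that wraps a ReduceContextImpl object, so calls on it are forwarded to the ReduceContextImpl.
Next, look at the constructor of ReduceContextImpl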

//---------------------------------ReduceContextImpl.java
public ReduceContextImpl(Configuration conf, TaskAttemptID taskid, RawKeyValueIterator input, Counter inputKeyCounter, Counter inputValueCounter, RecordWriter<KEYOUT, VALUEOUT> output, OutputCommitter committer, StatusReporter reporter, RawComparator<KEYIN> comparator, Class<KEYIN> keyClass, Class<VALUEIN> valueClass) throws InterruptedException, IOException {
    //the parent class is TaskInputOutputContextImpl; the RecordWriter (output) is passed up to it
    super(conf, taskid, output, committer, reporter);
    this.input = input;
    this.inputKeyCounter = inputKeyCounter;
    this.inputValueCounter = inputValueCounter;
    this.comparator = comparator;
    this.serializationFactory = new SerializationFactory(conf);
    this.keyDeserializer = this.serializationFactory.getDeserializer(keyClass);
    this.keyDeserializer.open(this.buffer);
    this.valueDeserializer = this.serializationFactory.getDeserializer(valueClass);
    this.valueDeserializer.open(this.buffer);
    this.hasMore = input.next();
    this.keyClass = keyClass;
    this.valueClass = valueClass;
    this.conf = conf;
    this.taskid = taskid;
}

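Here the constructor calls the parent constructor and hands it the RecordWriter (the output parameter), not the OutputFormat itself.
Next, look at the parent class TaskInputOutputContextImpl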

public TaskInputOutputContextImpl(Configuration conf, TaskAttemptID taskid, RecordWriter<KEYOUT, VALUEOUT> output, OutputCommitter committer, StatusReporter reporter) {
    //output here is the RecordWriter object
    super(conf, taskid, reporter);
    this.output = output;
    this.committer = committer;
}

//Reads the next KV into this.key and this.value; returns false if there are no more KVs, true otherwise
public abstract boolean nextKeyValue() throws IOException, InterruptedException;

public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;

public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;

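//Calls the RecordWriter's write() method to emit the KV; by default the writer is a LineRecordWriter
public void write(KEYOUT key, VALUEOUT value) throws IOException, InterruptedException {
    this.output.write(key, value);
}

As shown, there are three abstract methods (implemented in the subclass ReduceContextImpl and independent of the RecordWriter) plus the concrete write() method. The former read the input KVs and the latter writes the result KVs. write() simply calls the RecordWriter's write() method, that is, the write() of the LineRecordWriter created in step 5, reached through the NewTrackingRecordWriter wrapper from step 4.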
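
The split between an abstract input side and a delegating output side can be sketched in a few lines. This is a simplified model under stated assumptions, not the Hadoop classes: `Writer`, `IoContext`, and `ListContext` are hypothetical names that loosely mirror RecordWriter, TaskInputOutputContextImpl, and ReduceContextImpl.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class ContextDelegationSketch {
    // Hypothetical stand-in for RecordWriter.
    interface Writer<K, V> {
        void write(K key, V value);
    }

    // Mirrors the shape of TaskInputOutputContextImpl: reading is abstract, writing delegates.
    abstract static class IoContext<KI, KO, VO> {
        private final Writer<KO, VO> output;

        IoContext(Writer<KO, VO> output) {
            this.output = output;
        }

        abstract boolean nextKey();

        abstract KI getCurrentKey();

        void write(KO key, VO value) {
            output.write(key, value); // the context never formats anything itself
        }
    }

    // Mirrors the role of ReduceContextImpl: it supplies only the input side.
    static final class ListContext extends IoContext<String, String, Integer> {
        private final Iterator<String> input;
        private String current;

        ListContext(List<String> keys, Writer<String, Integer> output) {
            super(output);
            this.input = keys.iterator();
        }

        @Override
        boolean nextKey() {
            if (!input.hasNext()) {
                return false;
            }
            current = input.next();
            return true;
        }

        @Override
        String getCurrentKey() {
            return current;
        }
    }

    // Emit (key, key length) for every input key and return what the writer received.
    static List<String> demo() {
        List<String> written = new ArrayList<>();
        ListContext ctx = new ListContext(Arrays.asList("ab", "cde"), (k, v) -> written.add(k + "=" + v));
        while (ctx.nextKey()) {
            ctx.write(ctx.getCurrentKey(), ctx.getCurrentKey().length());
        }
        return written;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Swapping in a different writer changes where and how the output is stored without touching the context or the code that calls write().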

7. reducer.run()

public void run(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        this.setup(context);

        try {
            while(context.nextKey()) {
                this.reduce(context.getCurrentKey(), context.getValues(), context);
                Iterator<VALUEIN> iter = context.getValues().iterator();
                if (iter instanceof ValueIterator) {
                    ((ValueIterator)iter).resetBackupStore();
                }
            }
        } finally {
            this.cleanup(context);
        }

    }

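Here the reducer uses the methods of the reducer context created in step 6 to fetch the KVs. Inside reduce(), user code calls context.write(key, value) to emit each result KV, which ultimately invokes the write() method of the LineRecordWriter object.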
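
The control flow of run() is a template method: setup, one reduce() call per key, and cleanup in a finally block. The following is a minimal simulation of that loop, assuming a hypothetical in-memory `Context` (a sorted map of grouped values as input and a list as output) and a hypothetical `SumReducer`; none of these are Hadoop classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ReducerRunSketch {
    // Hypothetical context: grouped, key-sorted input on one side and an output sink on the other.
    static final class Context {
        private final Iterator<Map.Entry<String, List<Integer>>> groups;
        private Map.Entry<String, List<Integer>> current;
        final List<String> output = new ArrayList<>();

        Context(Map<String, List<Integer>> grouped) {
            this.groups = grouped.entrySet().iterator();
        }

        boolean nextKey() {
            if (!groups.hasNext()) {
                return false;
            }
            current = groups.next();
            return true;
        }

        String getCurrentKey() {
            return current.getKey();
        }

        Iterable<Integer> getValues() {
            return current.getValue();
        }

        // Stands in for context.write(); a real job would hand the pair to a RecordWriter.
        void write(String key, int value) {
            output.add(key + "\t" + value);
        }
    }

    abstract static class Reducer {
        void setup(Context context) {
        }

        abstract void reduce(String key, Iterable<Integer> values, Context context);

        void cleanup(Context context) {
        }

        // Same shape as the run() listing: setup, reduce once per key, cleanup even on failure.
        final void run(Context context) {
            setup(context);
            try {
                while (context.nextKey()) {
                    reduce(context.getCurrentKey(), context.getValues(), context);
                }
            } finally {
                cleanup(context);
            }
        }
    }

    static final class SumReducer extends Reducer {
        @Override
        void reduce(String key, Iterable<Integer> values, Context context) {
            int sum = 0;
            for (int v : values) {
                sum += v;
            }
            context.write(key, sum);
        }
    }

    static List<String> demo() {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        grouped.put("pear", Arrays.asList(1));
        grouped.put("apple", Arrays.asList(1, 1, 1));
        Context context = new Context(grouped);
        new SumReducer().run(context);
        return context.output;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

User code only overrides reduce(); the iteration over keys and the call order of setup() and cleanup() stay in the base class.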

8. Summary

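The output process mainly involves two objects, OutputFormat and RecordWriter:
OutputFormat: creates the output stream and the RecordWriter object
RecordWriter: provides the write() method that writes each KV to the output file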

Original article: https://blog.51cto.com/kinglab/2445195

Time: 2024-10-10 14:05:03
