Hadoop 源码分析——Job提交过程

1.在客户端,我们进行Job相关属性设定后,最后使用job.waitForCompletion(true);提交任务到集群中,并等待集群作业完成

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

job.setJarByClass(DataCount.class);

job.setMapperClass(DCMapper.class);

// k2 v2 and k3 v3

// job.setMapOutputKeyClass(Text.class);

// job.setMapOutputValueClass(DataBean.class);

FileInputFormat.setInputPaths(job, new Path(args[0]));

job.setReducerClass(DCReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(DataBean.class);

FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.waitForCompletion(true);

}

2.核心是job.waitForCompletion(true);过程分析,下面我们重点分析此过程。

1)此过程检查Job状态后,状态OK则提交作业submit()

2)提交作业先,需要建立连接connect(),此连接过程会创建Cluster对象,并生成cluster的引用

3)Job类持有Cluster的引用,而Cluster持用ResourceManager进程的引用,Cluster也持有RPC代理对象client

说明:客户端持有服务端的引用,这样就可以建立RPC通信

4)Cluster的构造方法 initialize(jobTrackAddr, conf);会创建如下对象 ClientProtocol clientProtocol,而这一个对象就是一个接口,最后再将这一个代理对象赋值给 client = clientProtocol;  而client 就是Cluster的成员变量

5)上述就是Job建立连接的过程,完成连接后需要得到一个提交器,Job创建一个提交器JobSubmitter submitter

6)通过提交器将Job,cluster传入submitter.submitJobInternal(Job.this, cluster);

7)submitter 检查输出目录是否有异常,接着得到一个存储Jar包路径jobStagingArea ,再得到一个JobID,JobID是通过submitter里的RPC引用得到,实际JobID是在服务端实现

8)submitter提交器将提交jar地址是通过jobStagingArea和JobID拼接而成

9)submitter提交器copyAndConfigureFiles接口将Jar包和配置信息提交到hdfs里,默认向hdfs写10份(也可以通过配置mapreduce.client.submit.file.replication,当然提交的份数也可以通过读取配置文件获得mapreduce.client.submit.file.replication(mapred-default.xml 682行))

10)submitter提交器通过copyAndConfigureFiles拷贝Jar信息到hdfs

11)submitter提交器通过服务端代理对象submitClient.submitJob提交Job信息,服务端最终将信息提交给ResourceManager

时间: 2024-11-03 07:53:52

Hadoop 源码分析——Job提交过程的相关文章

hadoop源码分析解读入门

hadoop 源代码分析(一) Google 的核心竞争技术是它的计算平台.HadoopGoogle的大牛们用了下面5篇文章,介绍了它们的计算设施. Google的几篇论文 GoogleCluster:http://research.google.com/archive/googlecluster.html Chubby:http://labs.google.com/papers/chubby.html GFS:http://labs.google.com/papers/gfs.html Big

细水长流Hadoop源码分析(3)RPC Server初始化构造

声明:个人原创,转载请注明出处.文中引用了一些网上或书里的资料,如有不妥之处请告之. 本文是我阅读Hadoop 0.20.2第二遍时写的笔记,在阅读过程中碰到很多问题,最终通过各种途径解决了大部分.Hadoop整个系统设计精良,源码值得学习分布式的同学们阅读,以后会将所有笔记一一贴出,希望能方便大家阅读源码,少走弯路. 目录 4 RPC服务器(org.apache.hadoop,ipc.Server) 4.1 服务器初始化 4 RPC服务器(org.apache.hadoop,ipc.Serve

Hadoop源码分析(2)——Configuration类

这篇文章主要介绍Hadoop的系统配置类Configuration. 接着上一篇文章介绍,上一篇文章中Hadoop Job的main方法为: public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new CalculateSumJob(),args); System.exit(res); } 其中ToolRunner.run方法传入的第一个变量

Hadoop源码分析—— Job任务的程序入口

这篇文章大致介绍Hadoop Job的程序是如何启动的. 通常用Java编写的Hadoop MapReduce程序是通过一个main方法作为程序的整个入口,如下: public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new CalculateSumJob(),args); System.exit(res);} 可以看到这个Job任务的MapR

SOFA 源码分析 —— 服务引用过程

前言 在前面的 SOFA 源码分析 -- 服务发布过程 文章中,我们分析了 SOFA 的服务发布过程,一个完整的 RPC 除了发布服务,当然还需要引用服务. So,今天就一起来看看 SOFA 是如何引用服务的.实际上,基础逻辑和我们之前用 Netty 写的 RPC 小 demo 类似.有兴趣可以看看这个 demo-- 自己用 Netty 实现一个简单的 RPC. 示例代码 ConsumerConfig<HelloService> consumerConfig = new ConsumerCon

MyBatis 源码分析 - 配置文件解析过程

* 本文速览 由于本篇文章篇幅比较大,所以这里拿出一节对本文进行快速概括.本篇文章对 MyBatis 配置文件中常用配置的解析过程进行了较为详细的介绍和分析,包括但不限于settings,typeAliases和typeHandlers等,本文的篇幅也主要在对这三个配置解析过程的分析上.下面,我们来一起看一下本篇文章的目录结构. 从目录上可以看出,2.3节.2.5节和2.8节的内容比较多.其中2.3节是关于settings配置解析过程的分析,除了对常规的 XML 解析过程分析,本节额外的分析了元

源码分析HotSpot GC过程(一)

«上一篇:源码分析HotSpot GC过程(一)»下一篇:源码分析HotSpot GC过程(三):TenuredGeneration的GC过程 原文地址:https://www.cnblogs.com/WCFGROUP/p/9743676.html

Hadoop源码分析之Map输入

对于MapReduce的输入输出Hadoop的官网如下所示 Input and Output types of a MapReduce job: (input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output) 这里将从源码分析 input <k1,v1>->map 的过程, Mapper 基

MapReduce阶段源码分析以及shuffle过程详解

MapReducer工作流程图: 1. MapReduce阶段源码分析 1)客户端提交源码分析 解释:   - 判断是否打印日志   - 判断是否使用新的API,检查连接   - 在检查连接时,检查输入输出路径,计算切片,将jar.配置文件复制到HDFS   - 计算切片时,计算最小切片数(默认为1,可自定义)和最大切片数(默认是long的最大值,可以自定义)   - 查看给定的是否是文件,如果是否目录计算目录下所有文件的切片   - 通过block大小和最小切片数.最大切片数计算出切片大小