1.在客户端,我们进行Job相关属性设定后,最后使用job.waitForCompletion(true);提交任务到集群中,并等待集群作业完成
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(DataCount.class);
job.setMapperClass(DCMapper.class);
// k2 v2 and k3 v3
// job.setMapOutputKeyClass(Text.class);
// job.setMapOutputValueClass(DataBean.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
job.setReducerClass(DCReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DataBean.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
2.核心是job.waitForCompletion(true);过程分析,下面我们重点分析此过程。
1)此过程检查Job状态后,状态OK则提交作业submit()
2)提交作业先,需要建立连接connect(),此连接过程会创建Cluster对象,并生成cluster的引用
3)Job类持有Cluster的引用,而Cluster持用ResourceManager进程的引用,Cluster也持有RPC代理对象client
说明:客户端持有服务端的引用,这样就可以建立RPC通信
4)Cluster的构造方法 initialize(jobTrackAddr, conf);会创建如下对象 ClientProtocol clientProtocol,而这一个对象就是一个接口,最后再将这一个代理对象赋值给 client = clientProtocol; 而client 就是Cluster的成员变量
5)上述就是Job建立连接的过程,完成连接后需要得到一个提交器,Job创建一个提交器JobSubmitter submitter
6)通过提交器将Job,cluster传入submitter.submitJobInternal(Job.this, cluster);
7)submitter 检查输出目录是否有异常,接着得到一个存储Jar包路径jobStagingArea ,再得到一个JobID,JobID是通过submitter里的RPC引用得到,实际JobID是在服务端实现
8)submitter提交器将提交jar地址是通过jobStagingArea和JobID拼接而成
9)submitter提交器copyAndConfigureFiles接口将Jar包和配置信息提交到hdfs里,默认向hdfs写10份(也可以通过配置mapreduce.client.submit.file.replication,当然提交的份数也可以通过读取配置文件获得mapreduce.client.submit.file.replication(mapred-default.xml 682行))
10)submitter提交器通过copyAndConfigureFiles拷贝Jar信息到hdfs
11)submitter提交器通过服务端代理对象submitClient.submitJob提交Job信息,服务端最终将信息提交给ResourceManager