1.一个标准 MR-Job 的执行入口:
//参数 true 表示检查并打印 Job 和 Task 的运行状况 System.exit(job.waitForCompletion(true) ? 0 : 1);
2.job.waitForCompletion(true)方法的内部实现
//job.waitForCompletion()方法的内部实现 public boolean waitForCompletion(boolean verbose ) throws IOException, InterruptedException, ClassNotFoundException { if (state == JobState.DEFINE) { submit(); //此方法的核心在于submit() } if (verbose) { //根据传入的参数,决定是否打印Job运行的详细过程 monitorAndPrintJob(); } else { // get the completion poll interval from the client. int completionPollIntervalMillis = Job.getCompletionPollInterval(cluster.getConf()); while (!isComplete()) { try { Thread.sleep(completionPollIntervalMillis); } catch (InterruptedException ie) { } } }
3. Job 类 submit()方法的内部实现
public void submit() throws IOException, InterruptedException, ClassNotFoundException { ensureState(JobState.DEFINE); setUseNewAPI();//使用MapReduce新的API connect();//返回一个【客户端代理对象 Cluster】(属于 Job 类),用于和服务端RM建立RPC通信 final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient()); status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() { public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException { //提交Job return submitter.submitJobInternal(Job.this, cluster); } }); state = JobState.RUNNING;//设置 JobStatus 为 Running LOG.info("The url to track the job: " + getTrackingURL()); }
3.1.1.查看 connect() 方法的内部实现
private synchronized void connect() throws IOException, InterruptedException, ClassNotFoundException { if (cluster == null) { cluster = ugi.doAs(new PrivilegedExceptionAction<Cluster>() { public Cluster run() throws IOException, InterruptedException, ClassNotFoundException { //返回一个Cluster对象,并将此对象作为 Job 类的一个成员变量 //即 Job 类持有 Cluster 的引用。 return new Cluster(getConfiguration()); } }); } }
3.1.2.查看new Cluster()的实现过程
/**
 * Creates a Cluster: stores the configuration, records the current user, and
 * delegates the remaining setup to {@code initialize}.
 *
 * @param jobTrackAddr address passed through to {@code initialize}
 * @param conf         configuration stored on this Cluster and passed to {@code initialize}
 * @throws IOException if thrown while resolving the current user or initializing
 */
public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
    throws IOException {
  this.conf = conf;
  this.ugi = UserGroupInformation.getCurrentUser();
  initialize(jobTrackAddr, conf); // the important part is inside this method
}
3.1.3.Cluster 在 initialize() 中通过 ClientProtocolProvider 创建客户端代理对象,ClientProtocolProvider 有两种实现:LocalClientProtocolProvider(本地模式)和 YarnClientProtocolProvider(Yarn模式)。
synchronized (frameworkLoader) { for (ClientProtocolProvider provider : frameworkLoader) { LOG.debug("Trying ClientProtocolProvider : " + provider.getClass().getName()); //ClientProtocol 是Client端和NN通信的RPC协议,根据RPC通信原理,此协议接口中必定包含一个 versionID 字段。 ClientProtocol clientProtocol = null; try { if (jobTrackAddr == null) { clientProtocol = provider.create(conf); } else { clientProtocol = provider.create(jobTrackAddr, conf); } if (clientProtocol != null) { //初始化Cluster内部成员变量 clientProtocolProvider = provider; client = clientProtocol; //创建Cluster类的客户端代理对象client LOG.debug("Picked " + provider.getClass().getName() + " as the ClientProtocolProvider"); break; } else { LOG.debug("Cannot pick " + provider.getClass().getName() + " as the ClientProtocolProvider - returned null protocol"); } } catch (Exception e) { LOG.info("Failed to use " + provider.getClass().getName() + " due to error: " + e.getMessage()); } } }
3.1.4.ClientProtocol接口中包含的versionID 字段
//Version 37: More efficient serialization format for framework counters public static final long versionID = 37L;
3.2.1.查看 JobSubmitter 类中 submitJobInternal()方法的实现:
时间: 2024-10-27 07:44:00