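YARN job submission: source code walkthrough

Notes on the source code details, with inline comments.

Client side:

// The job is ultimately submitted over the ApplicationClientProtocol protocol to ClientRMService on the RM side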
package org.apache.hadoop.mapred;
(in the jobclient module)
YARNRunner class
  public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
  throws IOException, InterruptedException {

    addHistoryToken(ts);

    // Construct necessary information to start the MR AM
    ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);

    // Submit to ResourceManager
    try {
      ApplicationId applicationId =
          resMgrDelegate.submitApplication(appContext);  // submit the job

      ApplicationReport appMaster = resMgrDelegate
          .getApplicationReport(applicationId);

          ResourceMgrDelegate class

            public ApplicationId
      submitApplication(ApplicationSubmissionContext appContext)
          throws YarnException, IOException {
    return client.submitApplication(appContext);
  }

  public ResourceMgrDelegate(YarnConfiguration conf) {
    super(ResourceMgrDelegate.class.getName());
    this.conf = conf;
    this.client = YarnClient.createYarnClient(); // this creates a YarnClientImpl; the actual submission logic lives in that class
    init(conf);
    start();
  }

  YarnClientImpl class

  public ApplicationId
      submitApplication(ApplicationSubmissionContext appContext)
          throws YarnException, IOException {
    ApplicationId applicationId = appContext.getApplicationId();
    appContext.setApplicationId(applicationId);
    SubmitApplicationRequest request =
        Records.newRecord(SubmitApplicationRequest.class);
    request.setApplicationSubmissionContext(appContext);
    rmClient.submitApplication(request);  //ApplicationClientProtocol rmClient
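
The client-side call chain above (YARNRunner, then ResourceMgrDelegate, then YarnClientImpl, then the ApplicationClientProtocol proxy) is plain delegation. A minimal sketch of that shape, without any Hadoop dependency, might look like the following. Every type name here is a hypothetical stand-in, and application IDs are reduced to strings; it is not the Hadoop implementation.

```java
import java.util.ArrayList;
import java.util.List;

class SubmitChainSketch {
    // Hypothetical stand-in for ApplicationClientProtocol (the client-to-RM RPC interface).
    interface ClientProtocol {
        void submitApplication(String applicationId);
    }

    // Hypothetical stand-in for YarnClientImpl: calls the RPC proxy.
    static class YarnClient {
        private final ClientProtocol rmClient;

        YarnClient(ClientProtocol rmClient) {
            this.rmClient = rmClient;
        }

        String submitApplication(String applicationId) {
            rmClient.submitApplication(applicationId);
            return applicationId;
        }
    }

    // Hypothetical stand-in for ResourceMgrDelegate: forwards to the client it holds.
    static class ResourceMgrDelegate {
        private final YarnClient client;

        ResourceMgrDelegate(YarnClient client) {
            this.client = client;
        }

        String submitApplication(String applicationId) {
            return client.submitApplication(applicationId);
        }
    }

    // Plays the role of YARNRunner.submitJob and returns what the fake RM received.
    static List<String> run() {
        List<String> receivedByRm = new ArrayList<>();
        ClientProtocol fakeRm = applicationId -> receivedByRm.add(applicationId);
        ResourceMgrDelegate delegate = new ResourceMgrDelegate(new YarnClient(fakeRm));
        delegate.submitApplication("application_0001");
        return receivedByRm;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The fake RM is just a lambda that records what it receives, which keeps the sketch runnable without an RPC layer.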

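RM side:

// Submission only adds an RMAppEventType.START event to the central asynchronous dispatcher. The event is handled asynchronously; the call does not wait for the result and returns a simple response right away
In ClientRMService: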

public SubmitApplicationResponse submitApplication(
      SubmitApplicationRequest request) throws YarnException {
    ApplicationSubmissionContext submissionContext = request
        .getApplicationSubmissionContext();
    ApplicationId applicationId = submissionContext.getApplicationId();
.....
      }
    }

    try {
      // call RMAppManager to submit application directly
      rmAppManager.submitApplication(submissionContext,
          System.currentTimeMillis(), false, user);   // job submission; this calls the method in RMAppManager

      LOG.info("Application with id " + applicationId.getId() + 
          " submitted by user " + user);
      RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
          "ClientRMService", applicationId);
    } catch (YarnException e) {
      LOG.info("Exception in submitting application with id " +
          applicationId.getId(), e);
      RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
          e.getMessage(), "ClientRMService",
          "Exception in submitting application", applicationId);
      throw e;
    }
    ...
       SubmitApplicationResponse response = recordFactory
        .newRecordInstance(SubmitApplicationResponse.class);
    return response;
    
    
    
    
    RMAppManager class:

    protected void submitApplication(
      ApplicationSubmissionContext submissionContext, long submitTime,
      boolean isRecovered, String user) throws YarnException {
   ......

    // Create RMApp
    RMApp application =
        new RMAppImpl(applicationId, rmContext, this.conf,
            submissionContext.getApplicationName(), user,
            submissionContext.getQueue(),
            submissionContext, this.scheduler, this.masterService,
            submitTime, submissionContext.getApplicationType());

    ....
  
    }

    // All done, start the RMApp
    this.rmContext.getDispatcher().getEventHandler().handle(
        new RMAppEvent(applicationId, isRecovered ? RMAppEventType.RECOVER:
            RMAppEventType.START)); // adds an RMAppEvent to the asynchronous dispatcher, with enum type RMAppEventType.START
            // the RM registers internally which handler processes events of this type
  }
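
The "enqueue and return" behavior described above can be illustrated with a small self-contained sketch: `handle` only puts the event on a queue, and a worker thread processes it later. This is a simplified model assuming string events and a single handler; the class name AsyncDispatcherSketch is hypothetical and is not the RM's actual dispatcher.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class AsyncDispatcherSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final Thread worker;

    AsyncDispatcherSketch(Consumer<String> handler) {
        worker = new Thread(() -> {
            try {
                while (true) {
                    handler.accept(queue.take()); // blocks until an event arrives
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop() was called; leave the loop
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    // Returns immediately; the event is processed later on the worker thread.
    void handle(String event) {
        queue.add(event);
    }

    void stop() {
        worker.interrupt();
    }

    // Submits one event and waits (test-only) until the worker has handled it.
    static String submitAndWait() {
        CountDownLatch handled = new CountDownLatch(1);
        StringBuilder seen = new StringBuilder();
        AsyncDispatcherSketch dispatcher = new AsyncDispatcherSketch(event -> {
            seen.append(event);
            handled.countDown();
        });
        dispatcher.handle("START");
        try {
            boolean done = handled.await(5, TimeUnit.SECONDS);
            return done ? seen.toString() : "timeout";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            dispatcher.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitAndWait());
    }
}
```

The latch exists only so the demo can observe the result; a caller like ClientRMService would simply return after `handle`.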
  
  Inside the RM:
     // Register event handler for RmAppEvents
    this.rmDispatcher.register(RMAppEventType.class,
        new ApplicationEventDispatcher(this.rmContext));
        ...
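
Registering a handler per event-type enum class, as in the `register` call above, amounts to a lookup table keyed by the enum's class. A minimal sketch of that routing idea follows; the enum and class names are hypothetical and the handler is reduced to a `Consumer`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class EventRouterSketch {
    // Hypothetical event-type enums, one per kind of event.
    enum AppEventType { START, KILL }
    enum AttemptEventType { START, APP_ACCEPTED }

    private final Map<Class<?>, Consumer<Enum<?>>> handlers = new HashMap<>();

    // One handler per enum class, mirroring register(SomeEventType.class, handler).
    void register(Class<? extends Enum<?>> eventTypeClass, Consumer<Enum<?>> handler) {
        handlers.put(eventTypeClass, handler);
    }

    // Routes by the enum class that declares the constant.
    void dispatch(Enum<?> eventType) {
        Consumer<Enum<?>> handler = handlers.get(eventType.getDeclaringClass());
        if (handler == null) {
            throw new IllegalStateException("No handler registered for " + eventType);
        }
        handler.accept(eventType);
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        EventRouterSketch router = new EventRouterSketch();
        router.register(AppEventType.class, e -> log.add("app:" + e));
        router.register(AttemptEventType.class, e -> log.add("attempt:" + e));
        router.dispatch(AppEventType.START);
        router.dispatch(AttemptEventType.START);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Two constants named START in different enums end up at different handlers, which is the point of keying on the enum class rather than on the constant name.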
  
  // ApplicationEventDispatcher eventually calls into RMAppImpl to handle this event
    public void handle(RMAppEvent event) {

    this.writeLock.lock();
    // e.g. an event of type RMAppEventType.START
    try {
      ApplicationId appID = event.getApplicationId();
      LOG.debug("Processing event for " + appID + " of type "
          + event.getType());
      final RMAppState oldState = getState();
      try {
        /* keep the master in sync with the state machine */
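        this.stateMachine.doTransition(event.getType(), event);  // stateMachine is created by the state machine factory, whose core is addTransition
        // it maps each state change to its handler; there is a submit-related one that should correspond to RMAppEventType.START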
      } catch (InvalidStateTransitonException e) {
        LOG.error("Can't handle this event at current state", e);
        
  
    private static final class StartAppAttemptTransition extends RMAppTransition {
    public void transition(RMAppImpl app, RMAppEvent event) {
      if (event.getType().equals(RMAppEventType.APP_SAVED)) {
        assert app.getState().equals(RMAppState.NEW_SAVING);
        RMAppStoredEvent storeEvent = (RMAppStoredEvent) event;
        if(storeEvent.getStoredException() != null) {
          // For HA this exception needs to be handled by giving up
          // master status if we got fenced
          LOG.error("Failed to store application: "
              + storeEvent.getApplicationId(),
              storeEvent.getStoredException());
          ExitUtil.terminate(1, storeEvent.getStoredException());
        }
      }

      app.createNewAttempt(true);
    };
  }
  
  
  
    private void createNewAttempt(boolean startAttempt) {
    ApplicationAttemptId appAttemptId =
        ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1);
    RMAppAttempt attempt =
        new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
          submissionContext, conf, user);  // create a new RMAppAttemptImpl
    attempts.put(appAttemptId, attempt);
    currentAttempt = attempt;
    if(startAttempt) {
      handler.handle(
          new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START)); // here an RMAppAttemptEvent is added to the asynchronous dispatcher's queue
          // the RM's register calls show which handler it maps to; in the end RMAppAttemptImpl.handle is called
          
          
    }
    
    RMAppAttemptImpl class:
      public void handle(RMAppAttemptEvent event) {

    this.writeLock.lock();

    try {
      ApplicationAttemptId appAttemptID = event.getApplicationAttemptId();
      LOG.debug("Processing event for " + appAttemptID + " of type "
          + event.getType());
      final RMAppAttemptState oldState = getAppAttemptState();
      try {
        /* keep the master in sync with the state machine */
        this.stateMachine.doTransition(event.getType(), event);
      } catch (InvalidStateTransitonException e) {
..
The state machine includes:  .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.SUBMITTED,
          RMAppAttemptEventType.START, new AttemptStartedTransition())  
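
The `addTransition` / `doTransition` pair can be pictured as a table from (current state, event type) to (next state, hook). The sketch below is a deliberately small model of that idea and not Hadoop's state machine factory; the class, enum, and method names are hypothetical, and only the two transitions mentioned in these notes (START and APP_ACCEPTED) are wired in.

```java
import java.util.EnumMap;
import java.util.Map;

class StateMachineSketch {
    enum State { NEW, SUBMITTED, SCHEDULED }
    enum EventType { START, APP_ACCEPTED }

    // Hook that runs when a transition fires.
    interface Transition {
        void transition(EventType event);
    }

    private static final class Arc {
        final State postState;
        final Transition hook;

        Arc(State postState, Transition hook) {
            this.postState = postState;
            this.hook = hook;
        }
    }

    private final Map<State, Map<EventType, Arc>> table = new EnumMap<>(State.class);
    private State current = State.NEW;

    // Declares: in state pre, on event, run hook and move to post.
    StateMachineSketch addTransition(State pre, State post, EventType event, Transition hook) {
        table.computeIfAbsent(pre, s -> new EnumMap<EventType, Arc>(EventType.class))
             .put(event, new Arc(post, hook));
        return this;
    }

    // Looks up the arc for (current state, event); rejects undeclared combinations.
    void doTransition(EventType event) {
        Map<EventType, Arc> arcs = table.get(current);
        Arc arc = (arcs == null) ? null : arcs.get(event);
        if (arc == null) {
            throw new IllegalStateException("Invalid event " + event + " at " + current);
        }
        arc.hook.transition(event);
        current = arc.postState;
    }

    State getState() {
        return current;
    }

    static StateMachineSketch newAttemptMachine(StringBuilder log) {
        return new StateMachineSketch()
            .addTransition(State.NEW, State.SUBMITTED, EventType.START,
                e -> log.append("started;"))
            .addTransition(State.SUBMITTED, State.SCHEDULED, EventType.APP_ACCEPTED,
                e -> log.append("accepted;"));
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        StateMachineSketch machine = newAttemptMachine(log);
        machine.doTransition(EventType.START);
        machine.doTransition(EventType.APP_ACCEPTED);
        System.out.println(machine.getState() + " " + log);
    }
}
```

An event that has no arc from the current state raises an exception, which corresponds to the invalid-transition branch in the `handle` methods quoted here.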
          
          
         The transition method of AttemptStartedTransition:
         ...
             // Add the application to the scheduler
      appAttempt.eventHandler.handle(
          new AppAddedSchedulerEvent(appAttempt.applicationAttemptId,
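              appAttempt.submissionContext.getQueue(), appAttempt.user)); // this event is a SchedulerEventType event and is handed to schedulerDispatcher
              // that object is a SchedulerEventDispatcher, which keeps its own queue much like the central asynchronous dispatcher; its run method passes every event to the scheduler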
              
          
          // Look at the handle method of the FIFO scheduler:
              
          case APP_ADDED:
    {
      AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
      addApplication(appAddedEvent.getApplicationAttemptId(), appAddedEvent
          .getUser());
    }
                
    
      private synchronized void addApplication(ApplicationAttemptId appAttemptId,
      String user) {
    // TODO: Fix store
    FiCaSchedulerApp schedulerApp = 
        new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE, activeUsersManager,
            this.rmContext);
    applications.put(appAttemptId, schedulerApp);
    metrics.submitApp(user, appAttemptId.getAttemptId()); 
    LOG.info("Application Submission: " + appAttemptId.getApplicationId() + 
        " from " + user + ", currently active: " + applications.size());
    rmContext.getDispatcher().getEventHandler().handle(
        new RMAppAttemptEvent(appAttemptId,
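            RMAppAttemptEventType.APP_ACCEPTED)); // yet another state change; eventually the RM's ApplicationMasterLauncher talks to the NM
            // to launch the AM, and the AM then registers with the RM. How the AM initializes each map task and reduce task is a question for later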
  }
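
The scheduler step above (record the attempt on APP_ADDED, then send APP_ACCEPTED back to the RM dispatcher) can be sketched as follows. This is a toy model under stated assumptions: attempt IDs and users are strings, the RM dispatcher is replaced by a list of emitted events, and the class name FifoSchedulerSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class FifoSchedulerSketch {
    enum SchedulerEventType { APP_ADDED, APP_REMOVED }

    // attemptId -> user, in submission order
    private final Map<String, String> applications = new LinkedHashMap<>();
    // Stand-in for the RM dispatcher: events the scheduler sends back.
    private final List<String> rmEvents = new ArrayList<>();

    synchronized void handle(SchedulerEventType type, String attemptId, String user) {
        switch (type) {
            case APP_ADDED:
                applications.put(attemptId, user);
                // Tell the attempt's state machine that the scheduler accepted it.
                rmEvents.add("APP_ACCEPTED:" + attemptId);
                break;
            case APP_REMOVED:
                applications.remove(attemptId);
                break;
            default:
                throw new IllegalArgumentException("Unknown event type " + type);
        }
    }

    synchronized int activeApplications() {
        return applications.size();
    }

    synchronized List<String> emittedEvents() {
        return new ArrayList<>(rmEvents);
    }

    public static void main(String[] args) {
        FifoSchedulerSketch scheduler = new FifoSchedulerSketch();
        scheduler.handle(SchedulerEventType.APP_ADDED, "attempt_1", "alice");
        System.out.println(scheduler.emittedEvents());
    }
}
```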
  
  // This event is handled by ApplicationAttemptEventDispatcher, registered via register, which calls RMAppAttemptImpl.handle
  public void handle(RMAppAttemptEvent event) {

    this.writeLock.lock();

    try {
      ApplicationAttemptId appAttemptID = event.getApplicationAttemptId();
      LOG.debug("Processing event for " + appAttemptID + " of type "
          + event.getType());
      final RMAppAttemptState oldState = getAppAttemptState();
      try {
        /* keep the master in sync with the state machine */
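        this.stateMachine.doTransition(event.getType(), event);  // which state change RMAppAttemptEventType.APP_ACCEPTED triggers, and which handler then runs, is defined by the addTransition calls
        // the attempt moves to the SCHEDULED state, then via the CONTAINER_ALLOCATED event to ALLOCATED_SAVING, then via CONTAINER_ACQUIRED to
        // the ALLOCATED state, and then via the LAUNCHED event to the LAUNCHED state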
        
        
        For example:
          .addTransition(RMAppAttemptState.SCHEDULED,
          RMAppAttemptState.ALLOCATED_SAVING,
          RMAppAttemptEventType.CONTAINER_ALLOCATED,
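          new AMContainerAllocatedTransition()) // CONTAINER_ALLOCATED triggers the move from SCHEDULED to ALLOCATED_SAVING and runs AMContainerAllocatedTransition
          // Finally the AppMaster is started on the NM side. It initializes a series of map and reduce tasks, registers with the RM, and sends heartbeats to the RM
          // to request resources for its tasks. Note that a heartbeat may carry no new resource request; it then just picks up what has already been allocated in the RM's in-memory structures
          // Note that an NM heartbeat also triggers resource allocation; the result is kept in memory until the AppMaster fetches it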
          
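       The key is the state machines in RMAppImpl and RMAppAttemptImpl, which internally define a series of state-to-state transitions and the handler class for each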
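
The heartbeat behavior noted in the comments (an NM heartbeat triggers allocation and the result is held in memory, while an AppMaster heartbeat may carry no new request and only picks up what is already allocated) can be modeled roughly as below. This is a toy illustration with hypothetical names and a plain FIFO of pending requests; it does not reflect the real scheduler's data structures.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class HeartbeatAllocationSketch {
    // One entry per requested container, holding the requesting application's ID.
    private final Deque<String> pendingRequests = new ArrayDeque<>();
    // Containers already allocated but not yet picked up, per application.
    private final Map<String, List<String>> allocated = new HashMap<>();
    private int nextContainerId = 0;

    // AppMaster heartbeat: optionally adds requests, then collects what is ready.
    synchronized List<String> amHeartbeat(String appId, int newRequests) {
        for (int i = 0; i < newRequests; i++) {
            pendingRequests.add(appId);
        }
        List<String> ready = allocated.remove(appId);
        return (ready == null) ? new ArrayList<String>() : ready;
    }

    // NodeManager heartbeat: allocates the node's free slots to pending requests.
    synchronized void nmHeartbeat(String nodeId, int freeSlots) {
        while (freeSlots > 0 && !pendingRequests.isEmpty()) {
            String appId = pendingRequests.poll();
            allocated.computeIfAbsent(appId, k -> new ArrayList<String>())
                     .add(nodeId + "-c" + nextContainerId++);
            freeSlots--;
        }
    }

    public static void main(String[] args) {
        HeartbeatAllocationSketch rm = new HeartbeatAllocationSketch();
        System.out.println(rm.amHeartbeat("app1", 2)); // nothing allocated yet
        rm.nmHeartbeat("node1", 1);
        System.out.println(rm.amHeartbeat("app1", 0)); // picks up what node1 provided
    }
}
```

The second AppMaster heartbeat passes zero new requests and still receives a container, which is the case the comments call out.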
  
Time: 2024-11-02 19:17:30
