Yarn任务提交流程(源码分析)

Based on Hadoop 2.7.1

JobSubmitter

  • addMRFrameworkToDistributedCache(Configuration conf) : mapreduce.application.framework.path, 用于指定其他framework的hdfs 路径配置,默认yarn的可以不管
  • Token相关的方法:读取认证信息(支持二进制、json),并将其添加至相应的fileSystem中,以便以同样权限访问文件系统
  • copyAndConfigureFiles(Job job, Path jobSubmitDir): 上传配置、jar、files、libjars、archives等
  • submitJobInternal: 真正的提交任务接口

核心代码提交链

  1. JobSubmitter ->
  2. ClientProtocol(YARNRunner) ->
  3. ResourceMgrDelegate ->
  4. YarnClient(YarnClientImpl).submitApplication( ApplicationSubmissionContext appContext) ->
  5. 【RM】ApplicationClientProtocol(ClientRMService).submitApplication( SubmitApplicationRequest request) -> // fill ASC with dft values
  6. RMAppManager.submitApplication( ApplicationSubmissionContext submissionContext, long submitTime, String user) ->
  • ApplicationSubmissionContext 提交上下文,包含application各种元信息
  • SubmitApplicationRequest 提交Request对象
// Dispatcher is not yet started at this time, so these START events
// enqueued should be guaranteed to be first processed when dispatcher
// gets started.
this.rmContext.getDispatcher().getEventHandler()
    .handle(new RMAppEvent(applicationId, RMAppEventType.START));

START -> APP_NEW_SAVED

 stateMachineFactory.addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
        RMAppEventType.START, new RMAppNewlySavingTransition())

//...

private static final class RMAppNewlySavingTransition extends RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {

      // If recovery is enabled then store the application information in a
      // non-blocking call so make sure that RM has stored the information
      // needed to restart the AM after RM restart without further client
      // communication
      LOG.info("Storing application with id " + app.applicationId);
      app.rmContext.getStateStore().storeNewApplication(app);
    }
  }

  public synchronized void storeNewApplication(RMApp app) {
    ApplicationSubmissionContext context = app
                                            .getApplicationSubmissionContext();
    assert context instanceof ApplicationSubmissionContextPBImpl;
    ApplicationStateData appState =
        ApplicationStateData.newInstance(
            app.getSubmitTime(), app.getStartTime(), context, app.getUser());
    dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
  }

   private static final class AddApplicationToSchedulerTransition extends
      RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
      app.handler.handle(new AppAddedSchedulerEvent(app.applicationId,
        app.submissionContext.getQueue(), app.user,
        app.submissionContext.getReservationID()));
    }
  }

AppAddedSchedulerEvent 会由配置的Scheduler来handle。

P.S. 看 event 部分代码的方法,

  1. 找出状态,比如 APP_NEW_SAVED,
  2. 找出handle这个状态的事件类
  3. 找出处理这个事件的具体逻辑 (这里可能逻辑最复杂)
  4. 找下一个事件
  5. 重复。。

ApplicationMaster

START -> APPNEWSAVED -> APP_ACCEPTED ....

后面是一些attempt的启动等各种事件的反复。直接跳到 AM 部分。

ResourceManager内有 createApplicationMasterLauncher() 和 createApplicationMasterService()

private void launch() throws IOException, YarnException {
    connect();
    ContainerId masterContainerID = masterContainer.getId();
    ApplicationSubmissionContext applicationContext =
      application.getSubmissionContext();
    LOG.info("Setting up container " + masterContainer
        + " for AM " + application.getAppAttemptId());
    ContainerLaunchContext launchContext =
        createAMContainerLaunchContext(applicationContext, masterContainerID);

    StartContainerRequest scRequest =
        StartContainerRequest.newInstance(launchContext,
          masterContainer.getContainerToken());
    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
    list.add(scRequest);
    StartContainersRequest allRequests =
        StartContainersRequest.newInstance(list);

    StartContainersResponse response =
        containerMgrProxy.startContainers(allRequests);
    if (response.getFailedRequests() != null
        && response.getFailedRequests().containsKey(masterContainerID)) {
      Throwable t =
          response.getFailedRequests().get(masterContainerID).deSerialize();
      parseAndThrowException(t);
    } else {
      LOG.info("Done launching container " + masterContainer + " for AM "
          + application.getAppAttemptId());
    }
  }

   private ContainerLaunchContext createAMContainerLaunchContext(
      ApplicationSubmissionContext applicationMasterContext,
      ContainerId containerID) throws IOException {

    // Construct the actual Container
    ContainerLaunchContext container =
        applicationMasterContext.getAMContainerSpec();
    LOG.info("Command to launch container "
        + containerID
        + " : "
        + StringUtils.arrayToString(container.getCommands().toArray(
            new String[0])));

    // Finalize the container
    setupTokens(container, containerID);

    return container;
  }
  

注意以上其中两行:

  • ContainerLaunchContext launchContext = createAMContainerLaunchContext(applicationContext, masterContainerID) 创建 AM 请求
  • StartContainersResponse response = containerMgrProxy.startContainers(allRequests); 启动AM的容器并在容器内启动AM。
  @Override
  public ContainerLaunchContext getAMContainerSpec() {
    ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
    if (this.amContainer != null) {
      return amContainer;
    } // Else via proto
    if (!p.hasAmContainerSpec()) {
      return null;
    }
    amContainer = convertFromProtoFormat(p.getAmContainerSpec());
    return amContainer;
  }

  public class ApplicationSubmissionContextPBImpl
extends ApplicationSubmissionContext {
  ApplicationSubmissionContextProto proto =
      ApplicationSubmissionContextProto.getDefaultInstance();
  ApplicationSubmissionContextProto.Builder builder = null;
  boolean viaProto = false;

  private ApplicationId applicationId = null;
  private Priority priority = null;
  private ContainerLaunchContext amContainer = null;
  private Resource resource = null;
  private Set<String> applicationTags = null;
  private ResourceRequest amResourceRequest = null;
  private LogAggregationContext logAggregationContext = null;
  private ReservationId reservationId = null;

  /// ...
  }

接下来便是启动后的AppMaster 创建job,并通过AMRMClient向ResourceManager申请资源等。

时间: 2024-10-05 17:17:06

Yarn任务提交流程(源码分析)的相关文章

struts--token防止表单重复提交(源码分析)

表单重复提交 1.造成重复提交主要的两个原因: (1)        服务器处理时间久.当用户在表单中填完信息,点击"提交"按钮后,由于服务器反应时间过长没能及时看到响应信息,或者出于其它目的,再次点击"提交"按钮,从而导致在服务器端接收到两条或多条相同的信息. (2)      forward跳转引起的重复提交.当用户将信息提交到服务器,服务器响应采用forward方式调转到下一个页面后,此时地址栏中显示的是上个页面的URL,若刷新当前页面,浏览器会将再次提交用户

solr源码分析之solrclound

一.简介 SolrCloud是Solr4.0版本以后基于Solr和Zookeeper的分布式搜索方案.SolrCloud是Solr的基于Zookeeper一种部署方式.Solr可以以多种方式部署,例如单机方式,多机Master-Slaver方式. 二.特色功能 SolrCloud有几个特色功能: 集中式的配置信息使用ZK进行集中配置.启动时可以指定把Solr的相关配置文件上传Zookeeper,多机器共用.这些ZK中的配置不会再拿到本地缓存,Solr直接读取ZK中的配置信息.配置文件的变动,所有

Yarn源码分析之MRAppMaster上MapReduce作业处理总流程(一)

我们知道,如果想要在Yarn上运行MapReduce作业,仅需实现一个ApplicationMaster组件即可,而MRAppMaster正是MapReduce在Yarn上ApplicationMaster的实现,由其控制MR作业在Yarn上的执行.如此,随之而来的一个问题就是,MRAppMaster是如何控制MapReduce作业在Yarn上运行的,换句话说,MRAppMaster上MapReduce作业处理总流程是什么?这就是本文要研究的重点. 通过MRAppMaster类的定义我们就能看出

八、MapReduce--job提交源码分析

一.源码分析 1.提交job的入口 通过 job.waitForCompletion(true)完成job的提交以及运行,下面从这个方法入手分析源码. //-----------------job.java public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException, ClassNotFoundException { //如果job的状态为未运行,则提交任务 if (this

spark core源码分析6 Spark job的提交

本节主要讲解SparkContext的逻辑 首先看一个spark自带的最简单的例子: object SparkPi { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Spark Pi") val spark = new SparkContext(conf) val slices = if (args.length > 0) args(0).toInt else 2 val n = ma

spark源码分析之Executor启动与任务提交篇

任务提交流程 概述 在阐明了Spark的Master的启动流程与Worker启动流程.接下继续执行的就是Worker上的Executor进程了,本文继续分析整个Executor的启动与任务提交流程 Spark-submit 提交一个任务到集群通过的是Spark-submit 通过启动脚本的方式启动它的主类,这里以WordCount为例子 spark-submit --class cn.itcast.spark.WordCount bin/spark-clas -> org.apache.spar

YARN源码分析(三)-----ResourceManager HA之应用状态存储与恢复

前言 任何系统即使做的再大,都会有可能出现各种各样的突发状况.尽管你可以说我在软件层面上已经做到所有情况的意外处理了,但是万一硬件出问题了或者说物理层面上出了问题,恐怕就不是多写几行代码能够立刻解决的吧,说了这么多,无非就是想强调HA,系统高可用性的重要性.在YARN中,NameNode的HA方式估计很多人都已经了解了,那本篇文章就来为大家梳理梳理RM资源管理器HA方面的知识,并不是指简单的RM的HA配置,确切的说是RM的应用状态存储于恢复. RM应用状态存储使用 RM应用状态存储是什么意思呢,

Yarn源码分析之如何确定作业运行方式Uber or Non-Uber?

在MRAppMaster中,当MapReduce作业初始化时,它会通过作业状态机JobImpl中InitTransition的transition()方法,进行MapReduce作业初始化相关操作,而这其中就包括: 1.调用createSplits()方法,创建分片,并获取任务分片元数据信息TaskSplitMetaInfo数组taskSplitMetaInfo: 2.确定Map Task数目numMapTasks:分片元数据信息数组的长度,即有多少分片就有多少numMapTasks: 3.确定

Spark源码分析之四:Stage提交

各位看官,上一篇<Spark源码分析之Stage划分>详细讲述了Spark中Stage的划分,下面,我们进入第三个阶段--Stage提交. Stage提交阶段的主要目的就一个,就是将每个Stage生成一组Task,即TaskSet,其处理流程如下图所示: 与Stage划分阶段一样,我们还是从handleJobSubmitted()方法入手,在Stage划分阶段,包括最好的ResultStage和前面的若干ShuffleMapStage均已生成,那么顺理成章的下一步便是Stage的提交.在han

HBase1.0.0源码分析之请求处理流程分析以Put操作为例(一)

如下面的代码所示,是HBase Put操作的简单代码实例,关于代码中的Connection connection = ConnectionFactory.createConnection(conf),已近在前一篇博 HBase1.0.0源码分析之Client启动连接流程,中介绍了链接的相关流程以及所启动的服务信息. TableName tn = TableName.valueOf("test010"); try (Connection connection = ConnectionFa