YARN: Communication Between the AM and the RM

How the AppMaster Requests Resources from the RM

// The AppMaster sends heartbeats to the RM, which updates its resource-request structures and hands back the containers taken from its allocated-resource structures. The actual allocation happens asynchronously in the background and is driven by NM heartbeats.
MRAppMaster#serviceInit:
    // service to allocate containers from RM (if non-uber) or to fake it (uber)
    containerAllocator = createContainerAllocator(null, context);
    addIfService(containerAllocator);
    dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);

  protected ContainerAllocator createContainerAllocator(
      final ClientService clientService, final AppContext context) {
    return new ContainerAllocatorRouter(clientService, context);
  }
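The `dispatcher.register(ContainerAllocator.EventType.class, containerAllocator)` call binds a whole event-type enum class to one handler; the handler then looks at the concrete enum value itself. A minimal synchronous sketch of that routing idea, assuming hypothetical `MiniDispatcher`, `SimpleEvent`, and `AllocatorEventType` types (the real AsyncDispatcher also queues events and dispatches them on its own thread):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical event-type enum, standing in for ContainerAllocator.EventType.
enum AllocatorEventType { CONTAINER_REQ, CONTAINER_DEALLOCATE }

// Hypothetical event that carries its enum type.
class SimpleEvent {
    final Enum<?> type;
    SimpleEvent(Enum<?> type) { this.type = type; }
}

interface SimpleHandler {
    void handle(SimpleEvent event);
}

// Routes each event to the handler registered for the enum class of its type.
class MiniDispatcher {
    private final Map<Class<?>, SimpleHandler> handlers = new HashMap<>();

    void register(Class<? extends Enum<?>> eventTypeClass, SimpleHandler handler) {
        handlers.put(eventTypeClass, handler);
    }

    void dispatch(SimpleEvent event) {
        // One handler per enum class, not per enum value.
        SimpleHandler h = handlers.get(event.type.getDeclaringClass());
        if (h == null) {
            throw new IllegalStateException("No handler for " + event.type.getDeclaringClass());
        }
        h.handle(event);
    }
}

class DispatcherDemo {
    public static void main(String[] args) {
        List<Enum<?>> seen = new ArrayList<>();
        MiniDispatcher dispatcher = new MiniDispatcher();
        dispatcher.register(AllocatorEventType.class, e -> seen.add(e.type));
        dispatcher.dispatch(new SimpleEvent(AllocatorEventType.CONTAINER_REQ));
        System.out.println(seen); // prints [CONTAINER_REQ]
    }
}
```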

  private final class ContainerAllocatorRouter extends AbstractService
      implements ContainerAllocator, RMHeartbeatHandler {
    private final ClientService clientService;
    private final AppContext context;
    private ContainerAllocator containerAllocator;
    ...
    @Override
    protected void serviceStart() throws Exception {
      if (job.isUber()) {
        // uber job: tasks run inside the AM's own container, so allocation is faked locally
        this.containerAllocator = new LocalContainerAllocator(
            this.clientService, this.context, nmHost, nmPort, nmHttpPort,
            containerID);
      } else {
        // normal job: containers are requested from the RM
        this.containerAllocator = new RMContainerAllocator(
            this.clientService, this.context);
      }
      ((Service) this.containerAllocator).init(getConfig());
      ((Service) this.containerAllocator).start();
      super.serviceStart();
    }
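ContainerAllocatorRouter is a thin delegate: at start-up it picks the concrete allocator (local for uber jobs, RM-backed otherwise) and forwards every later call to it. A minimal sketch of that delegation, with hypothetical `Allocator`, `LocalAllocator`, `RemoteAllocator`, and `AllocatorRouter` types standing in for the Hadoop classes:

```java
// Hypothetical stand-in for ContainerAllocator.
interface Allocator {
    String handle(String event);
}

class LocalAllocator implements Allocator {
    public String handle(String event) { return "local:" + event; } // uber: fake the allocation
}

class RemoteAllocator implements Allocator {
    public String handle(String event) { return "rm:" + event; }    // non-uber: would ask the RM
}

// Chooses the concrete allocator once, at start-up, and afterwards only delegates.
class AllocatorRouter implements Allocator {
    private final boolean uber;
    private Allocator delegate;

    AllocatorRouter(boolean uber) { this.uber = uber; }

    void start() {
        delegate = uber ? new LocalAllocator() : new RemoteAllocator();
    }

    public String handle(String event) {
        if (delegate == null) {
            throw new IllegalStateException("router not started");
        }
        return delegate.handle(event);
    }
}

class RouterDemo {
    public static void main(String[] args) {
        AllocatorRouter router = new AllocatorRouter(false);
        router.start();
        System.out.println(router.handle("CONTAINER_REQ")); // prints rm:CONTAINER_REQ
    }
}
```

As in the excerpt, the choice is made in the start step rather than in the constructor, so callers only ever see the router.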

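RMContainerAllocator (package org.apache.hadoop.mapreduce.v2.app.rm) contains the following method:

  protected synchronized void heartbeat() throws Exception {
    scheduleStats.updateAndLogIfChanged("Before Scheduling: ");
    // Send a heartbeat to the remote RM. The heartbeat may carry no new resource
    // requests at all: it may only tell the RM that the AM is still alive, or just
    // fetch containers that the RM has already allocated.
    List<Container> allocatedContainers = getResources();
    if (allocatedContainers.size() > 0) {
      // assign the received containers to concrete tasks (presumably a re-matching step)
      scheduledRequests.assign(allocatedContainers);
    }
    ...
  }

A resource request contains fields such as the priority, the desired location (host, rack, or ANY), and the memory size. With the default replication factor of 3, a single map task can produce up to 7 resource requests: 3 node-local, up to 3 rack-local, and 1 off-switch (ANY).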

A method of RMCommunicator, the parent class of RMContainerAllocator:
  protected void startAllocatorThread() {
    allocatorThread = new Thread(new Runnable() {
      @Override
      public void run() {
        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
          try {
            Thread.sleep(rmPollInterval);  // by default, once per second
            try {
              heartbeat();   // send the heartbeat
              ...
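The allocator thread is a plain polling loop: sleep for rmPollInterval, send one heartbeat, repeat until stopped. A self-contained sketch of the same loop shape, assuming a hypothetical `PollingHeartbeat` class, a `Runnable` in place of heartbeat(), and a shortened interval for demonstration (unlike the real loop, it does not catch and log heartbeat failures):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class PollingHeartbeat {
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final long pollIntervalMs;
    private final Runnable heartbeat;
    private Thread thread;

    PollingHeartbeat(long pollIntervalMs, Runnable heartbeat) {
        this.pollIntervalMs = pollIntervalMs;
        this.heartbeat = heartbeat;
    }

    void start() {
        thread = new Thread(() -> {
            while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(pollIntervalMs); // wait one poll interval
                    heartbeat.run();              // then send one heartbeat
                } catch (InterruptedException e) {
                    return;                       // interrupted by stop(): exit quietly
                }
            }
        }, "allocator-thread");
        thread.start();
    }

    void stop() {
        stopped.set(true);
        thread.interrupt();                       // wake the thread if it is sleeping
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class PollingDemo {
    // Runs the poller for roughly durationMs and returns how many heartbeats were sent.
    static int runFor(long intervalMs, long durationMs) {
        AtomicInteger beats = new AtomicInteger();
        PollingHeartbeat poller = new PollingHeartbeat(intervalMs, beats::incrementAndGet);
        poller.start();
        try {
            Thread.sleep(durationMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        poller.stop();
        return beats.get();
    }

    public static void main(String[] args) {
        System.out.println("heartbeats sent: " + runFor(10, 200));
    }
}
```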

    private List<Container> getResources() throws Exception {
    int headRoom = getAvailableResources() != null
        ? getAvailableResources().getMemory() : 0;//first time it would be null
    AllocateResponse response;
    /*
     * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS
     * milliseconds before aborting. During this interval, AM will still try
     * to contact the RM.
     */
    try {
      response = makeRemoteRequest();  // the key call
      ...

makeRemoteRequest is defined in RMContainerRequestor, the parent class of RMContainerAllocator:

  protected AllocateResponse makeRemoteRequest() throws IOException {
    ResourceBlacklistRequest blacklistRequest =
        ResourceBlacklistRequest.newInstance(new ArrayList<String>(blacklistAdditions),
            new ArrayList<String>(blacklistRemovals));
    // build a new allocate request
    AllocateRequest allocateRequest =
        AllocateRequest.newInstance(lastResponseID,
          super.getApplicationProgress(),
          // `ask` is a collection of ResourceRequest instances. Only its copy is
          // visible here, so where is it populated? (traced below)
          new ArrayList<ResourceRequest>(ask),
          new ArrayList<ContainerId>(release), blacklistRequest);
    AllocateResponse allocateResponse;
    try {
      // The key call. `scheduler` here is NOT the scheduler itself but an
      // ApplicationMasterProtocol proxy, which eventually invokes the RM's scheduler.
      allocateResponse = scheduler.allocate(allocateRequest);
      ...

`scheduler` is created in the parent class RMCommunicator:

  protected ApplicationMasterProtocol scheduler;
  ...
  protected void serviceStart() throws Exception {
    scheduler = createSchedulerProxy();
    ...

  protected ApplicationMasterProtocol createSchedulerProxy() {
    final Configuration conf = getConfig();

    try {
      // ApplicationMasterProtocol is the key: through it the AM remotely
      // invokes methods of ApplicationMasterService on the RM
      return ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class);
    } catch (IOException e) {
      throw new YarnRuntimeException(e);
    }
  }
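The point of createSchedulerProxy is that `scheduler.allocate(...)` looks like a local call but is forwarded to ApplicationMasterService on the RM. The forwarding idea can be illustrated with a JDK dynamic proxy. This is only an analogy (Hadoop builds RM proxies through its own RPC and retry layers, not with this code), and `AmProtocol`, `FakeRmService`, and the in-process "wire log" are hypothetical:

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical protocol, standing in for ApplicationMasterProtocol.
interface AmProtocol {
    String allocate(String request);
}

// Hypothetical server-side implementation, standing in for ApplicationMasterService.
class FakeRmService implements AmProtocol {
    public String allocate(String request) { return "allocated-for:" + request; }
}

class ProxyDemo {
    // Records every intercepted call, to show that the caller never touches the server directly.
    static final List<String> wireLog = new ArrayList<>();

    static AmProtocol createProxy(AmProtocol server) {
        InvocationHandler handler = (Object proxy, Method method, Object[] args) -> {
            // A real RPC layer would serialize method + args, send them, and decode the reply.
            wireLog.add(method.getName());
            return method.invoke(server, args);
        };
        return (AmProtocol) Proxy.newProxyInstance(
                AmProtocol.class.getClassLoader(),
                new Class<?>[] {AmProtocol.class},
                handler);
    }

    public static void main(String[] args) {
        AmProtocol scheduler = createProxy(new FakeRmService());
        System.out.println(scheduler.allocate("ask=1")); // prints allocated-for:ask=1
        System.out.println(wireLog);                     // prints [allocate]
    }
}
```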

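  // Next, trace where `ask` is actually populated.

// `ask` is filled by addResourceRequestToAsk, which is reached (through addResourceRequest)
// from addContainerReq; addContainerReq in turn is called from RMContainerAllocator.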
    private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
    // because objects inside the resource map can be deleted ask can end up
    // containing an object that matches new resource object but with different
    // numContainers. So existing values must be replaced explicitly
    if(ask.contains(remoteRequest)) {
      ask.remove(remoteRequest);
    }
    ask.add(remoteRequest);
  }
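Why remove before add? The comment says `ask` may already hold an object that matches the new request but carries a different numContainers. Assuming `ask` is a sorted set whose comparator looks only at priority, resource name, and capability (not numContainers), `Set.add` keeps the stale element, so it has to be removed first. A small demonstration with a hypothetical, simplified `Req` class in place of ResourceRequest:

```java
import java.util.Comparator;
import java.util.TreeSet;

// Hypothetical, simplified stand-in for ResourceRequest.
class Req {
    final int priority;
    final String resourceName;
    final int memoryMb;
    final int numContainers;

    Req(int priority, String resourceName, int memoryMb, int numContainers) {
        this.priority = priority;
        this.resourceName = resourceName;
        this.memoryMb = memoryMb;
        this.numContainers = numContainers;
    }
}

class AskDemo {
    // Orders (and therefore de-duplicates) requests WITHOUT looking at numContainers.
    static final Comparator<Req> KEY_ORDER = Comparator
            .comparingInt((Req r) -> r.priority)
            .thenComparing((Req r) -> r.resourceName)
            .thenComparingInt((Req r) -> r.memoryMb);

    static TreeSet<Req> newAsk() { return new TreeSet<>(KEY_ORDER); }

    // Naive version: add() silently keeps the old element when the keys match.
    static void addNaive(TreeSet<Req> ask, Req r) { ask.add(r); }

    // Mirrors addResourceRequestToAsk: replace the matching element explicitly.
    static void addReplacing(TreeSet<Req> ask, Req r) {
        if (ask.contains(r)) {
            ask.remove(r);
        }
        ask.add(r);
    }

    public static void main(String[] args) {
        TreeSet<Req> naive = newAsk();
        addNaive(naive, new Req(20, "host1", 1024, 1));
        addNaive(naive, new Req(20, "host1", 1024, 2));
        System.out.println(naive.first().numContainers);     // prints 1 (stale)

        TreeSet<Req> replacing = newAsk();
        addReplacing(replacing, new Req(20, "host1", 1024, 1));
        addReplacing(replacing, new Req(20, "host1", 1024, 2));
        System.out.println(replacing.first().numContainers); // prints 2
    }
}
```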

    protected void addContainerReq(ContainerRequest req) {
    // Create resource requests
    for (String host : req.hosts) {
      // Data-local
      if (!isNodeBlacklisted(host)) {
        addResourceRequest(req.priority, host, req.capability);
      }
    }

    // Nothing Rack-local for now
    for (String rack : req.racks) {
      addResourceRequest(req.priority, rack, req.capability);
    }

    // Off-switch
    addResourceRequest(req.priority, ResourceRequest.ANY, req.capability);
  }
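addContainerReq expands one task's container request into node-local, rack-local, and off-switch (ANY) entries, and addResourceRequest accumulates them, so several tasks asking for the same host share one entry with a larger container count. A sketch of that bookkeeping, assuming a hypothetical `RequestTable` keyed by priority and resource name that tracks only the container count (the real code also keys on capability and pushes each updated request into `ask`):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class RequestTable {
    static final String ANY = "*";   // the value of ResourceRequest.ANY

    // priority -> (resource name -> number of containers wanted)
    final Map<Integer, Map<String, Integer>> table = new TreeMap<>();
    final Set<String> blacklist;

    RequestTable(Set<String> blacklist) { this.blacklist = blacklist; }

    private void addResourceRequest(int priority, String resourceName) {
        table.computeIfAbsent(priority, p -> new HashMap<>())
             .merge(resourceName, 1, Integer::sum);
    }

    // Same order as addContainerReq: hosts, then racks, then ANY.
    void addContainerReq(int priority, List<String> hosts, List<String> racks) {
        for (String host : hosts) {
            if (!blacklist.contains(host)) {   // data-local, unless the node is blacklisted
                addResourceRequest(priority, host);
            }
        }
        for (String rack : racks) {            // rack-local
            addResourceRequest(priority, rack);
        }
        addResourceRequest(priority, ANY);     // off-switch
    }
}

class RequestTableDemo {
    public static void main(String[] args) {
        RequestTable t = new RequestTable(new HashSet<>(Arrays.asList("h3")));
        // Two map tasks whose input blocks have replicas on h1, h2, h3 (racks r1, r2).
        t.addContainerReq(20, Arrays.asList("h1", "h2", "h3"), Arrays.asList("r1", "r2"));
        t.addContainerReq(20, Arrays.asList("h1", "h2", "h3"), Arrays.asList("r1", "r2"));
        System.out.println(t.table.get(20).get("h1"));         // prints 2
        System.out.println(t.table.get(20).get("*"));          // prints 2
        System.out.println(t.table.get(20).containsKey("h3")); // prints false (blacklisted)
    }
}
```

The priority value 20 is just a sample number here.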

Inside RMContainerAllocator (a method of its ScheduledRequests inner class):
    void addMap(ContainerRequestEvent event) {
      ContainerRequest request = null;

      if (event.getEarlierAttemptFailed()) {
        earlierFailedMaps.add(event.getAttemptID());
        request = new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP);
        LOG.info("Added "+event.getAttemptID()+" to list of failed maps");
      } else {
        for (String host : event.getHosts()) {
          LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
          if (list == null) {
            list = new LinkedList<TaskAttemptId>();
            mapsHostMapping.put(host, list);
          }
          list.add(event.getAttemptID());
          if (LOG.isDebugEnabled()) {
            LOG.debug("Added attempt req to host " + host);
          }
        }
        for (String rack : event.getRacks()) {
          LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack);
          if (list == null) {
            list = new LinkedList<TaskAttemptId>();
            mapsRackMapping.put(rack, list);
          }
          list.add(event.getAttemptID());
          if (LOG.isDebugEnabled()) {
            LOG.debug("Added attempt req to rack " + rack);
          }
        }
        request = new ContainerRequest(event, PRIORITY_MAP);
      }
      maps.put(event.getAttemptID(), request);
      addContainerReq(request);  // addContainerReq is called here
    }
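addMap records, for every host and rack, which task attempts would like to run there. When a heartbeat later returns containers, scheduledRequests.assign uses these maps to pick an attempt for each container: node-local first, then rack-local, then any pending map. A simplified sketch of that lookup order, assuming a hypothetical `LocalityAssigner` with string IDs (the real assign also checks container size and priority, and handles reduces):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

class LocalityAssigner {
    final Map<String, LinkedList<String>> hostMapping = new HashMap<>();
    final Map<String, LinkedList<String>> rackMapping = new HashMap<>();
    final Set<String> pending = new LinkedHashSet<>();   // like the keys of `maps`, insertion-ordered

    void addMap(String attemptId, List<String> hosts, List<String> racks) {
        for (String h : hosts) hostMapping.computeIfAbsent(h, k -> new LinkedList<>()).add(attemptId);
        for (String r : racks) rackMapping.computeIfAbsent(r, k -> new LinkedList<>()).add(attemptId);
        pending.add(attemptId);
    }

    // Returns the attempt chosen for a container on (host, rack), or null if nothing is pending.
    String assign(String host, String rack) {
        String a = pollPending(hostMapping.get(host));                       // 1. node-local
        if (a == null) a = pollPending(rackMapping.get(rack));               // 2. rack-local
        if (a == null && !pending.isEmpty()) a = pending.iterator().next();  // 3. off-switch
        if (a != null) pending.remove(a);
        return a;
    }

    // Skips attempts that were already assigned through another host or rack list.
    private String pollPending(LinkedList<String> list) {
        while (list != null && !list.isEmpty()) {
            String a = list.removeFirst();
            if (pending.contains(a)) return a;
        }
        return null;
    }
}

class AssignDemo {
    public static void main(String[] args) {
        LocalityAssigner s = new LocalityAssigner();
        s.addMap("m1", Arrays.asList("h1"), Arrays.asList("r1"));
        s.addMap("m2", Arrays.asList("h2"), Arrays.asList("r2"));
        System.out.println(s.assign("h2", "r2")); // prints m2 (node-local)
        System.out.println(s.assign("h9", "r9")); // prints m1 (off-switch)
        System.out.println(s.assign("h1", "r1")); // prints null (nothing left)
    }
}
```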

// addMap is called from this method:
  protected synchronized void handleEvent(ContainerAllocatorEvent event) {
    recalculateReduceSchedule = true;
    ...
        scheduledRequests.addMap(reqEvent); // maps are immediately scheduled

  protected void serviceStart() throws Exception {
    this.eventHandlingThread = new Thread() {
      @SuppressWarnings("unchecked")
      @Override
      public void run() {

        ContainerAllocatorEvent event;

        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
          try {
            event = RMContainerAllocator.this.eventQueue.take();  // take the next event off the queue
          } catch (InterruptedException e) {
            if (!stopped.get()) {
              LOG.error("Returning, interrupted : " + e);
            }
            return;
          }

          try {
            handleEvent(event);   // handleEvent is called here
            ...

// The events are submitted from MRAppMaster: its dispatcher calls ContainerAllocatorRouter.handle,
// which forwards to the concrete allocator; RMContainerAllocator.handle puts the event on
// eventQueue, where the thread above picks it up.
    public void handle(ContainerAllocatorEvent event) {
      this.containerAllocator.handle(event);
    }
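The hand-off between handle() and the event-handling thread is a producer/consumer queue: the dispatcher thread only enqueues and returns, while one dedicated thread processes events in FIFO order. A self-contained sketch of that split, assuming a hypothetical `QueuedAllocator` with string events (the real handleEvent updates scheduling state instead of recording strings):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueuedAllocator {
    private final BlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
    private final List<String> handled = Collections.synchronizedList(new ArrayList<>());
    private volatile boolean stopped = false;
    private Thread eventHandlingThread;

    // Producer side: called by the dispatcher thread; returns immediately.
    void handle(String event) {
        try {
            eventQueue.put(event);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Consumer side: one dedicated thread takes events and processes them in order.
    void start() {
        eventHandlingThread = new Thread(() -> {
            while (!stopped && !Thread.currentThread().isInterrupted()) {
                String event;
                try {
                    event = eventQueue.take();
                } catch (InterruptedException e) {
                    return;
                }
                handleEvent(event);
            }
        }, "event-handler");
        eventHandlingThread.start();
    }

    private void handleEvent(String event) {
        handled.add(event);
    }

    // Waits (polling) until at least n events have been handled or the timeout expires.
    boolean awaitHandled(int n, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (handled.size() >= n) return true;
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return handled.size() >= n;
    }

    List<String> handledSoFar() {
        synchronized (handled) { return new ArrayList<>(handled); }
    }

    void stop() {
        stopped = true;
        eventHandlingThread.interrupt();
        try {
            eventHandlingThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class QueueDemo {
    public static void main(String[] args) {
        QueuedAllocator a = new QueuedAllocator();
        a.start();
        a.handle("CONTAINER_REQ m1");
        a.handle("CONTAINER_REQ m2");
        a.awaitHandled(2, 1000);
        System.out.println(a.handledSoFar()); // the two events, in submission order
        a.stop();
    }
}
```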
   

How the RM Receives the AppMaster Heartbeat

 // Summary: the ApplicationMaster ultimately reports its resource needs to the RM through
 // ApplicationMasterProtocol#allocate. On the RM side, ApplicationMasterService serves the call
 // and finally invokes the scheduler's allocate, which writes the new requests into in-memory
 // structures and returns the containers that have already been allocated.
public class ApplicationMasterService extends AbstractService implements
    ApplicationMasterProtocol {
  public AllocateResponse allocate(AllocateRequest request)
      throws YarnException, IOException {
    ..
    // Allow only one thread in AM to do heartbeat at a time.
    synchronized (lastResponse) {

      // Send the status update to the appAttempt.
      this.rmContext.getDispatcher().getEventHandler().handle(
          new RMAppAttemptStatusupdateEvent(appAttemptId, request
              .getProgress()));

      List<ResourceRequest> ask = request.getAskList();  // ask and release carry the AM's wrapped requests
      List<ContainerId> release = request.getReleaseList();

      // Send new requests to appAttempt.
      Allocation allocation =
          this.rScheduler.allocate(appAttemptId, ask, release,
              blacklistAdditions, blacklistRemovals);  // call the RM-side scheduler, rScheduler
      ..
        allocateResponse.setUpdatedNodes(updatedNodeReports);
      }

      // Wrap the results into a response
      allocateResponse.setAllocatedContainers(allocation.getContainers());
      allocateResponse.setCompletedContainersStatuses(appAttempt
          .pullJustFinishedContainers());
      allocateResponse.setResponseId(lastResponse.getResponseId() + 1);
      allocateResponse.setAvailableResources(allocation.getResourceLimit());

      allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());

      // add preemption to the allocateResponse message (if any)
      allocateResponse.setPreemptionMessage(generatePreemptionMessage(allocation));

      // Adding NMTokens for allocated containers.
      if (!allocation.getContainers().isEmpty()) {
        allocateResponse.setNMTokens(rmContext.getNMTokenSecretManager()
            .createAndGetNMTokens(app.getUser(), appAttemptId,
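Each response carries an ID one greater than the previous one (`lastResponse.getResponseId() + 1`), and the AM sends the last ID it saw with its next request (`lastResponseID` in makeRemoteRequest). This lets the RM tell a fresh heartbeat from a retransmitted one. A simplified model of that handshake, assuming a hypothetical `ResponseIdServer`; the duplicate rule below (replay the cached answer when the echoed ID is exactly one behind) is an assumption about the part of `allocate` elided in the excerpt:

```java
class ResponseIdServer {
    private int lastResponseId = 0;
    private int allocationsPerformed = 0;

    // Handles one heartbeat that echoes the last response ID the AM saw; returns the new ID.
    synchronized int allocate(int echoedResponseId) {
        if (echoedResponseId + 1 == lastResponseId) {
            return lastResponseId;               // retransmission: replay, do not allocate again
        }
        if (echoedResponseId != lastResponseId) {
            throw new IllegalStateException("out-of-sync response id " + echoedResponseId);
        }
        allocationsPerformed++;                  // a real RM would call the scheduler here
        lastResponseId = lastResponseId + 1;     // like lastResponse.getResponseId() + 1
        return lastResponseId;
    }

    synchronized int allocationsPerformed() { return allocationsPerformed; }
}

class ResponseIdDemo {
    public static void main(String[] args) {
        ResponseIdServer rm = new ResponseIdServer();
        int lastSeen = 0;                        // AM side, like lastResponseID
        lastSeen = rm.allocate(lastSeen);        // 1
        lastSeen = rm.allocate(lastSeen);        // 2
        int replay = rm.allocate(1);             // the AM resends the heartbeat that was answered with 2
        System.out.println(lastSeen + " " + replay + " " + rm.allocationsPerformed()); // prints 2 2 2
    }
}
```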

// The allocate method of the FIFO scheduler (FifoScheduler)
...
    // Update application requests
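        // Write this round's requests into the application's request structure. Once an NM
        // heartbeat causes them to be allocated, the results go into the application's
        // allocation structure. The request structure being updated is:
        //   final Map<Priority, Map<String, ResourceRequest>> requests =
        //       new HashMap<Priority, Map<String, ResourceRequest>>();
        application.updateResourceRequests(ask);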
...
      return new Allocation(
          application.pullNewlyAllocatedContainers(),    // a collection inside application, drained from the allocated structure
          application.getHeadroom());

// `application` is an instance of FiCaSchedulerApp
  synchronized public List<Container> pullNewlyAllocatedContainers() {
    List<Container> returnContainerList = new ArrayList<Container>(
        newlyAllocatedContainers.size());
    // Only reads from newlyAllocatedContainers, which is filled when an NM heartbeat
    // leads the scheduler to call assignContainer.
    for (RMContainer rmContainer : newlyAllocatedContainers) {
      rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
          RMContainerEventType.ACQUIRED));
      returnContainerList.add(rmContainer.getContainer());
    }
    newlyAllocatedContainers.clear();
    return returnContainerList;
  }

  synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
      Priority priority, ResourceRequest request,
      Container container) {

   ....
    // Add it to allContainers list.
    newlyAllocatedContainers.add(rmContainer);  // newlyAllocatedContainers is populated here

    // The FIFO scheduler calls the method above from assignContainer, which is ultimately
    // triggered by an NM heartbeat:
  private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application,
      Priority priority, int assignableContainers,
      ResourceRequest request, NodeType type) {
  ....
        }

        // Create the container
        Container container =
            BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
              .getHttpAddress(), capability, priority, containerToken);

        // Allocate!

        // Inform the application
        RMContainer rmContainer =
            application.allocate(type, node, priority, request, container);

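// Summary: when the AppMaster sends a request to the RM, the RM only answers from its current
// in-memory state; the process is asynchronous. When an NM sends a heartbeat, resources are
// allocated according to the AppMaster's requests and written to the in-memory allocation
// structure, where they wait for the AppMaster to fetch them. The requests the AM sends must
// first be stored in the request structure of the application object (FiCaSchedulerApp); see
// application.showRequests().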
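The summary describes a two-phase, asynchronous hand-off: an AM heartbeat only records the ask and drains whatever is already allocated, while an NM heartbeat is what actually matches the ask against free capacity. A toy single-application model of that hand-off, assuming hypothetical `ToyApp`/`ToyScheduler` classes, memory as the only resource, and ANY-level requests only (no locality, priorities, or headroom):

```java
import java.util.ArrayList;
import java.util.List;

// Per-application state, loosely modelled on FiCaSchedulerApp.
class ToyApp {
    int pendingContainers = 0;                             // request structure
    int containerMemMb = 0;
    final List<String> newlyAllocated = new ArrayList<>(); // allocation structure
}

class ToyScheduler {
    private final ToyApp app = new ToyApp();
    private int nextId = 1;

    // AM heartbeat with a new ask: the ask carries the total still wanted, replacing the old value.
    synchronized List<String> amHeartbeat(int outstanding, int memMb) {
        app.pendingContainers = outstanding;               // like updateResourceRequests(ask)
        app.containerMemMb = memMb;
        return amHeartbeat();
    }

    // AM heartbeat without a new ask: just drain what is already allocated.
    synchronized List<String> amHeartbeat() {
        List<String> out = new ArrayList<>(app.newlyAllocated); // like pullNewlyAllocatedContainers()
        app.newlyAllocated.clear();
        return out;
    }

    // NM heartbeat: use this node's free memory to satisfy pending requests.
    synchronized void nodeUpdate(String node, int freeMemMb) {
        while (app.pendingContainers > 0 && freeMemMb >= app.containerMemMb) {
            freeMemMb -= app.containerMemMb;
            app.pendingContainers--;
            app.newlyAllocated.add("container_" + (nextId++) + "@" + node); // like newlyAllocatedContainers.add(...)
        }
    }
}

class TwoPhaseDemo {
    public static void main(String[] args) {
        ToyScheduler rm = new ToyScheduler();
        System.out.println(rm.amHeartbeat(3, 1024)); // prints [] (nothing allocated yet)
        rm.nodeUpdate("nm1", 2048);                  // NM heartbeat: room for 2 containers
        System.out.println(rm.amHeartbeat());        // prints [container_1@nm1, container_2@nm1]
        rm.nodeUpdate("nm2", 4096);                  // the third container lands on nm2
        System.out.println(rm.amHeartbeat());        // prints [container_3@nm2]
    }
}
```

Because the two heartbeats are independent, the containers an AM receives usually answer asks it sent one or more heartbeats earlier.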
Date: 2024-08-29 06:41:04
