YARN: Communication Between the NM and the RM

Heartbeats sent from the NM side

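//The NM sends a heartbeat. The RM enqueues an event that eventually becomes a NODE_UPDATE scheduler event, and it simply returns a response. Processing is asynchronous and event-driven: the event in turn drives assignContainers, which takes demands from the resource-request structure and allocates resources.
//How AsyncDispatcher works:
//It has one event queue and a mapping from event-type class to handler. Each class maps to exactly one handler. The class is an enum that may have many values, and the per-value logic lives inside the handler.
//A thread takes an event from the queue, reads its type, finds the handler for that type, and calls handler.handle(event).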
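The dispatch loop described above can be illustrated with a minimal plain-Java sketch. It is not Hadoop's AsyncDispatcher. The names MiniDispatcher, SketchEvent, SketchHandler and the Demo* types are hypothetical, and service lifecycle, draining and error handling are omitted. The sketch keeps one queue and one handler per event-type enum class, and it looks up the handler through the enum's declaring class.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical event: it carries an enum value that says which type it is.
interface SketchEvent<T extends Enum<T>> {
  T getType();
}

// Hypothetical handler: exactly one handler per event-type enum class.
interface SketchHandler {
  void handle(SketchEvent<?> event);
}

class MiniDispatcher {
  private final BlockingQueue<SketchEvent<?>> queue = new LinkedBlockingQueue<>();
  // Maps an enum class (something like RMNodeEventType.class) to its handler.
  private final Map<Class<?>, SketchHandler> handlers = new ConcurrentHashMap<>();
  private final Thread worker = new Thread(this::loop, "mini-dispatcher");

  void register(Class<? extends Enum<?>> typeClass, SketchHandler handler) {
    handlers.put(typeClass, handler);
  }

  // Like getEventHandler().handle(event): enqueue and return immediately.
  void dispatch(SketchEvent<?> event) {
    queue.add(event);
  }

  void start() {
    worker.setDaemon(true);
    worker.start();
  }

  void stop() {
    worker.interrupt();
  }

  private void loop() {
    try {
      while (!Thread.currentThread().isInterrupted()) {
        SketchEvent<?> event = queue.take();                      // 1. take an event
        Class<?> typeClass = event.getType().getDeclaringClass(); // 2. enum class of its type
        SketchHandler handler = handlers.get(typeClass);          // 3. look up the handler
        if (handler != null) {
          handler.handle(event);                                  // 4. handler branches on the value
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();                         // stop() ends the loop here
    }
  }
}

// Hypothetical demo types for the usage example.
enum DemoNodeEventType { STATUS_UPDATE, STARTED }

class DemoNodeEvent implements SketchEvent<DemoNodeEventType> {
  private final DemoNodeEventType type;

  DemoNodeEvent(DemoNodeEventType type) {
    this.type = type;
  }

  @Override
  public DemoNodeEventType getType() {
    return type;
  }
}
```

The sketch uses getDeclaringClass() rather than getClass() so that enum constants with their own bodies still map to the enum class they were registered under.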

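//nodeHeartbeat enqueues an RMNodeStatusEvent (event type RMNodeEventType.STATUS_UPDATE).
The RM registers this event type with its handler.
That handler ultimately calls RMNodeImpl,
and RMNodeImpl in turn enqueues a SchedulerEvent.

The NodeManager class calls into the following class: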

  NodeStatusUpdaterImpl class
  protected void startStatusUpdater() {

    statusUpdaterRunnable = new Runnable() {
      @Override
      @SuppressWarnings("unchecked")
      public void run() {
        int lastHeartBeatID = 0;
        while (!isStopped) {
          ....
            response = resourceTracker.nodeHeartbeat(request); // send the heartbeat to ResourceTrackerService
          ..
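Below is a simplified sketch of the NM-side loop. It assumes a fixed heartbeat interval and reduces the request and response to a single response ID. StatusUpdaterSketch and HeartbeatTrackerSketch are hypothetical names, not the NodeStatusUpdaterImpl API. The point is that each heartbeat carries the ID from the previous response (lastHeartBeatID above), which gives the RM something to check for duplicate heartbeats.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for resourceTracker.nodeHeartbeat(request): the NM sends the
// last response ID it saw and gets back the RM's new response ID.
interface HeartbeatTrackerSketch {
  int nodeHeartbeat(String nodeId, int lastResponseId);
}

class StatusUpdaterSketch implements Runnable {
  private final HeartbeatTrackerSketch tracker;
  private final String nodeId;
  private final long intervalMs;                 // assumed fixed heartbeat interval
  private final AtomicBoolean stopped = new AtomicBoolean(false);
  private int lastHeartBeatID = 0;

  StatusUpdaterSketch(HeartbeatTrackerSketch tracker, String nodeId, long intervalMs) {
    this.tracker = tracker;
    this.nodeId = nodeId;
    this.intervalMs = intervalMs;
  }

  // One loop iteration: send a heartbeat and remember the ID from the response.
  int heartbeatOnce() {
    lastHeartBeatID = tracker.nodeHeartbeat(nodeId, lastHeartBeatID);
    return lastHeartBeatID;
  }

  @Override
  public void run() {
    while (!stopped.get()) {
      heartbeatOnce();
      try {
        Thread.sleep(intervalMs);                // wait until the next heartbeat
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

  void stop() {
    stopped.set(true);
  }
}
```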

This is an RPC call into the following method of ResourceTrackerService:
  public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
      throws YarnException, IOException {

    NodeStatus remoteNodeStatus = request.getNodeStatus();
    /**
     * Here is the node heartbeat sequence...
     * 1. Check if it's a registered node
     * 2. Check if it's a valid (i.e. not excluded) node
     * 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
     * 4. Send healthStatus to RMNode
     */
....

    // 4. Send status to RMNode, saving the latest response.
    // RMNodeStatusEvent is a subclass of RMNodeEvent whose constructor sets the event type
    // to RMNodeEventType.STATUS_UPDATE. The RM registers a handler for that type with the
    // AsyncDispatcher (NodeEventDispatcher, see the code further down). That handler uses
    // RMNodeImpl, which in turn triggers scheduler-related events.
    // Creating the event only puts it on the AsyncDispatcher queue; it should eventually
    // cause the ResourceScheduler to act.
    this.rmContext.getDispatcher().getEventHandler().handle(
        new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
            remoteNodeStatus.getContainersStatuses(),  // status of each container, a List
            remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse));
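The four steps in the comment above can be modeled with a small in-memory sketch. All names here (ResourceTrackerSketch, NodeActionSketch, HeartbeatResponseSketch) are hypothetical. The freshness rule is also an assumption: the sketch compares the NM's last-seen response ID with the ID of the last response sent. For a duplicate it resends that last response, and it asks a node that is too far behind to resync. The exact actions differ between Hadoop versions. Only a fresh heartbeat queues a status event; a string in a list stands in for the RMNodeStatusEvent.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical actions the RM can send back to the NM.
enum NodeActionSketch { NORMAL, RESYNC, SHUTDOWN }

// Hypothetical, heavily simplified heartbeat response.
final class HeartbeatResponseSketch {
  final NodeActionSketch action;
  final int responseId;

  HeartbeatResponseSketch(NodeActionSketch action, int responseId) {
    this.action = action;
    this.responseId = responseId;
  }
}

class ResourceTrackerSketch {
  // Last response sent to each registered node; membership here is step 1.
  private final Map<String, HeartbeatResponseSketch> lastResponse = new HashMap<>();
  private final Set<String> excludedNodes = new HashSet<>();
  // Stands in for the AsyncDispatcher queue that receives status events.
  final List<String> dispatchedStatusEvents = new ArrayList<>();

  void registerNode(String nodeId) {
    lastResponse.put(nodeId, new HeartbeatResponseSketch(NodeActionSketch.NORMAL, 0));
  }

  void exclude(String nodeId) {
    excludedNodes.add(nodeId);
  }

  HeartbeatResponseSketch nodeHeartbeat(String nodeId, int lastSeenResponseId) {
    HeartbeatResponseSketch last = lastResponse.get(nodeId);
    if (last == null) {                                   // 1. not registered
      return new HeartbeatResponseSketch(NodeActionSketch.RESYNC, 0);
    }
    if (excludedNodes.contains(nodeId)) {                 // 2. excluded node
      return new HeartbeatResponseSketch(NodeActionSketch.SHUTDOWN, 0);
    }
    if (lastSeenResponseId + 1 == last.responseId) {      // 3. duplicate: resend last response
      return last;
    }
    if (lastSeenResponseId + 1 < last.responseId) {       // 3. too far behind: resync
      return new HeartbeatResponseSketch(NodeActionSketch.RESYNC, 0);
    }
    dispatchedStatusEvents.add(nodeId);                   // 4. queue a status event
    HeartbeatResponseSketch next =
        new HeartbeatResponseSketch(NodeActionSketch.NORMAL, last.responseId + 1);
    lastResponse.put(nodeId, next);
    return next;
  }
}
```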

   rmContext is built in ResourceManager. The point here is to see which Dispatcher it uses:
   ..
    this.rmContext =
        new RMContextImpl(this.rmDispatcher, rmStore,
          this.containerAllocationExpirer, amLivelinessMonitor,
          amFinishingMonitor, delegationTokenRenewer, this.amRmTokenSecretManager,
          this.containerTokenSecretManager, this.nmTokenSecretManager,
          this.clientToAMSecretManager);     

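      ..

  protected Dispatcher createDispatcher() {
    // rmDispatcher is built by this method: org.apache.hadoop.yarn.event.AsyncDispatcher.
    // It holds an event queue and a map from event type to handler. An async thread reads
    // the type from each event (the event sets its own type internally), finds the
    // handler, and the handler then branches on the specific enum value.
    // The mapping from a type's .class to its handler is set up via register().
    return new AsyncDispatcher();
  }
   //The RM also registers a handler for each event type; the async thread inside
   //AsyncDispatcher uses this map to decide which handler to call.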

       this.rmDispatcher.register(SchedulerEventType.class,  // this enum has a NODE_UPDATE value
        this.schedulerDispatcher);

   Handlers registered on rmDispatcher:
       // Register event handler for RmAppEvents
    this.rmDispatcher.register(RMAppEventType.class,
        new ApplicationEventDispatcher(this.rmContext));

    // Register event handler for RmAppAttemptEvents
    this.rmDispatcher.register(RMAppAttemptEventType.class,
        new ApplicationAttemptEventDispatcher(this.rmContext));

    // Register event handler for RmNodes
    this.rmDispatcher.register(RMNodeEventType.class,  // an enum (values include STATUS_UPDATE);
        // NodeEventDispatcher handles each RMNodeEventType value separately, like a switch/case
        new NodeEventDispatcher(this.rmContext));     // register the handler

 NodeEventDispatcher class, inside the RM
     public void handle(RMNodeEvent event) {  // event handling method
      NodeId nodeId = event.getNodeId();
      RMNode node = this.rmContext.getRMNodes().get(nodeId);
      if (node != null) {
        try {
          // Cast the RMNode to an event handler: RMNodeImpl also implements
          // EventHandler, and internally it triggers scheduler-related events.
          ((EventHandler<RMNodeEvent>) node).handle(event);
        } catch (Throwable t) {
          LOG.error("Error in handling event type " + event.getType()
              + " for node " + nodeId, t);
        }
      }
    }
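NodeEventDispatcher is a single handler for every node: it looks the node up by ID and forwards the event to that node's own handler. Below is a minimal sketch of that routing. The names NodeEventRouterSketch, NodeSketch and NodeEventSketch are hypothetical, and string event types replace RMNodeEventType. The fragment above only shows the node != null branch, so the sketch simply counts events for unknown nodes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical node event: which node it concerns and what kind of event it is.
final class NodeEventSketch {
  final String nodeId;
  final String type;                    // e.g. "STATUS_UPDATE"

  NodeEventSketch(String nodeId, String type) {
    this.nodeId = nodeId;
    this.type = type;
  }
}

// Hypothetical per-node object that is also its own event handler,
// the role RMNodeImpl plays in the text.
class NodeSketch {
  final List<String> handled = new ArrayList<>();

  void handle(NodeEventSketch event) {
    handled.add(event.type);            // a real node would run its state machine here
  }
}

class NodeEventRouterSketch {
  // Stands in for rmContext.getRMNodes(): node ID -> node object.
  final Map<String, NodeSketch> nodes = new ConcurrentHashMap<>();
  int unknownNodeEvents = 0;

  void handle(NodeEventSketch event) {
    NodeSketch node = nodes.get(event.nodeId);
    if (node != null) {
      node.handle(event);               // forward to that node's own handler
    } else {
      unknownNodeEvents++;              // sketch-only bookkeeping for unknown nodes
    }
  }
}
```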

   Summary
   The NodeManager sends a heartbeat to ResourceTrackerService on the RM side. Its nodeHeartbeat method sends a STATUS_UPDATE event to the RMNode, and RMNodeImpl declares this transition:
   .addTransition(NodeState.RUNNING,
         EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY),
         RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition())
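The addTransition call above declares a multi-arc transition: from RUNNING on STATUS_UPDATE the node may end up RUNNING or UNHEALTHY, and the transition code decides which. Below is a hypothetical sketch of such a table. TransitionTableSketch and its enums are illustrative only, not Hadoop's StateMachineFactory API. The input to the transition is simply whether the node reported itself healthy.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.function.Function;

enum NodeStateSketch { RUNNING, UNHEALTHY }

enum NodeEventTypeSketch { STATUS_UPDATE }

// Hypothetical multi-arc transition table: one (state, event) pair may lead to several
// declared target states, and the transition code picks the actual one at runtime.
class TransitionTableSketch {
  private static final class Arc {
    final EnumSet<NodeStateSketch> targets;
    final Function<Boolean, NodeStateSketch> transition;   // input: is the node healthy?

    Arc(EnumSet<NodeStateSketch> targets, Function<Boolean, NodeStateSketch> transition) {
      this.targets = targets;
      this.transition = transition;
    }
  }

  private final Map<NodeStateSketch, Map<NodeEventTypeSketch, Arc>> table =
      new EnumMap<>(NodeStateSketch.class);

  TransitionTableSketch addTransition(NodeStateSketch from, EnumSet<NodeStateSketch> targets,
      NodeEventTypeSketch type, Function<Boolean, NodeStateSketch> transition) {
    table.computeIfAbsent(from, k -> new EnumMap<NodeEventTypeSketch, Arc>(NodeEventTypeSketch.class))
        .put(type, new Arc(targets, transition));
    return this;
  }

  NodeStateSketch doTransition(NodeStateSketch current, NodeEventTypeSketch type, boolean healthy) {
    Map<NodeEventTypeSketch, Arc> byType = table.get(current);
    Arc arc = (byType == null) ? null : byType.get(type);
    if (arc == null) {
      throw new IllegalStateException("Invalid event " + type + " in state " + current);
    }
    NodeStateSketch next = arc.transition.apply(healthy);
    if (!arc.targets.contains(next)) {
      throw new IllegalStateException("Undeclared target state " + next);
    }
    return next;
  }
}
```

When an arc is registered for RUNNING/STATUS_UPDATE, a healthy status keeps the node RUNNING. That is the point where, in this sketch's reading, a NODE_UPDATE scheduler event would be queued. An unhealthy status moves the node to UNHEALTHY, and a STATUS_UPDATE arriving in a state with no registered arc is rejected.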

   In the transition method of StatusUpdateWhenHealthyTransition:
    rmNode.context.getDispatcher().getEventHandler().handle(
            new NodeUpdateSchedulerEvent(rmNode)); // triggers the scheduler

    //The scheduling side is analyzed next.

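Scheduler allocation after the RM receives a heartbeat

Continuing from the analysis above:

   In StatusUpdateWhenHealthyTransition:
    rmNode.context.getDispatcher().getEventHandler().handle(
            new NodeUpdateSchedulerEvent(rmNode)); // triggers the scheduler

This enqueues a scheduler event.

  The handler for this event type was already registered in the RM constructor, as follows: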
    // Initialize the scheduler
    this.scheduler = createScheduler();
    this.schedulerDispatcher = createSchedulerEventDispatcher();
    addIfService(this.schedulerDispatcher);
    this.rmDispatcher.register(SchedulerEventType.class,
        this.schedulerDispatcher);  // the event handler

  protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
    return new SchedulerEventDispatcher(this.scheduler);
  }

SchedulerEventDispatcher builds its own internal queue as well. Events are put into it and processed asynchronously, and the scheduler finally handles each event.

   public void run() {

        SchedulerEvent event;

        while (!stopped && !Thread.currentThread().isInterrupted()) {
          try {
            event = eventQueue.take();
            scheduler.handle(event); // this call invokes the scheduler
            ...

    public void handle(SchedulerEvent event) {
      try {
        int qSize = eventQueue.size();
        if (qSize !=0 && qSize %1000 == 0) {
          LOG.info("Size of scheduler event-queue is " + qSize);
        }
        int remCapacity = eventQueue.remainingCapacity();
        if (remCapacity < 1000) {
          LOG.info("Very low remaining capacity on scheduler event queue: "
              + remCapacity);
        }
        this.eventQueue.put(event);
      } catch (InterruptedException e) {
        throw new YarnRuntimeException(e);
      }
    }
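SchedulerEventDispatcher adds a second queue in front of the scheduler. Its handle() only enqueues, so the central AsyncDispatcher thread presumably never waits for scheduling work. The sketch below keeps the two log checks from handle(), but makes the thresholds and the queue capacity parameters so the behavior is easy to exercise. SchedulerQueueSketch is a hypothetical name and events are plain strings. A non-blocking processOne() stands in for one iteration of run().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical second-level queue in front of the scheduler.
class SchedulerQueueSketch {
  private final BlockingQueue<String> eventQueue;
  private final Consumer<String> scheduler;     // stands in for scheduler.handle(event)
  private final int logEvery;                   // the original logs every 1000 queued events
  private final int lowCapacity;                // ...and warns below 1000 free slots
  final List<String> log = new ArrayList<>();   // collected instead of LOG.info for the demo

  SchedulerQueueSketch(int capacity, int logEvery, int lowCapacity, Consumer<String> scheduler) {
    this.eventQueue = new LinkedBlockingQueue<>(capacity);
    this.logEvery = logEvery;
    this.lowCapacity = lowCapacity;
    this.scheduler = scheduler;
  }

  // Producer side: called on the central dispatcher thread.
  void handle(String event) {
    int qSize = eventQueue.size();
    if (qSize != 0 && qSize % logEvery == 0) {
      log.add("Size of scheduler event-queue is " + qSize);
    }
    int remCapacity = eventQueue.remainingCapacity();
    if (remCapacity < lowCapacity) {
      log.add("Very low remaining capacity on scheduler event queue: " + remCapacity);
    }
    try {
      eventQueue.put(event);                    // blocks only if the queue is full
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);       // the original wraps it in YarnRuntimeException
    }
  }

  // Consumer side: one non-blocking step of the run() loop.
  boolean processOne() {
    String event = eventQueue.poll();
    if (event == null) {
      return false;
    }
    scheduler.accept(event);
    return true;
  }
}
```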

//FifoScheduler

  public void handle(SchedulerEvent event) {
    switch(event.getType()) {
    case NODE_ADDED:
    {
      NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
      addNode(nodeAddedEvent.getAddedRMNode());
    }
    break;
    case NODE_REMOVED:
    {
      NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event;
      removeNode(nodeRemovedEvent.getRemovedRMNode());
    }
    break;
    case NODE_UPDATE:
    {
      NodeUpdateSchedulerEvent nodeUpdatedEvent =
      (NodeUpdateSchedulerEvent)event;
      nodeUpdate(nodeUpdatedEvent.getRMNode());
      ....

  The nodeUpdate method
     private synchronized void nodeUpdate(RMNode rmNode) {
    FiCaSchedulerNode node = getNode(rmNode.getNodeID());

    List<UpdatedContainerInfo> containerInfoList = rmNode.pullContainerUpdates();
....
      assignContainers(node);   

  // core method: allocates containers
    private void assignContainers(FiCaSchedulerNode node) {
    LOG.debug("assignContainers:" +
        " node=" + node.getRMNode().getNodeAddress() +
        " #applications=" + applications.size());

    // Try to assign containers to applications in fifo order
    for (Map.Entry<ApplicationAttemptId, FiCaSchedulerApp> e : applications
        .entrySet()) {
      FiCaSchedulerApp application = e.getValue();
      LOG.debug("pre-assignContainers");
      application.showRequests();
      synchronized (application) {
        // Check if this resource is on the blacklist
        if (FiCaSchedulerUtils.isBlacklisted(application, node, LOG)) {
          continue;
        }

        for (Priority priority : application.getPriorities()) {
          int maxContainers =
            getMaxAllocatableContainers(application, priority, node,
                NodeType.OFF_SWITCH);
          // Ensure the application needs containers of this priority
          if (maxContainers > 0) {
            int assignedContainers =
              assignContainersOnNode(node, application, priority); //分配方法
            // Do not assign out of order w.r.t priorities
            if (assignedContainers == 0) {
              break;
            }
          }
        }
      }

      LOG.debug("post-assignContainers");
      application.showRequests();

      // Done
      if (Resources.lessThan(resourceCalculator, clusterResource,
              node.getAvailableResource(), minimumAllocation)) {
        break;
      }
    }

    // Update the applications' headroom to correctly take into
    // account the containers assigned in this update.
    for (FiCaSchedulerApp application : applications.values()) {
      application.setHeadroom(Resources.subtract(clusterResource, usedResource));
    }    
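Stripped of locality, blacklists and headroom, the FIFO loop above reduces to a few rules. It walks the applications in submission order. For each priority an application still needs, it assigns as many containers as fit, and it stops at the first such priority that gets nothing. It ends the whole pass once the node has less than the minimum allocation left. Below is a sketch under those simplifications, with memory as the only resource. FifoAssignSketch and FifoAppSketch are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical application: pending container counts per priority
// (a lower number means a higher priority) and one container size for all requests.
class FifoAppSketch {
  final int containerMb;
  final TreeMap<Integer, Integer> pendingByPriority = new TreeMap<>();
  final List<String> allocatedOn = new ArrayList<>();   // node IDs of granted containers

  FifoAppSketch(int containerMb) {
    this.containerMb = containerMb;
  }
}

class FifoAssignSketch {
  // Insertion order = submission order, which is what makes the loop FIFO.
  final LinkedHashMap<String, FifoAppSketch> applications = new LinkedHashMap<>();
  private final int minimumAllocationMb;

  FifoAssignSketch(int minimumAllocationMb) {
    this.minimumAllocationMb = minimumAllocationMb;
  }

  /** Assigns containers on one node and returns the node's remaining memory in MB. */
  int assignContainers(String nodeId, int availableMb) {
    for (FifoAppSketch app : applications.values()) {
      for (Map.Entry<Integer, Integer> e : app.pendingByPriority.entrySet()) {
        int wanted = e.getValue();
        if (wanted <= 0) {
          continue;                                   // nothing needed at this priority
        }
        int assigned = Math.min(wanted, availableMb / app.containerMb);
        for (int i = 0; i < assigned; i++) {
          app.allocatedOn.add(nodeId);
        }
        e.setValue(wanted - assigned);
        availableMb -= assigned * app.containerMb;
        if (assigned == 0) {
          break;                                      // do not assign out of priority order
        }
      }
      if (availableMb < minimumAllocationMb) {
        break;                                        // node is full: stop this pass
      }
    }
    return availableMb;
  }
}
```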

  assignContainersOnNode
    private int assignContainersOnNode(FiCaSchedulerNode node,
      FiCaSchedulerApp application, Priority priority
  ) {
    // Data-local
    int nodeLocalContainers =
      assignNodeLocalContainers(node, application, priority); 

    // Rack-local
    int rackLocalContainers =
      assignRackLocalContainers(node, application, priority);
    .....

   assignNodeLocalContainers
   ..
         int assignableContainers =
        Math.min(
            getMaxAllocatableContainers(application, priority, node,
                NodeType.NODE_LOCAL),
                request.getNumContainers());
      assignedContainers =
        assignContainer(node, application, priority,
            assignableContainers, request, NodeType.NODE_LOCAL);   
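assignContainersOnNode tries node-local requests first, then rack-local, then off-switch. At each level it assigns min(maximum allocatable, requested count) containers. Below is a hypothetical sketch of that ordering. Requests are keyed by host name, rack name or "*", and node capacity is expressed as a number of containers. LocalityAssignSketch is an illustrative name. It treats the three levels as independent counters, which is a simplification of how YARN tracks requests.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical: outstanding container counts for one application and one priority,
// keyed by resource name (a host name, a rack name, or "*" for anywhere).
class LocalityAssignSketch {
  static final String ANY = "*";
  final Map<String, Integer> requests = new HashMap<>();

  // Assign up to min(maxAllocatable, requested) containers at one locality level.
  private int assignAt(String resourceName, int maxAllocatable) {
    int requested = requests.getOrDefault(resourceName, 0);
    int assignable = Math.min(maxAllocatable, requested);
    if (assignable > 0) {
      requests.put(resourceName, requested - assignable);
    }
    return assignable;
  }

  /** Node-local first, then rack-local, then off-switch; returns the total assigned. */
  int assignContainersOnNode(String host, String rack, int nodeCapacityInContainers) {
    int left = nodeCapacityInContainers;
    int nodeLocal = assignAt(host, left);     // data-local
    left -= nodeLocal;
    int rackLocal = assignAt(rack, left);     // rack-local
    left -= rackLocal;
    int offSwitch = assignAt(ANY, left);      // anywhere
    return nodeLocal + rackLocal + offSwitch;
  }
}
```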

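         //Summary: the NM sends a heartbeat to the RM. This enqueues a STATUS_UPDATE event that reaches RMNode/RMNodeImpl, and RMNodeImpl is a state machine.
         //Its addTransition declarations show that StatusUpdateWhenHealthyTransition is invoked, and that class's transition method puts a NodeUpdateSchedulerEvent
         //into the asynchronous dispatcher. This eventually calls the scheduler's assignContainers method, which takes resource requests from the application's in-memory request structure and allocates containers.
         //The results are saved in the application's in-memory allocation structure, where they wait for the AppMaster to fetch them.
         //When the AppMaster fetches them, it first updates the resource-request structure and then takes the contents of the allocation structure.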
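The hand-off between the scheduler and the AppMaster in the summary can be sketched as two structures on the application. The first holds the outstanding requests written by the AM. The second holds the newly allocated containers, written by the scheduler and drained by the AM. AppAllocationSketch and its methods are hypothetical. The sketch also assumes, on its own account, that a new ask replaces the previous count for that resource name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class AppAllocationSketch {
  // Outstanding requests: resource name -> container count (written by the AM).
  private final Map<String, Integer> requests = new HashMap<>();
  // Containers the scheduler has allocated but the AM has not fetched yet.
  private final List<String> newlyAllocated = new ArrayList<>();

  // Scheduler side (driven by NM heartbeats): record one allocation.
  synchronized void recordAllocation(String containerId, String resourceName) {
    newlyAllocated.add(containerId);
    requests.computeIfPresent(resourceName, (k, v) -> v - 1);
  }

  // AM side: one allocate call updates the requests first, then drains the allocations.
  synchronized List<String> allocate(Map<String, Integer> asks) {
    requests.putAll(asks);                           // 1. update the request structure
    List<String> result = new ArrayList<>(newlyAllocated);
    newlyAllocated.clear();                          // 2. hand over and forget
    return result;
  }

  synchronized int pending(String resourceName) {
    return requests.getOrDefault(resourceName, 0);
  }
}
```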
Date: 2024-08-03 17:33:55
