JobTracker作业调度分析

JobTracker的作业调度给我感觉就是比较宏观意义上的操作。倘若你只了解了MapReduce的工作原理是远远不够的,这时去学习一下他在宏观层面的原理实现也是对我们非常有帮助的。首先我们又得从上次分析的任务提交之后的操作说起,Job作业通过RPC通信提交到JobTracker端之后,接下来会触发到下面的方法;

/**
   * 初始化作业操作
   */
  public void initJob(JobInProgress job) {
    if (null == job) {
      LOG.info("Init on null job is not valid");
      return;
    }

    try {
      JobStatus prevStatus = (JobStatus)job.getStatus().clone();
      LOG.info("Initializing " + job.getJobID());
      //初始化Task任务
      job.initTasks();
      ......

接着会执行initTasks的方法,但不是JobTracker,而是JobInProgress类中的方法:

 /**
   * Construct the splits, etc.  This is invoked from an async
   * thread so that split-computation doesn't block anyone.
   */
  public synchronized void initTasks()
  throws IOException, KillInterruptedException, UnknownHostException {
    if (tasksInited || isComplete()) {
      return;
    }
    ......

    jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
    jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);
    this.queueMetrics.addWaitingMaps(getJobID(), numMapTasks);
    this.queueMetrics.addWaitingReduces(getJobID(), numReduceTasks);

    //根据numMapTasks任务数,创建MapTask的总数
    maps = new TaskInProgress[numMapTasks];
    for(int i=0; i < numMapTasks; ++i) {
      inputLength += splits[i].getInputDataLength();
      maps[i] = new TaskInProgress(jobId, jobFile,
                                   splits[i],
                                   jobtracker, conf, this, i, numSlotsPerMap);
    }
    ......

    //
    // Create reduce tasks
    //根据numReduceTasks,创建Reduce的Task数量
    this.reduces = new TaskInProgress[numReduceTasks];
    for (int i = 0; i < numReduceTasks; i++) {
      reduces[i] = new TaskInProgress(jobId, jobFile,
                                      numMapTasks, i,
                                      jobtracker, conf, this, numSlotsPerReduce);
      nonRunningReduces.add(reduces[i]);
    }

    ......

    // create cleanup two cleanup tips, one map and one reduce.
    //创建2个clean up Task任务,1个是Map Clean-Up Task,一个是Reduce Clean-Up Task
    cleanup = new TaskInProgress[2];

    // cleanup map tip. This map doesn't use any splits. Just assign an empty
    // split.
    TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;
    cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
            jobtracker, conf, this, numMapTasks, 1);
    cleanup[0].setJobCleanupTask();

    // cleanup reduce tip.
    cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                       numReduceTasks, jobtracker, conf, this, 1);
    cleanup[1].setJobCleanupTask();

    // create two setup tips, one map and one reduce.
    //原理同上
    setup = new TaskInProgress[2];

    // setup map tip. This map doesn't use any split. Just assign an empty
    // split.
    setup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
            jobtracker, conf, this, numMapTasks + 1, 1);
    setup[0].setJobSetupTask();

    // setup reduce tip.
    setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                       numReduceTasks + 1, jobtracker, conf, this, 1);
    setup[1].setJobSetupTask();

    ......

可以看见,在这里JobInProgress首次被划分为了很多的小的Task任务的形式存在,而这些小的任务是以TaskInProgress的类表示。在这里MapReduce把1个作业做出了如下的分解,numMapTasks个Map Task ,numReduceTasks个Reduce Task,2个CleanUp任务,2个SetUp任务,(Map Reduce,每个各占1个),好,可以大致勾画一下,1个JobInProgress的执行流程了。

ok,initTask的任务已经完成,也就是说前面初始化的准备工作都已经完成了,后面就等着JobTacker分配作业给TaskTracker了。在这里MapReduce用的是HeartBeat的形式,就是心跳机制,心跳包在这里主要有3个作用:

1.判断TaskTracker是否活着

2.获取各个TaskTracker上的资源使用情况和任务的进度

3.给TaskTracker分配任务

而这里用到的就是第三作用。HeartBeat的调用形式同样是Hadoop自带的RPC实现方式。JobTracker不会直接分配作业给TaskTracker,中间会经过一个叫TaskScheduler掉调度器,这个可以用户自定义实现,满足不同的需求设计,在Hadoop中有默认的实现,所以你会看到大致这样的一个模型流程:

所以接下来JobTracker首先会收到很多来自TaskTracker的心跳包,判断此TaskTracker是否是无任务状态的,无任务的话,马上让TaskSchedulera分配任务给他:

public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
                                                  boolean restarted,
                                                  boolean initialContact,
                                                  boolean acceptNewTasks,
                                                  short responseId)
    throws IOException {
   ....

    //通过心跳机制发送命令回应
    // Initialize the response to be sent for the heartbeat
    HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
    List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
    boolean isBlacklisted = faultyTrackers.isBlacklisted(status.getHost());
    // Check for new tasks to be executed on the tasktracker
    if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {
      TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName);
      if (taskTrackerStatus == null) {
        LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
      } else {
        List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
        //说明此TaskTtracker上无任务了
        if (tasks == null ) {
          //为此TaskTracker分配任务
          tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));
        }

接下来就是TaskScheduler的方法了,不过得找出他的实现类,TaskScheduler只是一个基类:

  public synchronized List<Task> assignTasks(TaskTracker taskTracker)
      throws IOException {
    TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
    ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
    final int numTaskTrackers = clusterStatus.getTaskTrackers();
    final int clusterMapCapacity = clusterStatus.getMaxMapTasks();
    final int clusterReduceCapacity = clusterStatus.getMaxReduceTasks();

    //获取作业队列
    Collection<JobInProgress> jobQueue =
      jobQueueJobInProgressListener.getJobQueue();
     .....
        for (JobInProgress job : jobQueue) {
          if (job.getStatus().getRunState() != JobStatus.RUNNING ||
              job.numReduceTasks == 0) {
            continue;
          }

          //在这里分配了一个新的Reduce任务
          Task t =
            job.obtainNewReduceTask(taskTrackerStatus, numTaskTrackers,
                                    taskTrackerManager.getNumberOfUniqueHosts()
                                    );
          .....

首先获取一个作业列表,在里面挑出一个作业给,在比如从里面挑出1个Reduce的任务区给整个TaskTracker执行,因为我们刚刚已经知道,所有的Task都是以TaskInProgress形式被包含于JobInProgress中的,所以又来到了JobInProgress中了

/**
   * Return a ReduceTask, if appropriate, to run on the given tasktracker.
   * We don't have cache-sensitivity for reduce tasks, as they
   *  work on temporary MapRed files.
   */
  public synchronized Task obtainNewReduceTask(TaskTrackerStatus tts,
                                               int clusterSize,
                                               int numUniqueHosts
                                              ) throws IOException {
    .....

    int  target = findNewReduceTask(tts, clusterSize, numUniqueHosts,
                                    status.reduceProgress());
    if (target == -1) {
      return null;
    }

    //这里继续调用方法,获取目标任务
    Task result = reduces[target].getTaskToRun(tts.getTrackerName());
    if (result != null) {
      addRunningTaskToTIP(reduces[target], result.getTaskID(), tts, true);
    }

    return result;
  }

此时就执行了一个TIP就是TaskInProgress里面去执行了,此时的转变就是JIP->TIP的转变。继续往里看,这时候来到的是TaskInProgress的类里面了:

public Task getTaskToRun(String taskTracker) throws IOException {
    if (0 == execStartTime){
      // assume task starts running now
      execStartTime = jobtracker.getClock().getTime();
    }

    // Create the 'taskid'; do not count the 'killed' tasks against the job!
    TaskAttemptID taskid = null;
    if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts + numKilledTasks)) {
      // Make sure that the attempts are unqiue across restarts
      int attemptId = job.getNumRestarts() * NUM_ATTEMPTS_PER_RESTART + nextTaskId;
      //启动一次TA尝试
      taskid = new TaskAttemptID( id, attemptId);
      ++nextTaskId;
    } else {
      LOG.warn("Exceeded limit of " + (MAX_TASK_EXECS + maxTaskAttempts) +
              " (plus " + numKilledTasks + " killed)"  +
              " attempts for the tip '" + getTIPId() + "'");
      return null;
    }

    //加入到相应的数据结构中
    return addRunningTask(taskid, taskTracker);
  }

在这里明显的执行了所谓的TA尝试,就是说这是一次Task的尝试执行,因为不能保证这次任务就一定能执行成功。把这次尝试的任务ID加入系统变量中,就来到了addRunningTask,也就是说来到了方法执行的最末尾:

/**
   * Adds a previously running task to this tip. This is used in case of
   * jobtracker restarts.
   * 添加任务
   */
  public Task addRunningTask(TaskAttemptID taskid,
                             String taskTracker,
                             boolean taskCleanup) {
    .....
    //添加任务和taskTracker的映射关系
    activeTasks.put(taskid, taskTracker);
    tasks.add(taskid);

    // Ask JobTracker to note that the task exists
    //在JobTracker中增加一对任务记录
    jobtracker.createTaskEntry(taskid, taskTracker, this);

    // check and set the first attempt
    if (firstTaskId == null) {
      firstTaskId = taskid;
    }
    return t;
  }

在这里,就增加了任务和TaskTracker的一些任务运行信息的变量关系。后面就等着TaskTracker自己去把任务挑出来,执行就OK了,上面这个步骤从TIP->TA的转变。我们把这种结构流程叫做“三层多叉树”的方式结构。

整个作业的调度的时序关系图如下:

时间: 2024-08-29 06:07:31

JobTracker作业调度分析的相关文章

大数据基础Hadoop 2.x入门

hadoop概述 存储和分析网络数据 三大组件 MapReduce 对海量数据的处理 思想: 分而治之 每个数据集进行逻辑业务处理map 合并统计数据结果reduce HDFS 储存海量数据 分布式存储 安全性高 副本数据 YARN 分布式资源管理框架 管理整个集群的资源(内存.CPU核数) 分配调度集群资源 Common 工具 hadoop生态圈 Hive(蜜蜂)通过使用sql语句来执行hadoop任务 HBase 存储结构化数据的分布式数据库 HBase放弃了事务特性,追求更高的扩展 和HD

MapReduce计算框架

MapReduce计算框架 一.MapReduce实现原理 图展示了MapReduce实现中的全部流程,处理步骤如下: 1.用户程序中的MapReduce函数库首先把输入文件分成M块(每块大小默认64M),在集群上执行处理程序,见序号1 2.主控程序master分配Map任务和Reduce任务给工作执行机器worker.见序号2 3.一个分配了Map任务的worker读取并处理输入数据块.从数据片段中解析出key/value键值对,然后把其传递给Map函数,由Map函数生成并输出中间key/va

hadoop运行原理之Job运行(四) JobTracker端心跳机制分析

接着上篇来说,TaskTracker端的transmitHeartBeat()方法通过RPC调用JobTracker端的heartbeat()方法来接收心跳并返回心跳应答.还是先看看这张图,对它的大概流程有个了解. 下面来一段一段的分析该方法. 1 public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, 2 boolean restarted, 3 boolean initialContact, 4 bo

JobTracker启动流程源码级分析

org.apache.hadoop.mapred.JobTracker类是个独立的进程,有自己的main函数.JobTracker是在网络环境中提交及运行MR任务的核心位置. main方法主要代码有两句: 1 //创建jobTracker对象 2 JobTracker tracker = startTracker(new JobConf()); 3 //启动各个服务,包括JT内部一些重要的服务或者线程 4 tracker.offerService(); 一.startTracker(new Jo

MapReduce job在JobTracker初始化源码级分析

mapreduce job提交流程源码级分析(三)中已经说明用户最终调用JobTracker.submitJob方法来向JobTracker提交作业.而这个方法的核心提交方法是JobTracker.addJob(JobID jobId, JobInProgress job)方法,这个addJob方法会把Job提交到调度器(默认是JobQueueTaskScheduler)的监听器JobQueueJobInProgressListener和EagerTaskInitializationListen

监听器初始化Job、JobTracker相应TaskTracker心跳、调度器分配task源码级分析

JobTracker和TaskTracker分别启动之后(JobTracker启动流程源码级分析,TaskTracker启动过程源码级分析),taskTracker会通过心跳与JobTracker通信,并获取分配它的任务.用户将作业提交到JobTracker之后,放入相应的数据结构中,静等被分配.mapreduce job提交流程源码级分析(三)这篇文章已经分析了用户提交作业的最后步骤,主要是构造作业对应的JobInProgress并加入jobs,告知所有的JobInProgressListen

JobTracker作业启动过程分析

在Hadoop中,启动作业运行的方式有很多,可以用命令行格式把打包好后的作业提交还可以,用Hadoop的插件进行应用开发,在这么多的方式中,都会必经过一个流程,作业会以JobInProgress的形式提交到JobTracker中.什么叫JobTracker呢,也许有些人了解Hadoop只知道他的MapReduce计算模型,那个过程只是其中的Task执行的一个具体过程,比较微观上的流程,而JobTrack是一个比较宏观上的东西.涉及到作业的提交的过程.Hadoop遵循的是Master/Slave的

实验二 作业调度模拟程序

实验二 作业调度模拟程序 一.        实验目的 (1)加深对作业调度算法的理解: (2)进行程序设计的训练. 二.        实验内容和要求 用高级语言编写一个或多个作业调度的模拟程序. 单道批处理系统的作业调度程序.作业一投入运行,它就占有计算机的一切资源直到作业完成为止,因此调度作业时不必考虑它所需要的资源是否得到满足,它所运行的时间等因素.      作业调度算法: 1)      采用先来先服务(FCFS)调度算法,即按作业到达的先后次序进行调度.总是首先调度在系统中等待时间

MapReduce源码分析总结

转自:http://www.cnblogs.com/forfuture1978/archive/2010/11/19/1882279.html 转者注:本来想在Hadoop学习总结系列详细解析HDFS以及Map-Reduce的,然而查找资料的时候,发现了这篇文章,并且发现caibinbupt已经对Hadoop的源代码已经进行了详细的分析,推荐大家阅读. 转自http://blog.csdn.net/HEYUTAO007/archive/2010/07/10/5725379.aspx 参考: 1