Job和Task运行时信息的维护

JobTracker最重要的功能之一是状态监控,包括TaskTracker、Job和Task等运行时状态的监控,其中TaskTracker状态监控比较简单,只要记录其最近心跳汇报时间和健康状况(由TaskTracker端的监控脚本检测,并通过心跳将结果发送给JobTracker)即可。

作业描述模型

如下图所示

JobTracker在其内部以“三层多叉树”的方式描述和跟踪每个作业的运行状态。JobTracker为每个作业创建一个JobInProgress对象以跟踪和监控其运行状态。该对象存在于作业的整个运行过程中:它在作业提交时创建,作业运行完成时销毁。同时,为了采用分而治之的策略解决问题,JobTracker会将每个作业拆分成若干个任务,并为每个任务创建一个TaskInProgress对象以跟踪和监控其运行状态,而任务在运行过程中,可能会因为软件Bug、硬件故障等原因运行失败,此时JobTracker会按照一定的策略重新运行该任务,也就是说,每个任务可能会尝试运行多次,直到运行成功或者因超过尝试次数而失败。JobTracker将每运行一次任务称为一次“任务运行尝试”,即Task Attempt。对于某个任务,只要有一个Task Attempt运行成功,则相应的TaskInProgress对象会标注该任务运行成功,而当所有的TaskInProgress均标注其对应的任务运行成功后,JobInProgress对象会标识整个作业运行成功。

为了区分各个作业,如图所示

JobTracker会赋予每个作业一个唯一的ID。该ID由三部分组成:作业前缀字符串、JobTracker启动时间和作业提交顺序,各部分通过“_”连接起来组成一个完整的作业ID,比如 job_201208071706_0009,对应的三部分分别是“job”、“201208071706”和“009”(JobTracker运行以来第9个作业)。每个任务的ID继承了作业的ID,并在此基础上进行了扩展,它由三部分组成:作业ID(其中前缀字符串变为“task”)、任务类型(map还是reduce)和任务编号(从000000开始,一直到999999)。比如,task_201208071706_0009_m_000000,表示它的作业ID为task_201208071706_0009,任务类型为map,任务编号为000000。每个Task Attempt的ID继承了任务的ID,它由两部分组成:任务ID(其中前缀字符串变为“attempt”)和运行尝试次数(从0开始),比如,attempt_201208071706_0009_m_000000_0表示任务task_201208071706_0009_m_000000的第0次尝试。

JobInProgress

JobInProgress类主要用于监控和跟踪作业运行状态,并为调度器提供最底层的调度接口。

JobInProgress维护了两种作业信息:一种是静态信息,这些信息是作业提交之时就已经确定好的;另一种是动态信息,这些信息随着作业的运行而动态变化。

(1)作业静态信息

作业静态信息是指作业提交之时就已经确定好的属性信息,主要包括以下几项:

org.apache.hadoop.mapred.JobInProgress.java

//map task, reduce task , cleanup task和setup task对应的TaskInProgress
 TaskInProgress maps[] = new TaskInProgress[0];
  TaskInProgress reduces[] = new TaskInProgress[0];
  TaskInProgress cleanup[] = new TaskInProgress[0];
  TaskInProgress setup[] = new TaskInProgress[0];
  int numMapTasks = 0;//Map Task个数
  int numReduceTasks = 0;//Reduce Task个数
  final long memoryPerMap;//每个Map Task需要的内存量
  final long memoryPerReduce;//每个Reduce Task需要的内存量
  volatile int numSlotsPerMap = 1;//每个Map Task需要的slot个数
  volatile int numSlotsPerReduce = 1;//每个Reduce Task需要的slot个数
  /*允许每个TaskTracker上失败的Task个数,默认是4,通过参数mapred.max.tracker.failures设置。当该作业在某个TaskTracker上失败的个数超过该值时,会将该节点添加到该作业的黑名单中,调度器不再为该节点分配该作业的任务*/
  final int maxTaskFailuresPerTracker;

  private static float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;//当有5%的Map Task完成后,才可以调度Reduce Task
 int completedMapsForReduceSlowstart = 0;//多少Map Task完成后开始调度Reduce Task

 final int mapFailuresPercent;//允许的Map Task失败比例上限,通过参数mapred.max.map.failures.percent设置
 final int reduceFailuresPercent;//允许的Reduce Task失败比例上限,通过参数mapred.max.reduce.failures.percent设置

 JobPriority priority = JobPriority.NORMAL;//作业优先级

(2)作业动态信息

作业动态信息是指作业运行过程中会动态更新的信息。这些信息对于发现TaskTracker/Job/Task故障非常有用,也可以为调度器进行任务调度提供决策依据。

 int runningMapTasks = 0;//正在运行的Map Task数目
  int runningReduceTasks = 0;//正在运行的Reduce Task数目
  int finishedMapTasks = 0;//运行完成的Map Task数目
  int finishedReduceTasks = 0;//运行完成的Reduce Task数目
  int failedMapTasks = 0; //失败的Map Task Attempt数目
  int failedReduceTasks = 0;//失败的Reduce Task Attempt数目

  int speculativeMapTasks = 0;//正在运行的备份任务(MAP)数目
  int speculativeReduceTasks = 0;//正在运行的备份任务(REDUCE)数目

    int failedMapTIPs = 0;//失败的TaskInProgress(MAP)数目,这意味着对应的输入数据将被丢弃,不会产生最终结果
  int failedReduceTIPs = 0;//失败的TaskInProgress(REDUCE)数目
    private volatile boolean launchedCleanup = false;//是否已启动Cleanup Task
  private volatile boolean launchedSetup = false;//是否已启动Setup Task
  private volatile boolean jobKilled = false;//作业是否已被杀死
  private volatile boolean jobFailed = false;//作业是否已失败

 // NetworkTopology Node to the set of TIPs
  Map<Node, List<TaskInProgress>> nonRunningMapCache;//节点与TaskInProgress的映射关系,即TaskInProgress输入数据位置与节点对应关系

  // Map of NetworkTopology Node to set of running TIPs
  Map<Node, Set<TaskInProgress>> runningMapCache;//节点及其上面正在运行的Task映射关系

  // A list of non-local, non-running maps
  final List<TaskInProgress> nonLocalMaps;//不需要考虑数据本地性的Map Task,如果一个Map Task的InputSplit Location为空,则进行任务调度时不需要考虑本地性

  // Set of failed, non-running maps sorted by #failures
  final SortedSet<TaskInProgress> failedMaps;//按照失败次数进行排序的TIP集合

  // A set of non-local running maps
  Set<TaskInProgress> nonLocalRunningMaps;//未运行的Map Task集合

  // A list of non-running reduce TIPs
  Set<TaskInProgress> nonRunningReduces;//未运行的Reduce Task集合

  // A set of running reduce TIPs
  Set<TaskInProgress> runningReduces;//正在运行的Reduce Task集合

  // A list of cleanup tasks for the map task attempts, to be launched
  List<TaskAttemptID> mapCleanupTasks = new LinkedList<TaskAttemptID>();//待清理的Map Task列表,比如用户直接通过命令“bin/hadoop job -kill”杀死的Task

  // A list of cleanup tasks for the reduce task attempts, to be launched
  List<TaskAttemptID> reduceCleanupTasks = new LinkedList<TaskAttemptID>();

long startTime;//作业提交时间
  long launchTime;//作业开始执行时间
  long finishTime;//作业完成时间

TaskInProgress

TaskInProgress类维护了一个Task运行过程中的全部信息。在Hadoop中,由于一个任务可能会推测执行或者重新执行,所以会存在多个Task Attempt,且同一时刻,可能有多个处理相同的任务尝试同时在执行,而这些任务被同一个TaskInProgress对象管理和跟踪,只要任何一个任务尝试运行成功,TaskInProgress就会标注该任务执行成功。

 private final TaskSplitMetaInfo splitInfo;//Task要处理的Split信息
  private int numMaps;//Map Task数目,只对Reduce Task有用
  private int partition;//该Task在task列表中的索引
  private JobTracker jobtracker;//JobTracker对象,用于获取全局时钟
  private TaskID id;//task ID,其后面加下标构成Task Attempt ID
  private JobInProgress job;//该TaskInProgress所在的JobInProgress
  private final int numSlotsRequired;//运行该Task需要的slot数目

  // Status of the TIP
  private int successEventNumber = -1;
  private int numTaskFailures = 0;//Task Attempt失败次数
  private int numKilledTasks = 0;//Task Attempt被杀死次数
  private double progress = 0;//任务运行进度
  private String state = "";//运行状态
  private long startTime = 0;//TaskInProgress对象创建时间
  private long execStartTime = 0;//第一个Task Attempt开始运行时间
  private long execFinishTime = 0;//最后一个运行成功的Task Attempt完成时间
  private int completes = 0;//Task Attempt运行完成数目,实际只有两个值:0和1
  private boolean failed = false;//该TaskInProgress是否运行失败
  private boolean killed = false;//该TaskInProgress是否被杀死
  private boolean jobCleanup = false; //该TaskInProgress是否为Cleanup Task
  private boolean jobSetup = false;//该TaskInProgress是否为Setup Task

  // The ‘next‘ usable taskid of this tip
  int nextTaskId = 0;//该TaskInProgress的下一个可用Task Attempt ID

  // The taskid that took this TIP to SUCCESS
  private TaskAttemptID successfulTaskId;//使得该TaskInProgress运行成功的那个Task ID

  // The first taskid of this tip
  private TaskAttemptID firstTaskId;//第一个运行的Task Attemp的ID

  // Map from task Id -> TaskTracker Id, contains tasks that are
  // currently runnings
  private TreeMap<TaskAttemptID, String> activeTasks = new TreeMap<TaskAttemptID, String>();//正在运行的Task ID与TaskTracker ID之间的映射关系
  // All attempt Ids of this TIP
  private TreeSet<TaskAttemptID> tasks = new TreeSet<TaskAttemptID>();//该TaskInProgress已运行的所有TaskAttempt ID,包括已经运行完成的和正在运行的
  /**
   * Map from taskId -> TaskStatus
   */
  private TreeMap<TaskAttemptID,TaskStatus> taskStatuses =
    new TreeMap<TaskAttemptID,TaskStatus>();//Task ID与TaskStatus映射关系

  // Map from taskId -> TaskTracker Id,
  // contains cleanup attempts and where they ran, if any
  private TreeMap<TaskAttemptID, String> cleanupTasks =
    new TreeMap<TaskAttemptID, String>();//Cleanup Task ID与TaskTracker ID映射关系

  private TreeSet<String> machinesWhereFailed = new TreeSet<String>();//所有已经运行失败的Task所在的节点列表
  private TreeSet<TaskAttemptID> tasksReportedClosed = new TreeSet<TaskAttemptID>();//某个Task Attempt运行成功后,其他所有正在运行的Task Attempt保存在该集合中

  //list of tasks to kill, <taskid> -> <shouldFail>
  private TreeMap<TaskAttemptID, Boolean> tasksToKill = new TreeMap<TaskAttemptID, Boolean>();//待杀死的Task列表

  //task to commit, <taskattemptid>
  private TaskAttemptID taskToCommit;//等待被提交的Task Attempt,该Task Attempt最终使得TaskInProgress运行成功
时间: 2024-07-29 23:43:53

Job和Task运行时信息的维护的相关文章

第三十六课 Spark之TaskScheduler Spark shell案例运行日志详解、TaskScheduler和SchedulerBackend、FIFO与FAIR、Task运行时本地性算法详

</pre></h2><div><p>本节课内容:</p><p>1.     TaskSchedulerBackend与SchedulerBackend</p><p>2.     FIFO与FAIR两种调度模式</p><p>3.     Task数据本地性资源的分配</p></div><h3>一.Scheduler运行过程(Spark-shell角度)

python日志功能实现-自动获取程序运行时信息

通过python的inspect模块,我们可以获取程序的运行时栈.一个python的运行时栈是一个六元组:(frame对象, 文件名, 当前行号, 函数名, 保存相关源代码行的列表, 当前行在源代码列表中的位置). 栈中第一个元素代表当前执行的位置信息,最后一个表示最外层的执行信息. 如: 1 import inspect 2 3 class Foo: 4 def __init__(self): 5 pass 6 def say(self): 7 print inspect.stack()[1]

TaskScheduler内幕天机:Spark shell案例,TaskScheduler和SchedulerBackend、FIFO与FAIR、Task运行时本地性算法详解

TaskSchedulerBackend与SchedulerBackend FIFO与FAIR两种调度模式 Task数据本地性资源的分配 一.TaskScheduler运行过程(Spark-shell角度) 1.启动Spark-shell 当我们spark-shell本身的时候命令终端返回来的主要是ClientEndpoint和SparkDeploySchedulerBakcend.这是因为此时还没有任何应用程序Job的触发,这是启动Application本身而已,所以主要就是实例化SparkC

在Amazon FreeRTOS V10中使用运行时统计信息

在MCU on Eclipse网站上看到Erich Styger在8月2日发的博文,一篇关于在Amazon FreeRTOS V10中使用运行时统计信息的文章,本人觉得很有启发,特将其翻译过来以备参考.原文网址:https://mcuoneclipse.com/2018/08/02/tutorial-using-runtime-statistics-with-amazon-freertos-v10/ FreeRTOS包含一个很好的功能,可以向我提供有关每个任务在系统上运行的时间的信息: Free

Android Studio使用时源码到处报红色警告,运行时又没错

转载地址:http://www.07net01.com/program/2016/04/1452749.html [摘要:正在AS上开辟时,碰到那个题目,翻开全部的Java源文件,右边一起标赤色,找没有到类,到没有到方式,由于不克不及面击跳转,开辟时纠结了很久,试了clean.rebuild等种种方式皆没有起感化,又] 在AS上开发时,遇到这个问题,打开所有的java源文件,右侧一路标红色,找不到类,到不到方法,因为不能点击跳转,开发时纠结了好久,试了clean.rebuild等各种方法都不起作

ART运行时为新创建对象分配内存的过程分析

ART运行时和Dalvik虚拟机一样,在堆上为对象分配内存时都要解决内存碎片和内存不足问题.内存碎片问题可以使用dlmalloc技术解决.内存不足问题则通过垃圾回收和在允许范围内增长堆大小解决.由于垃圾回收会影响程序,因此ART运行时采用力度从小到大的进垃圾回收策略.一旦力度小的垃圾回收执行过后能满足分配要求,那就不需要进行力度大的垃圾回收了.本文就详细分析ART运行时在堆上为对象分配内存的过程. 本博参加博客之星评选,求投票:点击投票 老罗的新浪微博:http://weibo.com/shen

Java 笔记(四) RTTI - 运行时类型检查

运行时类型检查,即Run-time Type Identification.这是Java语言里一个很强大的机制,那么它到底给我们的程序带来了什么样的好处呢? 在了解运行时类型检查之前,我们要首先知道另一个密切相关的概念,即运行时类型信息(Run-time Information - 也可以缩写为RTTI) 运行时类型信息使得你可以在程序运行时发现和使用类型信息. 来自:<Thinking in Java>. OK,那么我们总结来说,RTTI就是能够让我们在程序的运行时去获取类型的信息.接下来我

java 反射机制:运行时的类信息(为框架服务的Bug存在)

反射机制:JAVA反射机制是在运行状态中,对于任意一个类,都能够知道这个类的所有属性和方法:对于任意一个对象,都能够调用它的任意一个方法:这种动态获取的信息以及动态调用对象的方法的功能称为java语言的反射机制. 换一种引出反射的说法是:当通过反射与一个未知的类型的对象打交道是,JVM只是简单地检查这个类,看它是属于哪个特定的类(就想RTTI那样).在用它做其他事情之前必须先加载那个类的Class对象.因此,那个类的.class文件对于JVM来说必须是可获取的:那么在本地机器上,要么通过网络获得

运行时类型信息RTTI

我们在写C++代码的时候经常碰到使用dynamic_cast进行类型转换的情况,也都知道经过dynamic_cast的转换更加安全,因为dynamic_cast进行了类型检查. 但是可能很多人不知道dynamic_cast是C++ 运行时类型信息(RTTI)机制链条上的一个节点. RTTI提供了两个操作符和一个类: dynamic_cast typeid type_info 整个RTTI, 作为一个整体,暴露给程序员的就是这三个元素.因此我们关注的焦点也就在它们身上了. 什么是RTTI 在C++