[Hadoop] - TaskTracker源码分析(状态发送)

TaskTracker节点向JobTracker汇报当前节点的运行时信息时候,是将运行状态信息同心跳报告一起发送给JobTracker的,主要包括TaskTracker的基本信息、节点资源使用信息、各任务状态等。所以信息被序列化为TaskTrackerStatus实例对象。每次发送心跳报告的时候,会重新构造一个Status对象,并重置这些信息,而且需要主要的是每次发送的status对象的大小是不一定的,因为很多信息的发送是有时间间隔的。这些操作主要位于方法transmitHeartBeat的上半部分代码:

  1 HeartbeatResponse transmitHeartBeat(long now) throws IOException {
  2  // 计算是否发送任务计数器信息,间隔时间为${COUNTER_UPDATE_INTERVAL}对应的值为60s,不支持配置
  3     boolean sendCounters;
  4     if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) {
  5       sendCounters = true;
  6       previousUpdate = now;
  7     }
  8     else {
  9       sendCounters = false;
 10     }
 11
 12     //
 13     // Check if the last heartbeat got through...
 14     // if so then build the heartbeat information for the JobTracker;
 15     // else resend the previous status information.
 16     //
 17     if (status == null) {
 18       synchronized (this) {
 19         status = new TaskTrackerStatus(taskTrackerName, localHostname,
 20                                        httpPort,
 21                                        cloneAndResetRunningTaskStatuses(
 22                                          sendCounters),
 23                                        failures,
 24                                        maxMapSlots,
 25                                        maxReduceSlots);
 26       }
 27     } else {
 28       LOG.info("Resending ‘status‘ to ‘" + jobTrackAddr.getHostName() +
 29                "‘ with reponseId ‘" + heartbeatResponseId);
 30     }
 31
 32     //
 33     // Check if we should ask for a new Task
 34     // 计算节点资源使用信息
 35     boolean askForNewTask;
 36     long localMinSpaceStart;
 37     synchronized (this) {
 38       askForNewTask =
 39         ((status.countOccupiedMapSlots() < maxMapSlots ||
 40           status.countOccupiedReduceSlots() < maxReduceSlots) &&
 41          acceptNewTasks);
 42       localMinSpaceStart = minSpaceStart;
 43     }
 44     if (askForNewTask) {
 45       askForNewTask = enoughFreeSpace(localMinSpaceStart);
 46       long freeDiskSpace = getFreeSpace();
 47       long totVmem = getTotalVirtualMemoryOnTT();
 48       long totPmem = getTotalPhysicalMemoryOnTT();
 49       long availableVmem = getAvailableVirtualMemoryOnTT();
 50       long availablePmem = getAvailablePhysicalMemoryOnTT();
 51       long cumuCpuTime = getCumulativeCpuTimeOnTT();
 52       long cpuFreq = getCpuFrequencyOnTT();
 53       int numCpu = getNumProcessorsOnTT();
 54       float cpuUsage = getCpuUsageOnTT();
 55
 56       status.getResourceStatus().setAvailableSpace(freeDiskSpace);
 57       status.getResourceStatus().setTotalVirtualMemory(totVmem);
 58       status.getResourceStatus().setTotalPhysicalMemory(totPmem);
 59       status.getResourceStatus().setMapSlotMemorySizeOnTT(
 60           mapSlotMemorySizeOnTT);
 61       status.getResourceStatus().setReduceSlotMemorySizeOnTT(
 62           reduceSlotSizeMemoryOnTT);
 63       status.getResourceStatus().setAvailableVirtualMemory(availableVmem);
 64       status.getResourceStatus().setAvailablePhysicalMemory(availablePmem);
 65       status.getResourceStatus().setCumulativeCpuTime(cumuCpuTime);
 66       status.getResourceStatus().setCpuFrequency(cpuFreq);
 67       status.getResourceStatus().setNumProcessors(numCpu);
 68       status.getResourceStatus().setCpuUsage(cpuUsage);
 69     }
 70     //add node health information 添加节点健康状态
 71     TaskTrackerHealthStatus healthStatus = status.getHealthStatus();
 72     synchronized (this) {
 73       if (healthChecker != null) {
 74         healthChecker.setHealthStatus(healthStatus);
 75       } else {
 76         healthStatus.setNodeHealthy(true);
 77         healthStatus.setLastReported(0L);
 78         healthStatus.setHealthReport("");
 79       }
 80     }
 81
 82 ......
 83 ...//发送心跳报告
 84 .....
 85     synchronized (this) {
 86       for (TaskStatus taskStatus : status.getTaskReports()) {
 87         if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
 88             taskStatus.getRunState() != TaskStatus.State.UNASSIGNED &&
 89             taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
 90             !taskStatus.inTaskCleanupPhase()) {
 91           if (taskStatus.getIsMap()) {
 92             mapTotal--;
 93           } else {
 94             reduceTotal--;
 95           }
 96           myInstrumentation.completeTask(taskStatus.getTaskID());
 97           runningTasks.remove(taskStatus.getTaskID());
 98         }
 99       }
100
101 .....
102 // 其他代码
103 }

transmitHeartBeat

  1、创建TaskTrackerStatus实例对象status,创建代码如下:

status = new TaskTrackerStatus(taskTrackerName, localHostname, httpPort, cloneAndResetRunningTaskStatuses(sendCounters), failures, maxMapSlots,maxReduceSlots); 

  创建status对象的时候参数分别是: taskTrackerName-->当前节点名称,value为{"tracker_" + localHostname + ":" + taskReportAddress},其中taskReportAddress是为task服务的监听地址。

        localHostname-->当前节点的指定的host名称,配置参数变量为slave.host.name,如果不指定该参数,那么从mapred.tasktracker.dns.interface和mapred.tasktracker.dns.nameserver指定的dns中获取,默认为本地hostname。

        httpPort-->Http监听的端口号

        cloneAndResetRunningTaskStatuses(sendCounters)-->根据是否进行任务计数器信息发送标志,clone真正运行的task状态信息

        failures-->当前节点上失败的任务次数,用于判断当前节点的完整性,当该值达到最大标准的时候,JobTracker不会再给该节点分配任务信息。

        maxMapSlots, maxReduceSlots-->该节点运行的最大slot个数。

  2、判断是否允许分配任务给该节点,这个是先通过判断当前节点的空闲slot个数,然后通过判断当前节点的磁盘剩余量来达到的。代码如下:

askForNewTask = ((status.countOccupiedMapSlots() < maxMapSlots || status.countOccupiedReduceSlots() < maxReduceSlots) && acceptNewTasks);askForNewTask = enoughFreeSpace(localMinSpaceStart); // 其中localMinSpaceStart为配置中给定的${mapred.local.dir.minspacestart},默认为0

  当满足第一个条件:使用的slot个数小于总slot个数的时候,那么给JobTracker发送节点资源使用情况。当满足第二个条件的时候,允许JobTracker给当前节点分配任务。

  3、初始化资源使用情况,主要是设置一系列的磁盘、内存等资源信息等,代码如下:

      long freeDiskSpace = getFreeSpace(); // 获取剩余的磁盘大小
      long totVmem = getTotalVirtualMemoryOnTT(); // 获取总的虚拟内存
      long totPmem = getTotalPhysicalMemoryOnTT(); // 获取总的物理内存
      long availableVmem = getAvailableVirtualMemoryOnTT(); // 获取可用的虚拟内存
      long availablePmem = getAvailablePhysicalMemoryOnTT(); // 获取可用的物理内存
      long cumuCpuTime = getCumulativeCpuTimeOnTT(); // 获取累积cpu时间
      long cpuFreq = getCpuFrequencyOnTT(); // 获取cpu频率
      int numCpu = getNumProcessorsOnTT(); // 获取总的进程数
      float cpuUsage = getCpuUsageOnTT(); // 获取cpu可用比例

      status.getResourceStatus().setAvailableSpace(freeDiskSpace);
      status.getResourceStatus().setTotalVirtualMemory(totVmem);
      status.getResourceStatus().setTotalPhysicalMemory(totPmem);
      status.getResourceStatus().setMapSlotMemorySizeOnTT(mapSlotMemorySizeOnTT); // 设置map阶段slot允许的内存大小, ${mapred.cluster.map.memory.mb}
      status.getResourceStatus().setReduceSlotMemorySizeOnTT(reduceSlotSizeMemoryOnTT); // 设置reduce阶段slot允许的内存大小, ${mapred.cluster.reduce.memory.mb}
      status.getResourceStatus().setAvailableVirtualMemory(availableVmem);
      status.getResourceStatus().setAvailablePhysicalMemory(availablePmem);
      status.getResourceStatus().setCumulativeCpuTime(cumuCpuTime);
      status.getResourceStatus().setCpuFrequency(cpuFreq);
      status.getResourceStatus().setNumProcessors(numCpu);
      status.getResourceStatus().setCpuUsage(cpuUsage);

  4、获取当前节点的监控状态,获取当前节点的监控状态是有线程NodeHealthCheckerService来周期性的检查的,可以通过配置一个监控脚本来实现,默认为不实现。详细分析见TaskTracker源码分析(TaskTracker节点健康状况监控)

  5、发送心跳报告

  6、处理当前真正运行的Task,处理规则是:只要task不是出于运行、就绪、提交挂起或者cleanup阶段,那么就将该task设置为完成状态,从真正运行的task列表中移除,并针对该task是map阶段或者reduce阶段,分别对map/reduce solt进行操作。

  7、完成发送。

  发送的状态对象是org.apache.hadoop.mapred.TaskTrackerStatus,主要属性有:

  String trackerName; // task tracker 节点名称
  String host; // 主机名
  int httpPort; // http web监听端口
  int failures; // 在该节点上失败的task次数
  List<TaskStatus> taskReports; // 当前节点上真正运行的各个人物的状态

  volatile long lastSeen; // 上次汇报时间
  private int maxMapTasks; // 当前节点上允许的最大map slot个数
  private int maxReduceTasks; // 当前节点上允许的最大reduce slot个数
  private TaskTrackerHealthStatus healthStatus; // 当前节点的健康状态对象

  public static final int UNAVAILABLE = -1; // 是否不可用
  private ResourceStatus resStatus; // 当前节点的资源对象

  其中ResourceStatus和TaskTrackerHealthStatus分别表示当前节点的资源信息和状态信息,是一个简单的model类。在这里不做分析。

  TaskStatus类全称为org.apache.hadoop.mapred.TaskStatus。主要保存当前TaskTracker上运行的所有任务的运行状态,基本属性如下:

  private final TaskAttemptID taskid; // task任务id
  private float progress; // 任务执行进度,0-1.0
  private volatile State runState; // 任务运行所处状态,详见TaskStatus.State枚举类
  private String diagnosticInfo; // 诊断信息,一般为异常信息或者错误信息
  private String stateString; // 字符串信息的运行状态
  private String taskTracker; // 所属task tracker名称
  private int numSlots; // 运行该task所需的slot个数,默认为1

  private long startTime; // 任务启动时间
  private long finishTime;  // 任务完成时间
  private long outputSize = -1L; // 输出数据量

  private volatile Phase phase = Phase.STARTING; // 任务运行阶段,详见TaskStatus.Phase枚举类
  private Counters counters; // 该任务中定义的计数器(包括系统自带计数器和用户自定义计数器)
  private boolean includeCounters; // 是否包含计数器,计数器没个60s发送一次,也就是说每隔60s,发送的数据中包含一次计数器
  private SortedRanges.Range nextRecordRange = new SortedRanges.Range(); // 下一个要处理的数据区间,用于定位坏记录所在的空间

  ===================================

  ResourceStatus实例对象resStatus的属性是由抽象类ResourceCalculatorPlugin来获取的,如果不指定该抽象类的具体实现类,那么获取的value值全部都是-1。在linux平台上,默认实现为LinuxResourceCalculatorPlugin类。

    // 创建获取资源的对象
    Class<? extends ResourceCalculatorPlugin> clazz = fConf.getClass(TT_RESOURCE_CALCULATOR_PLUGIN,
            null, ResourceCalculatorPlugin.class);
    resourceCalculatorPlugin = ResourceCalculatorPlugin.getResourceCalculatorPlugin(clazz, fConf);

  另外,用户可以自定义该实现类,配置参数为${mapreduce.tasktracker.resourcecalculatorplugin},默认为空。获取代码如下:

public static ResourceCalculatorPlugin getResourceCalculatorPlugin(
      Class<? extends ResourceCalculatorPlugin> clazz, Configuration conf) {

    if (clazz != null) {
      return ReflectionUtils.newInstance(clazz, conf); // 如果已经配置了class,那么直接使用配置的class
    }

    // No class given, try a os specific class
    try {
      String osName = System.getProperty("os.name"); // 获取操作系统
      if (osName.startsWith("Linux")) { // 如果是linux
        return new LinuxResourceCalculatorPlugin(); // 使用已经实现的一种
      }
    } catch (SecurityException se) {
      // Failed to get Operating System name.
      return null;
    }

    // Not supported on this system.
    return null;
  }

  在LinuxResourceCalculatorPlugin中,其实获取系统的资源信息都是通过读取proc虚拟文件系统中的一些信息来达成的,比如从/proc/meminfo中读取内存,从/proc/cpuinfo中读取cpu信息等。

时间: 2024-08-06 05:46:50

[Hadoop] - TaskTracker源码分析(状态发送)的相关文章

[Hadoop] - TaskTracker源码分析(TaskTracker节点健康状况监控)

在TaskTracker中对象healthStatus保存了当前节点的健康状况,对应的类是org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus.定义如下: static class TaskTrackerHealthStatus implements Writable { private boolean isNodeHealthy; // 节点是否健康 private String healthReport; //

Hadoop HDFS源码分析 关于数据块的类

Hadoop HDFS源码分析 关于数据块的类 1.BlocksMap 官方代码中的注释为: /** * This class maintains the map from a block to its metadata. * block's metadata currently includes blockCollection it belongs to and * the datanodes that store the block. */ BlocksMap数据块映射,管理名字节点上的数据

Hadoop TextInputFormat源码分析

InputFormat主要用于描述输入数据的格式(我们只分析新API,即org.apache.hadoop.mapreduce.lib.input.InputFormat),提供以下两个功能: (1).数据切分:按照某个策略将输入数据切分成若干个split,以便确定MapTask个数以及对应的split. (2).为Mapper提供输入数据:读取给定的split的数据,解析成一个个的key/value对,共Mapper使用. InputFormat抽象类中只有两个方法,分别对应上面两个功能,源码

Hadoop InputFormat源码分析

平时我们写MapReduce程序的时候,在设置输入格式的时候,总会调用形如job.setInputFormatClass(KeyValueTextInputFormat.class)来保证输入文件按照我们想要的格式被读取.所有的输入格式都继承于InputFormat,这是一个抽象类,其子类有专门用于读取普通文件的FileInputFormat,用来读取数据库的DBInputFormat等等. 不同的InputFormat都会按自己的实现来读取输入数据并产生输入分片,一个输入分片会被单独的MapT

Hadoop HDFS源码分析 读取命名空间镜像和编辑日志数据

读取命名空间镜像和编辑日志数据 1.读取命名空间镜像 类FSImage是 命名空间镜像的java实现,在源码中,英文注释为, /** * FSImage handles checkpointing and logging of the namespace edits. * */ FSImage.loadFSImage(FSNamesystem, StartupOption, MetaRecoveryContext) 读取命名空间镜像. 1 private boolean loadFSImage(

MPTCP 源码分析(四) 发送和接收数据

简述: MPTCP在发送数据方面和TCP的区别是可以从多条路径中选择一条 路径来发送数据.MPTCP在接收数据方面与TCP的区别是子路径对无序包 进行重排后,MPTCP的mpcb需要多所有子路径的包进行排序.查看图1可知. +-------------------------------+ | Application | +---------------+ +-------------------------------+ | Application | | MPTCP | +---------

Apache Hadoop hdfs源码分析

FileSystem.get --> 通过反射实例化了一个DistributedFileSystem --> new DFSCilent()把他作为自己的成员变量 在DFSClient构造方法里面,调用了createNamenode,使用了RPC机制,得到了一个NameNode的代理对象,就可以和NameNode进行通信了 FileSystem --> DistributedFileSystem --> DFSClient --> NameNode的代理

基于TCP网络通信的自动升级程序源码分析-启动升级文件下载程序

升级程序启动后,首先会连接服务器 private void Connect() { try { int port = int.Parse(System.Configuration.ConfigurationManager.AppSettings["Port"]); connnectionInfo = new ConnectionInfo(IPAddress, port); connection = TCPConnection.GetConnection(connnectionInfo)

Hadoop2源码分析-准备篇

1.概述 我们已经能够搭建一个高可用的Hadoop平台了,也熟悉并掌握了一个项目在Hadoop平台下的开发流程,基于Hadoop的一些套件我们也能够使用,并且能利用这些套件进行一些任务的开发.在Hadoop的应用级别上,我们接着往后面去研究学习,那就是Hadoop的源码了,作为Hadoop开发人员,我们得去学习和研究Hadoop得实现原理,底层框架的设计,编码的实现过程等等,下面就开始我们今天的Hadoop源码分析之旅. 2.准备 在分析源码之前,我们需要准备好分析源码的环境,以及如何去分析(分