DataNode启动优化改进:磁盘检测并行化

前言



在本篇文章中,本人打算聊一个大家平常都会遇见到的场景:HDFS中的DataNode启动的问题。DataNode启动不是一件非常迅速的事情吗?这其中能有大问题?看到这里,如果你也是这么想的话,那说明一点:你所运维的集群碰到的异常场景还不太多。本文所要讲述的问题并不是DataNode无法启动的问题,而是它启动有时会过慢的问题。DataNode进程启动过慢造成的直接影响是其上数据的服务延时。集群规模在一个很大规模量级的情况下,如果出现了大量DataNode慢启动的现象,这将会对集群本身对外提供服务造成不小的影响。本文所要讲述的内容是对于目前DataNode的启动优化,以此加速其启动时间,优化的主要点在于将DataNode启动时的磁盘检测行为。

现有DataNode启动时的磁盘检测



在讲述本节主要内容之前,我们要大概了解DataNode启动时的相关操作。在HDFS中,一个DataNode从启动开始到最终提供数据服务,中间会做很多的操作步骤。这里主要概括为以下几点:

  • 1.读取解析数据目录即datadir所配置的目录。
  • 2.检查这些目录对应磁盘是否是坏的磁盘(此步骤现有的逻辑为串行执行)。
  • 3.DataNode发送心跳信息,向NameNode进行注册。
  • 4.扫描各个数据目录下的数据块,并将这些数据块初次汇报给NameNode。

主要为以上的逻辑,而本节我们要优化的点在于其中第2点,也就是磁盘检测相关的操作。在DataNode的启动过程中,为什么要对磁盘做一次健康检查呢?因为它是保证节点本身数据可用性的一个重要指标,如果DataNode在磁盘检测中发现坏盘的个数超出了可容忍阈值(可配)的情况下,会直接让DataNode启动失败,并抛出异常。由此可见,HDFS对其磁盘可用性的一个重视。在正常情况下,这部分的检测操作会非常顺利,但是在某些情况下,可能会出现检测十分耗时的情况,比如下面两类情况:

第一个,如果节点内配置的磁盘目录非常多,比如一个机器,上面有10来块,20来块盘,然后我配置了对应盘数的目录。由于目前磁盘健康检测的逻辑是串行执行,所以总执行时间会线性增长。当然,如果机器磁盘本身都比较健康,它所花的总时间也不会多多少时间。可怕的是第二种情况。

第二个,个别DataNode磁盘数据目录检测出现非常慢的现象,可能是这个目录对应磁盘本身的性能问题(DataNode磁盘健康检测时会尝试在目录下创建文件、目录动作以此确定磁盘是否可用)。这个时候后面待检测的目录就会被迫等待当前磁盘检测动作的完成,最后就会导致总检测时间过长。

所以为了避免出现第二点提到的个别磁盘检测极慢影响到整体的问题,我们可以对其进行改造,改造的核心点就在于将原有磁盘检测的执行逻辑由串行化改为并行化。这个改进想法是目前社区在做的,JIRA编号HDFS-11086DataNode disk check improvements)。本文的主要思想和代码也是借鉴于这个JIRA上的。

现有DataNode内部磁盘检测代码



接下来我们来看一下目前DataNode磁盘检测代码,既然我们已经知道它是串行执行的逻辑了,那么DataNode内部到底是怎么执行的呢?

首先它是在初始化DataNode节点的操作中,代码如下:

  public static DataNode instantiateDataNode(String args [], Configuration conf,
      SecureResources resources) throws IOException {
    if (conf == null)
      conf = new HdfsConfiguration();

    if (args != null) {
      // parse generic hadoop options
      GenericOptionsParser hParser = new GenericOptionsParser(conf, args);
      args = hParser.getRemainingArgs();
    }

    // 解析DataNode启动参数
    if (!parseArguments(args, conf)) {
      printUsage(System.err);
      return null;
    }
    Collection<StorageLocation> dataLocations = getStorageLocations(conf);
    UserGroupInformation.setConfiguration(conf);
    SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY,
        DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, getHostName(conf));
    // 进入DataNode构建实例方法
    return makeInstance(dataLocations, conf, resources);
  }

r然后我们进入makeInstance方法,

  static DataNode makeInstance(Collection<StorageLocation> dataDirs,
      Configuration conf, SecureResources resources) throws IOException {
    LocalFileSystem localFS = FileSystem.getLocal(conf);
    FsPermission permission = new FsPermission(
        conf.get(DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
                 DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
    // 初始化磁盘检测对象
    DataNodeDiskChecker dataNodeDiskChecker =
        new DataNodeDiskChecker(permission);
    // 传入磁盘检测对象进行磁盘检测
    List<StorageLocation> locations =
        checkStorageLocations(dataDirs, localFS, dataNodeDiskChecker);
    DefaultMetricsSystem.initialize("DataNode");

    assert locations.size() > 0 : "number of data directories should be > 0";
    return new DataNode(conf, locations, resources);
  }

最后我们进入其中真正的磁盘检测动作,

  static List<StorageLocation> checkStorageLocations(
      Collection<StorageLocation> dataDirs,
      LocalFileSystem localFS, DataNodeDiskChecker dataNodeDiskChecker)
          throws IOException {
    ArrayList<StorageLocation> locations = new ArrayList<StorageLocation>();
    StringBuilder invalidDirs = new StringBuilder();
    // 遍历数据目录,注意这里就是串行的方式
    for (StorageLocation location : dataDirs) {
      final URI uri = location.getUri();
      try {
        // 利用磁盘检测对象进行磁盘目录的检测
        dataNodeDiskChecker.checkDir(localFS, new Path(uri));
        // 检测完毕没有抛出异常,则说明目录可用,加入到可用列表
        locations.add(location);
      } catch (IOException ioe) {
        // 如果出现IO异常,说明此磁盘目录不可用,加入到目录中
        LOG.warn("Invalid " + DFS_DATANODE_DATA_DIR_KEY + " "
            + location + " : ", ioe);
        invalidDirs.append("\"").append(uri.getPath()).append("\" ");
      }
    }
    // 如果可用目录数量为0,表明所有的目录都不可用
    if (locations.size() == 0) {
      throw new IOException("All directories in "
          + DFS_DATANODE_DATA_DIR_KEY + " are invalid: "
          + invalidDirs);
    }
    // 返回可用数据目录列表
    return locations;
  }

OK,由此我们可以明确地知道我们将要改造的地方了。

DataNode磁盘检测并行化改造



在此小节部分,我们将要介绍社区目前在这方面的改进,主要集中在JIRA HDFS-11086下的2个子JIRA:HDFS-11119Support for parallel checking of StorageLocations on DataNode startup)和HDFS-11148Update DataNode to use StorageLocationChecker at startup)。

在HDFS-11086中,作者引入了类似于Future-Get的异步执行模式,但是它没有用JDK中原生的Future-Get,而是名叫ListenableFuture的类(包名com.google.common.util.concurrent.ListenableFuture)。大家可以试着用用这个Future类

还有2个小点是本人同样认为是不错的优化点:第一,它额外保留了最近一次磁盘检测的结果,以及新定义了最小检测需要间隔的时间大小。这一点其实是非常有意义的,它可以避免短时间内的重复检测动作。也就是说,如果某块磁盘在最小检测间隔时间内又一次被检测了,则将直接返回上次的检测结果,不执行真正的检测操作。第二,它的内部新定义了磁盘检测的最大超时时间,换句话说,如果某磁盘检测处于非常慢的情况下时,直接抛出IO异常来终止此操作,进行下一个磁盘检测结果的返回

磁盘检测改造相关类设计



在本次磁盘检测改造的相关类设计中,定义了下面几个类,这几个类被放在了新的package:org.apache.hadoop.hdfs.server.datanode.checker下(大家获取最新的hadoop-trunk的代码就能找到)。

  • AsyncChecker:最基本的接口类,内部定义了用于发启异步检测与停止的操作方法。
  • ThrottledAsyncChecker:异步检测接口的具体实现类。
  • StorageLocationChecker:磁盘检测对象类,此对象会调用上面异步检测磁盘类来对各个磁盘进行并行地检测。
  • VolumeCheckResult:磁盘检测结果类,里面定义了3种检测结果:HEALTHY、DEGRADED、FAILED。

磁盘检测具体代码实现



了解完相关类的设计之后,我们最后要来真正学习此部分的代码实现了。首先要改造的地方就是前面篇幅makeInstance方法中的串行检测的逻辑,原始代码如下:

  static DataNode makeInstance(Collection<StorageLocation> dataDirs,
      Configuration conf, SecureResources resources) throws IOException {
    LocalFileSystem localFS = FileSystem.getLocal(conf);
    FsPermission permission = new FsPermission(
        conf.get(DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
                 DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
    DataNodeDiskChecker dataNodeDiskChecker =
        new DataNodeDiskChecker(permission);
    // 这里就是要改造的地方
    List<StorageLocation> locations =
        checkStorageLocations(dataDirs, localFS, dataNodeDiskChecker);
    DefaultMetricsSystem.initialize("DataNode");

    assert locations.size() > 0 : "number of data directories should be > 0";
    return new DataNode(conf, locations, resources);
  }

在此处代码部分,将会改造为如下代码:

  static DataNode makeInstance(Collection<StorageLocation> dataDirs,
      Configuration conf, SecureResources resources) throws IOException {
    // 改造后的结果,利用StorageLocationChecker类来做并行检测
    List<StorageLocation> locations;
    StorageLocationChecker storageLocationChecker =
        new StorageLocationChecker(conf, new Timer());
    try {
      locations = storageLocationChecker.check(conf, dataDirs);
    } catch (InterruptedException ie) {
      throw new IOException("Failed to instantiate DataNode", ie);
    }
    DefaultMetricsSystem.initialize("DataNode");
    assert locations.size() > 0 : "number of data directories should be > 0";
    return new DataNode(conf, locations, resources);
  }

然后我们来重点查看StorageLocationChecker内的检测方法,

  public List<StorageLocation> check(
      final Configuration conf,
      final Collection<StorageLocation> dataDirs)
      throws InterruptedException, IOException {

    // 定义可用磁盘、不可用磁盘列表
    final ArrayList<StorageLocation> goodLocations = new ArrayList<>();
    final Set<StorageLocation> failedLocations = new HashSet<>();
    // 磁盘检测结果
    final Map<StorageLocation, ListenableFuture<VolumeCheckResult>> futures =
        Maps.newHashMap();
    final LocalFileSystem localFS = FileSystem.getLocal(conf);
    final CheckContext context = new CheckContext(localFS, expectedPermission);

    // 遍历磁盘目录,将待检测目录加入到AsyncChecker中,进行并行检测
    for (StorageLocation location : dataDirs) {
      futures.put(location,
          delegateChecker.schedule(location, context));
    }

    // 记录当前开始检测时间
    final long checkStartTimeMs = timer.monotonicNow();

    // Retrieve the results of the disk checks.
    for (Map.Entry<StorageLocation,
             ListenableFuture<VolumeCheckResult>> entry : futures.entrySet()) {

      // 计算当前已经等待的执行时间
      final long waitSoFarMs = (timer.monotonicNow() - checkStartTimeMs);
      // 用最大允许检测时间减去当前已经执行的时间来计算可允许再等待的时间
      final long timeLeftMs = Math.max(0,
          maxAllowedTimeForCheckMs - waitSoFarMs);
      final StorageLocation location = entry.getKey();

      try {
        // 阻塞式获取执行结果,最长阻塞时间为剩余可等待时间,超出此时间会抛出超时异常
        final VolumeCheckResult result =
            entry.getValue().get(timeLeftMs, TimeUnit.MILLISECONDS);
        // 根据检测结果加入到不同的列表最中
        switch (result) {
        case HEALTHY:
          goodLocations.add(entry.getKey());
          break;
        case DEGRADED:
          LOG.warn("StorageLocation {} appears to be degraded.", location);
          break;
        case FAILED:
          LOG.warn("StorageLocation {} detected as failed.", location);
          failedLocations.add(location);
          break;
        default:
          LOG.error("Unexpected health check result {} for StorageLocation {}",
              result, location);
          goodLocations.add(entry.getKey());
        }
      } catch (ExecutionException|TimeoutException e) {
        // 如果抛出异常,也加入到失败列表中
        LOG.warn("Exception checking StorageLocation " + location,
            e.getCause());
        failedLocations.add(location);
      }
    }
    // 如果不可用磁盘目录数超过阈值,则抛出IO异常
    if (failedLocations.size() > maxVolumeFailuresTolerated) {
      throw new IOException(
          "Too many failed volumes: " + failedLocations.size() +
          ". The configuration allows for a maximum of " +
          maxVolumeFailuresTolerated + " failed volumes.");
    }
    // 如果没有可用磁盘目录,也抛出异常
    if (goodLocations.size() == 0) {
      throw new IOException("All directories in "
          + DFS_DATANODE_DATA_DIR_KEY + " are invalid: "
          + failedLocations);
    }
    // 返回可用磁盘目录列表
    return goodLocations;
  }

上面执行的逻辑非常的清晰,最后我们再来看ThrottledAsyncChecker的异步检测逻辑,入口即为上面的schedule方法。

  public synchronized ListenableFuture<V> schedule(
      final Checkable<K, V> target,
      final K context) {
    LOG.debug("Scheduling a check of {}", target);
    // 如果此对象已经是在检测中的状态时,则返回之前的对象
    if (checksInProgress.containsKey(target)) {
      return checksInProgress.get(target);
    }

    // 如果此结果已包含在完成列表中的情况
    if (completedChecks.containsKey(target)) {
      // 取出此对象的检测结果
      final LastCheckResult<V> result = completedChecks.get(target);
      // 计算距离上次检测结果的时间
      final long msSinceLastCheck = timer.monotonicNow() - result.completedAt;
      // 如果此间隔时间在最小间隔时间范围内,则直接返回上次的检测结果
      if (msSinceLastCheck < minMsBetweenChecks) {
        LOG.debug("Skipped checking {}. Time since last check {}ms " +
            "is less than the min gap {}ms.",
            target, msSinceLastCheck, minMsBetweenChecks);
        return result.result != null ?
            Futures.immediateFuture(result.result) :
            Futures.immediateFailedFuture(result.exception);
      }
    }
    // 否则提交到线程池中进行异步检测
    final ListenableFuture<V> lf = executorService.submit(
        new Callable<V>() {
          @Override
          public V call() throws Exception {
            return target.check(context);
          }
        });
    // 将Future对象加入到正在执行列表中
    checksInProgress.put(target, lf);
    addResultCachingCallback(target, lf);
    return lf;
  }

这里的目标对象target.check方法会调用到DiskCheker的真正磁盘检测方法,也就是StorageLocation的check方法,相关代码如下:

  public VolumeCheckResult check(CheckContext context) throws IOException {
    // 调用真正磁盘检测类DiskChecker的检测操作
    DiskChecker.checkDir(
        context.localFileSystem,
        new Path(baseURI),
        context.expectedPermission);
    return VolumeCheckResult.HEALTHY;
  }

OK,以上就是本文所要阐述的DataNode启动优化改造之磁盘检测并行化的内容了。大家可不能小看这仅仅是一个小小的改造,在某些极端情况下,可能就会帮助我们避免了后续的许多问题。

参考资料



[1].DataNode disk check improvements

[2].Support for parallel checking of StorageLocations on DataNode startup

[3].Update DataNode to use StorageLocationChecker at startup

时间: 2024-10-06 00:07:09

DataNode启动优化改进:磁盘检测并行化的相关文章

Linux启动时间优化-内核和用户空间启动优化实践

关键词:initcall.bootgraph.py.bootchartd.pybootchart等. 启动时间的优化,分为两大部分,分别是内核部分和用户空间两大部分. 从内核timestamp 0.000000作为内核启动起点,到free_initmem()输出"Freeing init memory"作为内核启动的终点. 借助于bootgraph.py对内核的kmsg进行分析,输出bootgraph.html和initcall耗时csv文件. 在紧接着free_initmem()下面

一触即发——App启动优化最佳实践

一触即发 App启动优化最佳实践 文中的很多图都是Google性能优化指南第六季中的一些截图 Google给出的优化指南来镇楼 https://developer.android.com/topic/performance/launch-time.html 闪屏定义 Android官方的性能优化典范,从第六季开始,发起了一系列针对App启动的优化实践,地址如下: https://www.youtube.com/watch?v=Vw1G1s73DsY&index=74&list=PLWz5r

一触即发 App启动优化最佳实践

一触即发 App启动优化最佳实践 本文在 DiyCode 和 CSDN个人博客 同时首发,关注作者的 DiyCode帐号 或者 作者微博 可第一时间收到新文章推送. 文中的很多图都是Google性能优化指南第六季中的一些截图 Google给出的优化指南来镇楼 https://developer.android.com/topic/performance/launch-time.html 闪屏定义 Android官方的性能优化典范,从第六季开始,发起了一系列针对App启动的优化实践,地址如下: h

TLD跟踪算法优化(一)并行化

才学疏浅,只言片语,只求志同道的朋友一起交流研究. 并行化不算是算法的改进,只是追求运行的实时性. 简要列举一个例子: TLD算法的C++版本源码里: LKTracker::trackf2f(const Mat& img1, const Mat& img2,vector<Point2f> &points1, vector<cv::Point2f> &points2){ bool LKTracker::trackf2f(const Mat& i

基于Android官方AsyncListUtil优化改进RecyclerView分页加载机制(一)

基于Android官方AsyncListUtil优化改进RecyclerView分页加载机制(一) Android AsyncListUtil是Android官方提供的专为列表这样的数据更新加载提供的异步加载组件.基于AsyncListUtil组件,可以轻易实现常见的RecyclerView分页加载技术.AsyncListUtil技术涉及的细节比较繁复,因此我将分别写若干篇文章,分点.分解AsyncListUtil技术. 先给出一个可运行的例子,MainActivity.java: packag

Android 项目优化(五):应用启动优化

介绍了前面的优化的方案后,这里我们在针对应用的启动优化做一下讲解和说明. 一.App启动概述 一个应用App的启动速度能够影响用户的首次体验,启动速度较慢(感官上)的应用可能导致用户再次开启App的意图下降,或者卸载放弃该应用程序. 应用程序启动有主要分为两种状态,每种状态都会影响应用程序对用户可见所需的时间:冷启动,热启动. 冷启动:冷启动表示用户首次打开应用,这时进程还没创建,包含了Application创建的过程.冷启动时间指从第一次用户点击Launcher中的应用图标开始,到首页内容全部

rac 11g_第二个节点重启后无法启动实例:磁盘组dismount问题

原创作品,出自 "深蓝的blog" 博客,欢迎转载,转载时请务必注明以下出处,否则追究版权法律责任. 深蓝的blog:http://blog.csdn.net/huangyanlong/article/details/41480075 rac第二个节点重启后无法启动实例:磁盘组dismount问题 实验案例: 实验环境:CentOS 6.4.Oracle 11.2.0.1 现象重演:1. 重启第二节点服务器2. 手工启动第二节点实例,报错[[email protected] ~]# s

hadoop namenode多次格式化后,导致datanode启动不了

jps hadoop namenode -format dfs directory : /home/hadoop/dfs --data --current/VERSION #Wed Jul 30 20:41:03 CST 2014 storageID=DS-ab96ad90-7352-4cd5-a0de-7308c8a358ff clusterID=CID-aa2d4761-974b-4451-8858-bbbcf82e1fd4 cTime=0 datanodeUuid=a3356a09-780

hadoop问题总结:datanode启动后,在web50070端口发现不到datanode节点(能力工场)

直接上问题:这两天为了试验,安装了两套集群: (1)32位hadoop1集群(5个节点); (2)64位hadoop2集群(6个节点) 两个集群中都遇到过这样的问题:在namenode正常启动hadoop集群后,查看datanode是正常的显示存在进程,但是在web界面中查看的时候,发现数据节点全部属于宕机,或者就是干脆直接没有datanode.还有种情况,datanode启动后,jps查看是在的,但是一会再去查看,发现挂掉了.还有就是,存储空间显示占用100% 其实这两个集群问题是一样的,都是