Hadoop-2.6.0中关于控制应用是否通过CGroup限制CPU的优化

一、背景

Hadoop-2.6.0中,通过一系列复杂的配置,尤其是LinuxContainerExecutor和CgroupsLCEResourcesHandler这两个组件的使用,使得应用程序可以通过cgroup来限制其CPU的使用,防止CPU消耗过高的作业占住CPU,而其它作业无法使用。

但是,这样也随之带来了一个问题,那就是一旦CPU CGroup启动,所有的应用都会受其限制,而且普遍的,生产集群配置的yarn.nodemanager.resource.cpu-vcores一般是高于物理内核数的,而作业的Container被分配的虚拟内核vcore为1的情况下,比不启用CPU CGroup时运行时间要长一些。我在5台机器上测试的结果是前者是后者的1.5倍。当然,这可能还需要在大规模集群上进行详细测试。但是有一点目前是肯定的,那就是启用CPU CGroup会使得原本不想限制CPU使用的应用受到限制,从而延长其运行时间。

那么,有没有一种方案来实现应用级别的CPU CGroup呢?

二、思考

通过分析CgroupsLCEResourcesHandler的源码,我了解到有一个参数,可以配置决定是否严格限制Container的CPU使用,即yarn.nodemanager.linux-container-executor.cgroups.strict-resource-usage,但实际上这个参数是属于NodeManager的,不是属于应用的,一旦设置并重启NodeManager后,任何应用都无法修改。并且,在Yarn中,ContainerExecutor这个组件是在NodeManager的serviceInit()方法中实例化的,如下:

@Override
protected void serviceInit(Configuration conf) throws Exception {
  // ...省略部分代码
  ContainerExecutor exec = ReflectionUtils.newInstance(
      conf.getClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
        DefaultContainerExecutor.class, ContainerExecutor.class), conf);
  try {
    exec.init();
  } catch (IOException e) {
    throw new YarnRuntimeException("Failed to initialize container executor", e);
  }
  // ...省略部分代码
}

也就是说,NodeManager进程中就一个ContainerExecutor实例,而LCEResourcesHandler又是在ContainerExecutor组件(这里是LinuxContainerExecutor)中实例化的,并且其配置Configuration实例与ContainerExecutor、NodeManager共用的一个,都是属于NodeManager级别的配置信息,如下:

@Override
public void setConf(Configuration conf) {
  super.setConf(conf);
  containerExecutorExe = getContainerExecutorExecutablePath(conf);

  resourcesHandler = ReflectionUtils.newInstance(
          conf.getClass(YarnConfiguration.NM_LINUX_CONTAINER_RESOURCES_HANDLER,
            DefaultLCEResourcesHandler.class, LCEResourcesHandler.class), conf);
  resourcesHandler.setConf(conf);
  // ...省略部分代码
}

这也就决定了通过参数配置这条道路是行不通的,因为配置是属于NodeManager的,应用无法修改。那么我们可以通过什么方式来实现呢?

CgroupsLCEResourcesHandler主要是通过preExecute(ContainerId containerId, Resource containerResource)、postExecute(ContainerId containerId)两个方法实现的CPU CGroup限制时的环境设置,而它们的参数都有ContainerId,也就是它们是与容器息息相关的,通过仔细研究容器Container的相关代码,我找到了答案,那就是容器运行时的环境变量参数。而容器的getLaunchContext()方法提供了容器启动的上下文信息ContainerLaunchContext,这个上下文提供了获取和配置环境变量的getEnvironment()和setEnvironment()方法,通过它就能实现应用级别的CPU CGroup限制了。而MapReduce为我们提供了mapreduce.map.env、mapreduce.reduce.env、yarn.app.mapreduce.am.env三个参数,来分别设置Map容器、Reduce容器和AM容器时的环境变量。

三、优化方案

1、添加ContainerExecutor组件

这个ContainerExecutor组件可以直接继承自LinuxContainerExecutor类,然后重写其launchContainer()方法,通过上述环境变量加ContainerID,在执行LinuxContainerExecutor的launchContainer()前设置一个容器级别的参数,如下:

package com.xxx.cgroup;

import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;

public class XxxLinuxContainerExecutor extends LinuxContainerExecutor {

    @Override
    public int launchContainer(Container container, Path nmPrivateCotainerScriptPath, Path nmPrivateTokensPath,
            String user, String appId, Path containerWorkDir, List<String> localDirs, List<String> logDirs)
            throws IOException {

        boolean map_cgroup_para = Boolean.valueOf(container.getLaunchContext().getEnvironment().get("xxx_cpu_cgroup_map"));
        boolean reduce_cgroup_para = Boolean.valueOf(container.getLaunchContext().getEnvironment().get("xxx_cpu_cgroup_reduce"));
        boolean am_cgroup_para = Boolean.valueOf(container.getLaunchContext().getEnvironment().get("xxx_cpu_cgroup_am"));

        // 设置容器参数
        if (map_cgroup_para) {
            super.getConf().set(container.getContainerId().toString() + "_xxx_cpu_cgroup_map", String.valueOf(map_cgroup_para));
        }
        if (reduce_cgroup_para) {
            super.getConf().set(container.getContainerId().toString() + "_xxx_cpu_cgroup_reduce", String.valueOf(reduce_cgroup_para));
        }
        if (am_cgroup_para) {
            super.getConf().set(container.getContainerId().toString() + "_xxx_cpu_cgroup_am", String.valueOf(am_cgroup_para));
        }

        int i = super.launchContainer(container, nmPrivateCotainerScriptPath, nmPrivateTokensPath, user, appId,
                containerWorkDir, localDirs, logDirs);

        // 清空容器参数,防止配置Map数据量过大
        if (map_cgroup_para) {
            super.getConf().unset(container.getContainerId().toString() + "_xxx_cpu_cgroup_map");
        }
        if (reduce_cgroup_para) {
            super.getConf().unset(container.getContainerId().toString() + "_xxx_cpu_cgroup_reduce");
        }
        if (am_cgroup_para) {
            super.getConf().unset(container.getContainerId().toString() + "_xxx_cpu_cgroup_am");
        }

        return i;
    }
}

然后通过配置yarn.nodemanager.container-executor.class为com.bfd.cgroup.XxxLinuxContainerExecutor。

2、添加LCEResourcesHandler组件

这个LCEResourcesHandler组件直接继承自原BfdCgroupsLCEResourcesHandler,然后覆写其中的三个方法,preExecute()、postExecute()、getResourcesOption(),通过容器相关的环境变量来决定

是否启用CPU CGroup,如下:

package com.xxx.cgroup;

import java.io.IOException;

import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler;

public class XxxCgroupsLCEResourcesHandler extends CgroupsLCEResourcesHandler {

    public XxxCgroupsLCEResourcesHandler() {
        super();
    }

    @Override
    public void preExecute(ContainerId containerId, Resource containerResource) throws IOException {

        boolean map_cgroup_para = Boolean.valueOf(this.getConf().get(containerId.toString() + "_xxx_cpu_cgroup_map"));
        boolean reduce_cgroup_para = Boolean.valueOf(this.getConf().get(containerId.toString() + "_xxx_cpu_cgroup_reduce"));
        boolean am_cgroup_para = Boolean.valueOf(this.getConf().get(containerId.toString() + "_xxx_cpu_cgroup_am"));

        if (map_cgroup_para || reduce_cgroup_para || am_cgroup_para) {
            super.preExecute(containerId, containerResource);
        }
    }

    @Override
    public void postExecute(ContainerId containerId) {

        boolean map_cgroup_para = Boolean.valueOf(this.getConf().get(containerId.toString() + "_xxx_cpu_cgroup_map"));
        boolean reduce_cgroup_para = Boolean.valueOf(this.getConf().get(containerId.toString() + "_xxx_cpu_cgroup_reduce"));
        boolean am_cgroup_para = Boolean.valueOf(this.getConf().get(containerId.toString() + "_xxx_cpu_cgroup_am"));

        if (map_cgroup_para || reduce_cgroup_para || am_cgroup_para) {
            super.postExecute(containerId);
        }
    }

    @Override
    public String getResourcesOption(ContainerId containerId) {

        boolean map_cgroup_para = Boolean.valueOf(this.getConf().get(containerId.toString() + "_xxx_cpu_cgroup_map"));
        boolean reduce_cgroup_para = Boolean.valueOf(this.getConf().get(containerId.toString() + "_xxx_cpu_cgroup_reduce"));
        boolean am_cgroup_para = Boolean.valueOf(this.getConf().get(containerId.toString() + "_xxx_cpu_cgroup_am"));

        if (map_cgroup_para || reduce_cgroup_para || am_cgroup_para) {
            return super.getResourcesOption(containerId);
        } else {

            // 下面这些必须有,否则报错
            StringBuilder sb = new StringBuilder("cgroups=");

            if (sb.charAt(sb.length() - 1) == ‘,‘) {
                sb.deleteCharAt(sb.length() - 1);
            }

            return sb.toString();
        }
    }
}

然后配置yarn.nodemanager.linux-container-executor.resources-handler.class为com.bfd.cgroup.XxxCgroupsLCEResourcesHandler。

3、应用程序中设置环境变量参数

应用程序中,如果想对CPU进行CGroup限制,需要配置以下三个环境变量,如下:

conf.set("mapreduce.map.env", "xxx_cpu_cgroup_map=true");
conf.set("mapreduce.reduce.env", "xxx_cpu_cgroup_reduce=true");
conf.set("yarn.app.mapreduce.am.env", "xxx_cpu_cgroup_am=true");

四、测试

测试结果显示,上述改动能够实现应用级别的CPU CGroup限制,启动和不启动的执行时间明显有差距,且两者并行运行的话,也能实现该效果,说明参数能够达到应用级别(实际上是容器级别)而互不干扰。

时间: 2024-11-10 11:18:10

Hadoop-2.6.0中关于控制应用是否通过CGroup限制CPU的优化的相关文章

Hadoop 2.0中单点故障解决方案总结

项目构建 Hadoop 1.0内核主要由两个分支组成:MapReduce和HDFS,众所周知,这两个系统的设计缺陷是单点故障,即MR的JobTracker和HDFS的NameNode两个核心服务均存在单点问题,该问题在很长时间内没有解决,这使得Hadoop在相当长时间内仅适合离线存储和离线计算. 令人欣慰的是,这些问题在Hadoop 2.0中得到了非常完整的解决.Hadoop 2.0内核由三个分支组成,分别是HDFS.MapReduce和YARN,而Hadoop生态系统中的其他系统,比如HBas

Hadoop 2.0 中的资源管理框架 - YARN(Yet Another Resource Negotiator)

1. Hadoop 2.0 中的资源管理 http://dongxicheng.org/mapreduce-nextgen/hadoop-1-and-2-resource-manage/ Hadoop 2.0指的是版本为Apache Hadoop 0.23.x.2.x或者CDH4系列的Hadoop,内核主要由HDFS.MapReduce和YARN三个系统组成,其中,YARN是一个资源管理系统,负责集群资源管理和调度,MapReduce则是运行在YARN上离线处理框架,它与Hadoop 1.0中的

Hadoop-2.2.0中文文档—— 从Hadoop 1.x 迁移至 Hadoop 2.x

简介 本文档对从 Apache Hadoop 1.x 迁移他们的Apache Hadoop MapReduce 应用到 Apache Hadoop 2.x 的用户提供了一些信息. 在 Apache Hadoop 2.x 中,我们已经把资源管理功能放入 分布式应用管理框架 的Apache Hadoop YARN,而 Apache Hadoop MapReduce (亦称 MRv2) 保持为一个纯分布式计算框架. 总之,之前的 MapReduce 运行时 (亦称 MRv1) 已经被重用并且不会有重大

Hadoop中的控制脚本

1.提出问题 在上篇博文中,提到了为什么要配置ssh免密码登录,说是Hadoop控制脚本依赖SSH来执行针对整个集群的操作,那么Hadoop中控制脚本都是什么东西呢?具体是如何通过SSH来针对整个集群的操作?网上完全分布模式下Hadoop的搭建很多,可是看完后,真的了解吗?为什么要配置Hadoop下conf目录下的masters文件和slaves文件,masters文件里面主要记录的是什么东西,slaves文件中又记录的是什么东西,masters文件和slaves文件都有什么作用?好,我看到过一

Hadoop 1.0 和 2.0 中的数据处理框架 - MapReduce

1. MapReduce - 映射.化简编程模型 运行原理: 2. Hadoop V1 中的 MapReduce 的实现 Hadoop 1.0 指的是版本为Apache Hadoop 0.20.x.1.x或者CDH3系列的Hadoop,内核主要由HDFS和MapReduce两个系统组成,其中,MapReduce是一个离线处理框架,由编程模型(新旧API).运行时环境(JobTracker和TaskTracker)和 数据处理引擎(MapTask和ReduceTask)三部分组成. 2.1 Had

Hadoop-2.2.0中文文档—— Common - CLI MiniCluster

目的 使用 CLI MiniCluster, 用户可以简单地只用一个命令就启动或关闭一个单一节点的Hadoop集群,不需要设置任何环境变量或管理配置文件. CLI MiniCluster 同时启动一个 YARN/MapReduce 和 HDFS 集群. 这对那些想要快速体验一个真实的Hadoop集群或是测试依赖明显的Hadoop函数的非Java程序 的用户很有用. Hadoop Tarball 你需要从发布页获取tar包.或者,你可以从源码中自己编译. $ mvn clean install -

Hadoop-2.2.0中文文档——MapReduce 下一代 -——集群配置

目的 这份文档描写叙述了怎样安装.配置和管理从几个节点到有数千个节点的Hadoop集群. 玩的话,你可能想先在单机上安装.(看单节点配置). 准备 从Apache镜像上下载一个Hadoop的稳定版本号. 安装 安装一个Hadoop集群,一般包含分发软件到全部集群中的机器上或者是安装RPMs. 一般地,集群中的一台机器被唯一地设计成NameNode,还有一台机器被设置成ResourceManager.这是master(主). 集群中剩下的机器作为DataNode 和 NodeManager.这些是

Hadoop2.0中单点故障解决方案总结---老董

Hadoop 1.0内核主要由两个分支组成:MapReduce和HDFS,众所周知,这两个系统的设计缺陷是单点故障,即MR的JobTracker和HDFS的NameNode两个核心服务均存在单点问题,该问题在很长时间内没有解决,这使得Hadoop在相当长时间内仅适合离线存储和离线计算. 令人欣慰的是,这些问题在Hadoop 2.0中得到了非常完整的解决.Hadoop 2.0内核由三个分支组成,分别是HDFS.MapReduce和YARN,而Hadoop生态系统中的其他系统,比如HBase.Hiv

Apache Spark 2.2.0 中文文档 - Spark SQL, DataFrames and Datasets Guide | ApacheCN

Spark SQL, DataFrames and Datasets Guide Overview SQL Datasets and DataFrames 开始入门 起始点: SparkSession 创建 DataFrames 无类型的Dataset操作 (aka DataFrame 操作) Running SQL Queries Programmatically 全局临时视图 创建Datasets RDD的互操作性 使用反射推断Schema 以编程的方式指定Schema Aggregatio