一、背景
Hadoop-2.6.0中,通过一系列复杂的配置,尤其是LinuxContainerExecutor和CgroupsLCEResourcesHandler这两个组件的使用,使得应用程序可以通过cgroup来限制其CPU的使用,防止CPU消耗过高的作业占住CPU,而其它作业无法使用。
但是,这样也随之带来了一个问题,那就是一旦CPU CGroup启动,所有的应用都会受其限制,而且普遍的,生产集群配置的yarn.nodemanager.resource.cpu-vcores一般是高于物理内核数的,而作业的Container被分配的虚拟内核vcore为1的情况下,比不启用CPU CGroup时运行时间要长一些。我在5台机器上测试的结果是前者是后者的1.5倍。当然,这可能还需要在大规模集群上进行详细测试。但是有一点目前是肯定的,那就是启用CPU CGroup会使得原本不想限制CPU使用的应用受到限制,从而延长其运行时间。
那么,有没有一种方案来实现应用级别的CPU CGroup呢?
二、思考
通过分析CgroupsLCEResourcesHandler的源码,我了解到有一个参数,可以配置决定是否严格限制Container的CPU使用,即yarn.nodemanager.linux-container-executor.cgroups.strict-resource-usage,但实际上这个参数是属于NodeManager的,不是属于应用的,一旦设置并重启NodeManager后,任何应用都无法修改。并且,在Yarn中,ContainerExecutor这个组件是在NodeManager的serviceInit()方法中实例化的,如下:
@Override protected void serviceInit(Configuration conf) throws Exception { // ...省略部分代码 ContainerExecutor exec = ReflectionUtils.newInstance( conf.getClass(YarnConfiguration.NM_CONTAINER_EXECUTOR, DefaultContainerExecutor.class, ContainerExecutor.class), conf); try { exec.init(); } catch (IOException e) { throw new YarnRuntimeException("Failed to initialize container executor", e); } // ...省略部分代码 }
也就是说,NodeManager进程中就一个ContainerExecutor实例,而LCEResourcesHandler又是在ContainerExecutor组件(这里是LinuxContainerExecutor)中实例化的,并且其配置Configuration实例与ContainerExecutor、NodeManager共用的一个,都是属于NodeManager级别的配置信息,如下:
@Override public void setConf(Configuration conf) { super.setConf(conf); containerExecutorExe = getContainerExecutorExecutablePath(conf); resourcesHandler = ReflectionUtils.newInstance( conf.getClass(YarnConfiguration.NM_LINUX_CONTAINER_RESOURCES_HANDLER, DefaultLCEResourcesHandler.class, LCEResourcesHandler.class), conf); resourcesHandler.setConf(conf); // ...省略部分代码 }
这也就决定了通过参数配置这条道路是行不通的,因为配置是属于NodeManager的,应用无法修改。那么我们可以通过什么方式来实现呢?
CgroupsLCEResourcesHandler主要是通过preExecute(ContainerId containerId, Resource containerResource)、postExecute(ContainerId containerId)两个方法实现的CPU CGroup限制时的环境设置,而它们的参数都有ContainerId,也就是它们是与容器息息相关的,通过仔细研究容器Container的相关代码,我找到了答案,那就是容器运行时的环境变量参数。而容器的getLaunchContext()方法提供了容器启动的上下文信息ContainerLaunchContext,这个上下文提供了获取和配置环境变量的getEnvironment()和setEnvironment()方法,通过它就能实现应用级别的CPU CGroup限制了。而MapReduce为我们提供了mapreduce.map.env、mapreduce.reduce.env、yarn.app.mapreduce.am.env三个参数,来分别设置Map容器、Reduce容器和AM容器时的环境变量。
三、优化方案
1、添加ContainerExecutor组件
这个ContainerExecutor组件可以直接继承自LinuxContainerExecutor类,然后重写其launchContainer()方法,通过上述环境变量加ContainerID,在执行LinuxContainerExecutor的launchContainer()前设置一个容器级别的参数,如下:
package com.xxx.cgroup; import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor; import java.io.IOException; import java.util.List; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; public class XxxLinuxContainerExecutor extends LinuxContainerExecutor { @Override public int launchContainer(Container container, Path nmPrivateCotainerScriptPath, Path nmPrivateTokensPath, String user, String appId, Path containerWorkDir, List<String> localDirs, List<String> logDirs) throws IOException { boolean map_cgroup_para = Boolean.valueOf(container.getLaunchContext().getEnvironment().get("xxx_cpu_cgroup_map")); boolean reduce_cgroup_para = Boolean.valueOf(container.getLaunchContext().getEnvironment().get("xxx_cpu_cgroup_reduce")); boolean am_cgroup_para = Boolean.valueOf(container.getLaunchContext().getEnvironment().get("xxx_cpu_cgroup_am")); // 设置容器参数 if (map_cgroup_para) { super.getConf().set(container.getContainerId().toString() + "_xxx_cpu_cgroup_map", String.valueOf(map_cgroup_para)); } if (reduce_cgroup_para) { super.getConf().set(container.getContainerId().toString() + "_xxx_cpu_cgroup_reduce", String.valueOf(reduce_cgroup_para)); } if (am_cgroup_para) { super.getConf().set(container.getContainerId().toString() + "_xxx_cpu_cgroup_am", String.valueOf(am_cgroup_para)); } int i = super.launchContainer(container, nmPrivateCotainerScriptPath, nmPrivateTokensPath, user, appId, containerWorkDir, localDirs, logDirs); // 清空容器参数,防止配置Map数据量过大 if (map_cgroup_para) { super.getConf().unset(container.getContainerId().toString() + "_xxx_cpu_cgroup_map"); } if (reduce_cgroup_para) { super.getConf().unset(container.getContainerId().toString() + "_xxx_cpu_cgroup_reduce"); } if (am_cgroup_para) { super.getConf().unset(container.getContainerId().toString() + "_xxx_cpu_cgroup_am"); } return i; } }
然后通过配置yarn.nodemanager.container-executor.class为com.bfd.cgroup.XxxLinuxContainerExecutor。
2、添加LCEResourcesHandler组件
这个LCEResourcesHandler组件直接继承自原BfdCgroupsLCEResourcesHandler,然后覆写其中的三个方法,preExecute()、postExecute()、getResourcesOption(),通过容器相关的环境变量来决定
是否启用CPU CGroup,如下:
package com.xxx.cgroup; import java.io.IOException; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler; public class XxxCgroupsLCEResourcesHandler extends CgroupsLCEResourcesHandler { public XxxCgroupsLCEResourcesHandler() { super(); } @Override public void preExecute(ContainerId containerId, Resource containerResource) throws IOException { boolean map_cgroup_para = Boolean.valueOf(this.getConf().get(containerId.toString() + "_xxx_cpu_cgroup_map")); boolean reduce_cgroup_para = Boolean.valueOf(this.getConf().get(containerId.toString() + "_xxx_cpu_cgroup_reduce")); boolean am_cgroup_para = Boolean.valueOf(this.getConf().get(containerId.toString() + "_xxx_cpu_cgroup_am")); if (map_cgroup_para || reduce_cgroup_para || am_cgroup_para) { super.preExecute(containerId, containerResource); } } @Override public void postExecute(ContainerId containerId) { boolean map_cgroup_para = Boolean.valueOf(this.getConf().get(containerId.toString() + "_xxx_cpu_cgroup_map")); boolean reduce_cgroup_para = Boolean.valueOf(this.getConf().get(containerId.toString() + "_xxx_cpu_cgroup_reduce")); boolean am_cgroup_para = Boolean.valueOf(this.getConf().get(containerId.toString() + "_xxx_cpu_cgroup_am")); if (map_cgroup_para || reduce_cgroup_para || am_cgroup_para) { super.postExecute(containerId); } } @Override public String getResourcesOption(ContainerId containerId) { boolean map_cgroup_para = Boolean.valueOf(this.getConf().get(containerId.toString() + "_xxx_cpu_cgroup_map")); boolean reduce_cgroup_para = Boolean.valueOf(this.getConf().get(containerId.toString() + "_xxx_cpu_cgroup_reduce")); boolean am_cgroup_para = Boolean.valueOf(this.getConf().get(containerId.toString() + "_xxx_cpu_cgroup_am")); if (map_cgroup_para || reduce_cgroup_para || am_cgroup_para) { return super.getResourcesOption(containerId); } else { // 下面这些必须有,否则报错 StringBuilder sb = new StringBuilder("cgroups="); if (sb.charAt(sb.length() - 1) == ‘,‘) { sb.deleteCharAt(sb.length() - 1); } return sb.toString(); } } }
然后配置yarn.nodemanager.linux-container-executor.resources-handler.class为com.bfd.cgroup.XxxCgroupsLCEResourcesHandler。
3、应用程序中设置环境变量参数
应用程序中,如果想对CPU进行CGroup限制,需要配置以下三个环境变量,如下:
conf.set("mapreduce.map.env", "xxx_cpu_cgroup_map=true"); conf.set("mapreduce.reduce.env", "xxx_cpu_cgroup_reduce=true"); conf.set("yarn.app.mapreduce.am.env", "xxx_cpu_cgroup_am=true");
四、测试
测试结果显示,上述改动能够实现应用级别的CPU CGroup限制,启动和不启动的执行时间明显有差距,且两者并行运行的话,也能实现该效果,说明参数能够达到应用级别(实际上是容器级别)而互不干扰。