Hadoop2源码分析-YARN RPC 示例介绍

1.概述

  之前在《Hadoop2源码分析-RPC探索实战》一文当中介绍了Hadoop的RPC机制,今天给大家分享关于YARN的RPC的机制。下面是今天的分享目录:

  • YARN的RPC介绍
  • YARN的RPC示例
  • 截图预览

  下面开始今天的内容分享。

2.YARN的RPC介绍

  我们知道在Hadoop的RPC当中,其主要由RPC,Client及Server这三个大类组成,分别实现对外提供编程接口、客户端实现及服务端实现。如下图所示:

  图中是Hadoop的RPC的一个类的关系图,大家可以到《Hadoop2源码分析-RPC探索实战》一文中,通过代码示例去理解他们之间的关系,这里就不多做赘述了。接下来,我们去看Yarn的RPC。

  Yarn对外提供的是YarnRPC这个类,这是一个抽象类,通过阅读YarnRPC的源码可以知道,实际的实现由参数yarn.ipc.rpc.class设定,默认情况下,其值为:org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC,部分代码如下:

  • YarnRPC:
public abstract class YarnRPC {
   // ......

    public static YarnRPC create(Configuration conf) {
    LOG.debug("Creating YarnRPC for " +
        conf.get(YarnConfiguration.IPC_RPC_IMPL));
    String clazzName = conf.get(YarnConfiguration.IPC_RPC_IMPL);
    if (clazzName == null) {
      clazzName = YarnConfiguration.DEFAULT_IPC_RPC_IMPL;
    }
    try {
      return (YarnRPC) Class.forName(clazzName).newInstance();
    } catch (Exception e) {
      throw new YarnRuntimeException(e);
    }
  }

}
  • YarnConfiguration类:
public class YarnConfiguration extends Configuration {

  //Configurations
  public static final String YARN_PREFIX = "yarn.";

  ////////////////////////////////
  // IPC Configs
  ////////////////////////////////
  public static final String IPC_PREFIX = YARN_PREFIX + "ipc.";
  /** RPC class implementation*/
  public static final String IPC_RPC_IMPL =
    IPC_PREFIX + "rpc.class";
  public static final String DEFAULT_IPC_RPC_IMPL =
    "org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC";
}

  而HadoopYarnProtoRPC 通过 RPC 的 RpcFactoryProvider 生成客户端工厂(由参数 yarn.ipc.client.factory.class 指定,默认值是 org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl)和服务器工厂 (由参数 yarn.ipc.server.factory.class 指定,默认值是 org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl),以根据通信协议的 Protocol Buffers 定义生成客户端对象和服务器对象。相关类的部分代码如下:

  • HadoopYarnProtoRPC
public class HadoopYarnProtoRPC extends YarnRPC {

  private static final Log LOG = LogFactory.getLog(HadoopYarnProtoRPC.class);

  @Override
  public Object getProxy(Class protocol, InetSocketAddress addr,
      Configuration conf) {
    LOG.debug("Creating a HadoopYarnProtoRpc proxy for protocol " + protocol);
    return RpcFactoryProvider.getClientFactory(conf).getClient(protocol, 1,
        addr, conf);
  }

  @Override
  public void stopProxy(Object proxy, Configuration conf) {
    RpcFactoryProvider.getClientFactory(conf).stopClient(proxy);
  }

  @Override
  public Server getServer(Class protocol, Object instance,
      InetSocketAddress addr, Configuration conf,
      SecretManager<? extends TokenIdentifier> secretManager,
      int numHandlers, String portRangeConfig) {
    LOG.debug("Creating a HadoopYarnProtoRpc server for protocol " + protocol +
        " with " + numHandlers + " handlers");

    return RpcFactoryProvider.getServerFactory(conf).getServer(protocol,
        instance, addr, conf, secretManager, numHandlers, portRangeConfig);

  }

}
  • RpcFactoryProvider
public class RpcFactoryProvider {

  // ......

  public static RpcClientFactory getClientFactory(Configuration conf) {
    String clientFactoryClassName = conf.get(
        YarnConfiguration.IPC_CLIENT_FACTORY_CLASS,
        YarnConfiguration.DEFAULT_IPC_CLIENT_FACTORY_CLASS);
    return (RpcClientFactory) getFactoryClassInstance(clientFactoryClassName);
  }

  //......

}
/** Factory to create client IPC classes.*/
  public static final String IPC_CLIENT_FACTORY_CLASS =
    IPC_PREFIX + "client.factory.class";
  public static final String DEFAULT_IPC_CLIENT_FACTORY_CLASS =
      "org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl";

  在 YARN 中并未使用Hadoop自带的Writable来做序列化,而是使用 Protocol Buffers 作为默认的序列化机制,这带来的好处主要有以下几点:

  • 继承Protocol Buffers的优点:Protocol Buffers已被实践证明其拥有高效性、可扩展性、紧凑性以及跨语言性等特点。
  • 支持在线升级回滚:在Hadoop 2.x版本后,添加的HA方案,该方案能够进行主备切换,在不停止NNA节点服务的前提下,能够在线升级版本。

3.YARN的RPC示例

  YARN 的工作流程是先定义通信协议接口ResourceTracker,它包含2个函数,具体代码如下所示:

  • ResourceTracker:
public interface ResourceTracker {

  @Idempotent
  public RegisterNodeManagerResponse registerNodeManager(
      RegisterNodeManagerRequest request) throws YarnException,
      IOException;

  @AtMostOnce
  public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
      throws YarnException, IOException;

}

  这里ResourceTracker提供了Protocol Buffers定义和Java实现,其中设计的Protocol Buffers文件有:ResourceTracker.proto、yarn_server_common_service_protos.proto和yarn_server_common_protos.proto,文件路径在Hadoop的源码包的 hadoop-2.6.0-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto,这里就不贴出3个文件的具体代码类,大家可以到该目录去阅读这部分代码。这里需要注意的是,若是大家要编译这些文件需要安装 ProtoBuf 的编译环境,环境安装较为简单,这里给大家简要说明下。

  首先是下载ProtoBuf的安装包,然后解压,进入到解压目录,编译安装。命令如下:

./configure --prefix=/home/work /protobuf/  

make && make install

最后编译 .proto 文件的命令:

protoc ./ResourceTracker.proto  --java_out=./

  下面,我们去收取Hadoop源码到本地工程,运行调试相关代码。

  • TestYarnServerApiClasses:
public class TestYarnServerApiClasses {

  // ......

  // 列举测试4个方法  

@Test
  public void testRegisterNodeManagerResponsePBImpl() {
    RegisterNodeManagerResponsePBImpl original =
        new RegisterNodeManagerResponsePBImpl();
    original.setContainerTokenMasterKey(getMasterKey());
    original.setNMTokenMasterKey(getMasterKey());
    original.setNodeAction(NodeAction.NORMAL);
    original.setDiagnosticsMessage("testDiagnosticMessage");

    RegisterNodeManagerResponsePBImpl copy =
        new RegisterNodeManagerResponsePBImpl(
            original.getProto());
    assertEquals(1, copy.getContainerTokenMasterKey().getKeyId());
    assertEquals(1, copy.getNMTokenMasterKey().getKeyId());
    assertEquals(NodeAction.NORMAL, copy.getNodeAction());
    assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage());

  }

@Test
  public void testNodeHeartbeatRequestPBImpl() {
    NodeHeartbeatRequestPBImpl original = new NodeHeartbeatRequestPBImpl();
    original.setLastKnownContainerTokenMasterKey(getMasterKey());
    original.setLastKnownNMTokenMasterKey(getMasterKey());
    original.setNodeStatus(getNodeStatus());
    NodeHeartbeatRequestPBImpl copy = new NodeHeartbeatRequestPBImpl(
        original.getProto());
    assertEquals(1, copy.getLastKnownContainerTokenMasterKey().getKeyId());
    assertEquals(1, copy.getLastKnownNMTokenMasterKey().getKeyId());
    assertEquals("localhost", copy.getNodeStatus().getNodeId().getHost());
  }

@Test
  public void testNodeHeartbeatResponsePBImpl() {
    NodeHeartbeatResponsePBImpl original = new NodeHeartbeatResponsePBImpl();

    original.setDiagnosticsMessage("testDiagnosticMessage");
    original.setContainerTokenMasterKey(getMasterKey());
    original.setNMTokenMasterKey(getMasterKey());
    original.setNextHeartBeatInterval(1000);
    original.setNodeAction(NodeAction.NORMAL);
    original.setResponseId(100);

    NodeHeartbeatResponsePBImpl copy = new NodeHeartbeatResponsePBImpl(
        original.getProto());
    assertEquals(100, copy.getResponseId());
    assertEquals(NodeAction.NORMAL, copy.getNodeAction());
    assertEquals(1000, copy.getNextHeartBeatInterval());
    assertEquals(1, copy.getContainerTokenMasterKey().getKeyId());
    assertEquals(1, copy.getNMTokenMasterKey().getKeyId());
    assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage());
  }

@Test
  public void testRegisterNodeManagerRequestPBImpl() {
    RegisterNodeManagerRequestPBImpl original = new RegisterNodeManagerRequestPBImpl();
    original.setHttpPort(8080);
    original.setNodeId(getNodeId());
    Resource resource = recordFactory.newRecordInstance(Resource.class);
    resource.setMemory(10000);
    resource.setVirtualCores(2);
    original.setResource(resource);
    RegisterNodeManagerRequestPBImpl copy = new RegisterNodeManagerRequestPBImpl(
        original.getProto());

    assertEquals(8080, copy.getHttpPort());
    assertEquals(9090, copy.getNodeId().getPort());
    assertEquals(10000, copy.getResource().getMemory());
    assertEquals(2, copy.getResource().getVirtualCores());

  }

}
  • TestResourceTrackerPBClientImpl:
public class TestResourceTrackerPBClientImpl {

    private static ResourceTracker client;
    private static Server server;
    private final static org.apache.hadoop.yarn.factories.RecordFactory recordFactory = RecordFactoryProvider
            .getRecordFactory(null);

    @BeforeClass
    public static void start() {

        System.out.println("Start client test");

        InetSocketAddress address = new InetSocketAddress(0);
        Configuration configuration = new Configuration();
        ResourceTracker instance = new ResourceTrackerTestImpl();
        server = RpcServerFactoryPBImpl.get().getServer(ResourceTracker.class, instance, address, configuration, null,
                1);
        server.start();

        client = (ResourceTracker) RpcClientFactoryPBImpl.get().getClient(ResourceTracker.class, 1,
                NetUtils.getConnectAddress(server), configuration);

    }

    @AfterClass
    public static void stop() {

        System.out.println("Stop client");

        if (server != null) {
            server.stop();
        }
    }

    /**
     * Test the method registerNodeManager. Method should return a not null
     * result.
     *
     */
    @Test
    public void testResourceTrackerPBClientImpl() throws Exception {
        RegisterNodeManagerRequest request = recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
        assertNotNull(client.registerNodeManager(request));

        ResourceTrackerTestImpl.exception = true;
        try {
            client.registerNodeManager(request);
            fail("there should be YarnException");
        } catch (YarnException e) {
            assertTrue(e.getMessage().startsWith("testMessage"));
        } finally {
            ResourceTrackerTestImpl.exception = false;
        }

    }

    /**
     * Test the method nodeHeartbeat. Method should return a not null result.
     *
     */

    @Test
    public void testNodeHeartbeat() throws Exception {
        NodeHeartbeatRequest request = recordFactory.newRecordInstance(NodeHeartbeatRequest.class);
        assertNotNull(client.nodeHeartbeat(request));

        ResourceTrackerTestImpl.exception = true;
        try {
            client.nodeHeartbeat(request);
            fail("there  should be YarnException");
        } catch (YarnException e) {
            assertTrue(e.getMessage().startsWith("testMessage"));
        } finally {
            ResourceTrackerTestImpl.exception = false;
        }

    }

    public static class ResourceTrackerTestImpl implements ResourceTracker {

        public static boolean exception = false;

        public RegisterNodeManagerResponse registerNodeManager(RegisterNodeManagerRequest request)
                throws YarnException, IOException {
            if (exception) {
                throw new YarnException("testMessage");
            }
            return recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
        }

        public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnException, IOException {
            if (exception) {
                throw new YarnException("testMessage");
            }
            return recordFactory.newRecordInstance(NodeHeartbeatResponse.class);
        }

    }
}

4.截图预览

  接下来,我们使用JUnit去测试代码,截图预览如下所示:

  • 对testRegisterNodeManagerRequestPBImpl()方法的一个DEBUG调试

  • testResourceTrackerPBClientImpl()方法的DEBUG调试

  这里由于设置exception的状态为true,在调用registerNodeManager()时,会打印一条测试异常信息。

if (exception) {
  throw new YarnException("testMessage");
}

5.总结

  在学习Hadoop YARN的RPC时,可以先了解Hadoop的RPC机制,这样在接触YARN的RPC的会比较好理解,YARN的RPC只是其中的一部分,后续会给大家分享更多关于YARN的内容。

6.结束语

  这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

时间: 2024-08-25 12:23:19

Hadoop2源码分析-YARN RPC 示例介绍的相关文章

Hadoop2源码分析-HDFS核心模块分析

1.概述 这篇博客接着<Hadoop2源码分析-RPC机制初识>来讲述,前面我们对MapReduce.序列化.RPC进行了分析和探索,对Hadoop V2的这些模块都有了大致的了解,通过对这些模块的研究,我们明白了MapReduce的运行流程以及内部的实现机制,Hadoop的序列化以及它的通信机制(RPC).今天我们来研究另一个核心的模块,那就是Hadoop的分布式文件存储系统——HDFS,下面是今天分享的内容目录: HDFS简述 NameNode DataNode 接下来,我们开始今天的分享

Hadoop2源码分析-准备篇

1.概述 我们已经能够搭建一个高可用的Hadoop平台了,也熟悉并掌握了一个项目在Hadoop平台下的开发流程,基于Hadoop的一些套件我们也能够使用,并且能利用这些套件进行一些任务的开发.在Hadoop的应用级别上,我们接着往后面去研究学习,那就是Hadoop的源码了,作为Hadoop开发人员,我们得去学习和研究Hadoop得实现原理,底层框架的设计,编码的实现过程等等,下面就开始我们今天的Hadoop源码分析之旅. 2.准备 在分析源码之前,我们需要准备好分析源码的环境,以及如何去分析(分

Hadoop2源码分析-Hadoop V2初识

1.概述 在完成分析Hadoop2源码的准备工作后,我们进入到后续的源码学习阶段.本篇博客给大家分享,让大家对Hadoop V2有个初步认识,博客的目录内容如下所示: Hadoop的渊源 Hadoop V2部分项目图 各个包的功能介绍 本篇文章的源码是基于Hadoop-2.6.0来分析,其他版本的Hadoop的源码可以此作为参考分析. 2.Hadoop的渊源 其实,早年Google的核心竞争力是它的计算平台,Google对外公布的论文有一下内容: GoogleCluster Chubby GFS

Java并发包中Semaphore的工作原理、源码分析及使用示例

1. 信号量Semaphore的介绍 我们以一个停车场运作为例来说明信号量的作用.假设停车场只有三个车位,一开始三个车位都是空的.这时如果同时来了三辆车,看门人允许其中它们进入进入,然后放下车拦.以后来的车必须在入口等待,直到停车场中有车辆离开.这时,如果有一辆车离开停车场,看门人得知后,打开车拦,放入一辆,如果又离开一辆,则又可以放入一辆,如此往复. 在这个停车场系统中,车位是公共资源,每辆车好比一个线程,看门人起的就是信号量的作用.信号量是一个非负整数,表示了当前公共资源的可用数目(在上面的

Hadoop RCFile存储格式详解(源码分析、代码示例)

RCFile RCFile全称Record Columnar File,列式记录文件,是一种类似于SequenceFile的键值对(Key/Value Pairs)数据文件. 关键词:Record.Columnar.Key.Value. RCFile的优势在哪里?适用于什么场景?为了让大家有一个感性的认识,我们来看一个例子. 假设我们有这样一张9行3列的Hive数据表table,以普通的TextFile进行存储, 现在我们需要统计这张数据表的第二列(col2)值为“row5_col2”的出现次数

Hbase源码分析:RPC概况

RPC是hbase中Master,RegionServer和Client三者之间通信交流的纽带.了解hbase的rpc机制能够为通过源码学习hbase奠定良好的基础.因为了解了hbase的rpc机制能够很快通过debug深入理解hbase各种机制(比方说flush,compaction,scan等请求)的流程.同时也便于碰到问题时,通过源码分析找到原因,毕竟源码面前了无秘密. 1,RPC简介 RPC(remote procedure call)即远程过程调用.对于本地调用,定义好一个函数以后,程

dubbo源码分析系列——项目工程结构介绍

摘要 dubbo是目前国内最流行的分布式服务框架,已经俨然成为行业的标准了,多数无自研能力的公司都在使用这个框架,而且这个框架本身非常具有代表性,即使很多公司自研的分布式服务框架也都是对dubbo的扩展或者借鉴了该框架,因此研究它的源码意义还是非常大的,对于掌握分布式服务框架的原理和实现细节有着非常好的帮助. 项目源码地址 本系列文章是基于当当网维护的dubbox版本进行分析的,源码地址参考:https://github.com/dangdangdotcom/dubbox 项目源码结构 我们下载

Hadoop2源码分析-序列化篇

1.概述 上一篇我们了解了MapReduce的相关流程,包含MapReduce V2的重构思路,新的设计架构,与MapReduce V1的区别等内容,今天我们在来学习下在Hadoop V2中的序列化的相关内容,其目录如下所示: 序列化的由来 Hadoop序列化依赖图详解 Writable常用实现类 下面,我们开始学习今天的内容. 2.序列化的由来 我们知道Java语言对序列化提供了非常友好的支持,在定义一个类时,如果我们需要序列化一个类,只需要实现该类的序列化接口即可.场景:让一个AppInfo

hdfs源码分析之RPC

要了解Hadoop的消息通信机制先得了解下java的动态代理 这是一个装饰模式,用动态代理实现的,用静态工场生成被代理的具体实现类.这里是集合Set的代理. 定义泛型方法时必须在前面加一个<T>,来声明这是一个泛型方法,持有一个泛型T,然后才能用泛型T作为函数返回值. InvocationHandler接口是用来拦截代理对象的方法调用的 接下来是代理对象的具体使用 可见使用代理对象后,对set进行了功能升级(包装模式),通过拦截代理对象的方法,完成打日志的功能 今天先写到这里,晚上有空的话在写