细水长流Hadoop源码分析(3)RPC Server初始化构造

声明:个人原创,转载请注明出处。文中引用了一些网上或书里的资料,如有不妥之处请告之。

本文是我阅读Hadoop 0.20.2第二遍时写的笔记,在阅读过程中碰到很多问题,最终通过各种途径解决了大部分。Hadoop整个系统设计精良,源码值得学习分布式的同学们阅读,以后会将所有笔记一一贴出,希望能方便大家阅读源码,少走弯路。



目录

4 RPC服务器(org.apache.hadoop,ipc.Server)

4.1 服务器初始化


4 RPC服务器(org.apache.hadoop,ipc.Server)

4.1 服务器初始化

在org.apache.hadoop.hdfs.server.namenode.NameNode.initialize方法里有创建和启动RPC服务器的代码,所以RPC服务器的初始化从这个方法开始:

this.server = RPC.getServer(this, socAddr.getHostName(), socAddr.getPort(),
                                 handlerCount, false, conf);
this.server.start();  //start RPC server

PPC.getServer方法返回Server类型的对象,实际返回的类型是RPC.Server类的对象,这就是RPC服务器。实际上,RPC.getServer方法将参数原封不动地传给RPC.Server的构造方法。在继续跟踪RPC.Server对象的构造方法之前,先来看看RPC.getServer方法的参数:


RPC.getServer参数



说明


Object instance


this = NameNode类对象


NameNode实现了众多协议接口,可作为RPC服务器实例


String bindAddress


socAddr.getHostName()=localhost


NameNode RPC服务器监听IP地址


int port


socAddr.getPort()=9000


NameNode RPC服务器监听端口号


numHandlers


handlerCount=10


RPC服务器中的Handler线程数


verbose


false


是否对每次远程调用记录日志,这里表示不记录


conf


conf


全局配置

RPC.Server类相对于其父类增加了三个数据成员:instance、verbose和authorize。instance指实现协议接口的实例,verbose指是否将每次远程过程调用记录到日志,authorize指是否对每次远程过程调用执行权限检查。默认情况下,verbose和authorize均设为false,表示不记录日志,也不进行权限验证,这就是Hadoop不安全的地方。这是RPC.Server类的构造方法:

public Server(Object instance, Configuration conf, String bindAddress,
    int port,int numHandlers, boolean verbose) throws IOException {

    super(bindAddress,port,Invocation.class,
        numHandlers, conf, classNameBase(instance.getClass().getName()));
    this.instance = instance;
    this.verbose = verbose;
    this.authorize = conf.getBoolean(
        ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false);
}

这个构造方法里除了初始化自己的三个成员之外,就是调用父类的构造方法了。向父类的构造方法传递的参数有两个新参数,一个是Invocation.class,表示远程调用的参数数据类型,另一个是instance的类名称。这里之所以对instance的类名称进行classNameBase操作,是因为调用Class<?>.getName方法可能会得到java.lang.String这样的名称,所以要取这个名称的最后一部分。

org.apache.hadoop.ipc.Server类是RPC服务器的核心,其重要的数据成员如下:


成员


说明


static ThreadLocal<Server> SERVER


表示当前正在运行的RPC Server的对象,也就是Listener、Responder或Handler(这些都是内部类)的run方法设置的Server.this(这种“类名.this”用于内部类引用外部类对象时,内部类的this指自身对象)。

Hadoop允许不同线程运行不同的RPC Server,所以对于静态成员,又不能让线程间共享,就要加个ThreadLocal声明其为线程局部变量了


static ConcurrentHashMap<String, Class<?>> PROTOCOL_CACHE


协议接口类的缓冲。RPC客户端的Connection发送过来的是字符串类型的协议名称,Server再将之转换成协议类,此映射表存放“协议名称 -> 协议类”的映射。

在Server类里对PROTOCOL_CACHE进行get和put操作的方法只有getProtocolClass,此方,类中没有对getProtocolClass进行同步,调用时也没有在PROTOCOL_CACHE上同步,所以ConcurrentHashMap的使用并不是为了同步此HashMap的get和put操作,实际上这里的多次put都没有关系,因为不可能发生同名的协议而不是相同的协议。PROTOCOL_CACHE会由不同线程操作,所以要使用ConcurrentHashMap对HashMap内部的数据对象进行保护

ConcurrentHashMap请参考2.3.4。


static ThreadLocal<Call> CurCall


当前正在进行处理的方法调用,在Handler线程的run方法里会用到。这里使用ThreadLocal理由同SERVER,是为了使RPC服务器支持多线程


String bindAddress


RPC服务器监听地址,如localhost


int port


RPC服务器监听端口,如9000


int handlerCount


Handler线程的数量


Class<? extends Writable> paramClass


客户端发送的包含远程调用名称、参数类型列表和参数列表的封装类型,一般是org.apache.hadoop.ipc.RPC.Invocation


BlockingQueue<Call> callQueue


Listener填充此callQueue,读取所有连接的所有方法调用,然后交由Handler并行处理,所以这里使用线程安全的BlockingQueue

BlockingQueue请参考2.3.5。


Collections.synchronizedList(new LinkedList<Connection>()) connectionList


这里的connectionList与ConcurrentHashMap类似,同样是List内部数据结构安全的,但对于自己的业务逻辑,还是要自己同步,请参考2.3.6。

connectionList会在运行Server的线程和Listener的线程内调用,多个线程访问共享资源,当然要求安全的集合。

此外,当使用connectionList时,还要自己对某些操作同步,实际上在使用connectionList的代码里就用synchronized(connectionList)进行操作的同步


Listener listener


Listener线程,用于监听来自RPC客户端的连接和方法调用数据的线程


Responder responder


Responder线程,用于发送方法调用返回值的线程


int numConnections


当前RPC服务器有多少个Connection


Handler[] handlers


Handler线程,调用远程方法调用,并处理方法调用返回值的线程

由这些重要的数据成员可知,一个Server会保存若干个Server.Connection、一个Listener线程、一个Responder线程,和很多Handler线程。

现在可以看看org.apache.hadoop.ipc.Server的构造方法如何对这些数据成员进行设置,以及如何构造一个RPC服务器了。Server的构造方法的代码如下:

protected Server(String bindAddress, int port,
                  Class<? extends Writable> paramClass, int handlerCount,
                  Configuration conf, String serverName)
    throws IOException {
    this.bindAddress = bindAddress;
    this.conf = conf;
    this.port = port;
    this.paramClass = paramClass;
    this.handlerCount = handlerCount;
    this.socketSendBufferSize = 0;
    this.maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER;
    this.callQueue  = new LinkedBlockingQueue<Call>(maxQueueSize);
    this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
    this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
    this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);

    // Start the listener here and let it bind to the port
    listener = new Listener();
    this.port = listener.getAddress().getPort();
    this.rpcMetrics = new RpcMetrics(serverName,
                          Integer.toString(this.port), this);
this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false);

    // Create the responder here
    responder = new Responder();
}

这里最主要的操作是Listener线程的构造,this.port端口的获取和Responder线程的构造,其它Server成员的初始化内容都是显而易见的。

Server.Listener类是一个线程类,用于监听客户端的的连接和读取客户端发送过来的ConnectionHeader、远程过程调用数据。Listener使用了NIO管理Socket连接和数据传输,因此在整个RPC服务器里只需一个线程即可处理来自客户端的所有连接和读数据请求。对于使用Selector选择器对连接进行管理的程序来说,Selector对象仅需一个,传统Socket服务器会有一个ServerSocket接收许多Socket连接请求,为每个连接建立一个线程用于对此Socket连接的处理,而Selector最好只有一个,因为Selector可高效地处理大量的Socket操作,多个Selector放在多个线程增加了同步负担,线程切换也带来了开销。

如上所述,Listener负责监听和建立Socket连接,读取远程过调用数据全处理了,这肯定会加入Listener的负担,使Listener线程成为瓶颈,实际上现在Hadoop开发社区也在讨论这个问题。

Listener读取方法之后会把方法放入Server.callQueue中,由Handler进行实际的方法调用,得到的结果再由Server.Responder处理返回给客户端。

在开始描述Listener的构造过程之前,先来看看Listener的重要数据成员:


成员


说明


ServerSocketChannel acceptChannel


服务器Socket


Selector selector


管理服务器Socket,服务器Socket创建的Socket


InetSocketAddress address


监听地址,实际上就是RPC服务器地址localhost:9000

下面就是Listener的构造方法:

public Listener() throws IOException {
      address = new InetSocketAddress(bindAddress, port);
      // Create a new server socket and set to non blocking mode
      acceptChannel = ServerSocketChannel.open();
      acceptChannel.configureBlocking(false);

      // Bind the server socket to the local host and port
      bind(acceptChannel.socket(), address, backlogLength);
      port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
      // create a selector;
      selector= Selector.open();

      // Register accepts on the server socket with the selector.
      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
      this.setName("IPC Server listener on " + port);
      this.setDaemon(true);
    }

这里调用ServerSocketChannel.open方法获取一个带通道的服务器Socket,赋值给this.acceptChannel,这就是RPC服务器监听客户端连接和接收数据的服务器Socket。然后,绑定acceptChannel到指定的RPC服务器地址,比如localhost:9000。

如果想把ServerSocketChannel对象注册到Selector上,必须设置ServerSocketChannel对象是非阻塞的。另外,SocketChannel要注册到Selector上,也必须配置成非阻塞的。非阻塞的Socket意味着,ServerSocket的accept操作和Socket的read不论有没有连接,不论有没有数据可读,都会立即返回。配置Selector选择器这种非阻塞的工作方法完全没有问题,因为Selector总是等到有操作请求,才会提示用户线程。

然后是调用Selector.open方法构造一个选择器selector,之后将acceptChannel的SelectionKey.OP_ACCEPT操作,也就是ServerSocket的accept操作注册到此选择器上。致此,完成Listener的构造。

Listener有一个将acceptChannel绑定的操作,这一操作过后,acceptChannel可能会因为端口冲突而自动换用一个可用地址,所以这里要保存新的端口号到Server.port。

Server的构造方法构造完Listener之后开始构造Server.Responder对象,Server.Responder的功能是将远程过程调用返回值或出错信息发送回RPC客户端。同Listener一样,Responder也只用了一个线程来处理所有发送到客户端的方用调用返回结果,因为Responder也用了NIO。

Server.Responder只有两个成员,分别是Selector类型的writeSelector选择器和int类型的pending。writeSelector管理所有向RPC客户端发送数据的输出流通道,pending指当前等待要处理返回值的远程过程调用个数。Responder的构造方法仅仅调用Selector.open方法构造了一个选择器赋值给writeSelect,并将pending初始化为0。

再次回到本节最开始的那段代码,当调用getServer方法返回一个RPC.Server对象时RPC构造经历了上述过程,第二行代码就是调用this.server.start方法,RPC.Sever没定义这个方法,这个方法是在Server类里定义的。这个start方法很简单:启动responder线程、启动listener线程、创建handlerCount数量的Handler线程,并一一启动它们。此方法前加了synchronized关键字,因此此方法是线程安全的。

这些线程启动之后整个RPC服务器就开始工作了,因此,所有的工作都在线程Responder、Listener和Handler里完成。

对于RPC服务器,首先工作的肯定是Server.Listener,因为必须先accept一个客户端的Socket连接,接收客户端发来的一个远程方法调用数据,其它像Handler线程才能执行方法调用,Responder接着才能将方法调用的结果发送给客户端。我的源代码分析应尽量顺着执行流程走。所以应该首先分析Listener线程,再分析Handler线程,最后是Responder线程,这就是下一节的内容。

(全文完)

细水长流Hadoop源码分析(3)RPC Server初始化构造,布布扣,bubuko.com

时间: 2024-10-04 17:39:13

细水长流Hadoop源码分析(3)RPC Server初始化构造的相关文章

hadoop源码分析解读入门

hadoop 源代码分析(一) Google 的核心竞争技术是它的计算平台.HadoopGoogle的大牛们用了下面5篇文章,介绍了它们的计算设施. Google的几篇论文 GoogleCluster:http://research.google.com/archive/googlecluster.html Chubby:http://labs.google.com/papers/chubby.html GFS:http://labs.google.com/papers/gfs.html Big

Hadoop源码分析(2)——Configuration类

这篇文章主要介绍Hadoop的系统配置类Configuration. 接着上一篇文章介绍,上一篇文章中Hadoop Job的main方法为: public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new CalculateSumJob(),args); System.exit(res); } 其中ToolRunner.run方法传入的第一个变量

Hadoop源码分析—— Job任务的程序入口

这篇文章大致介绍Hadoop Job的程序是如何启动的. 通常用Java编写的Hadoop MapReduce程序是通过一个main方法作为程序的整个入口,如下: public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new CalculateSumJob(),args); System.exit(res);} 可以看到这个Job任务的MapR

《深入理解SPARK:核心思想与源码分析》——SparkContext的初始化(中)

<深入理解Spark:核心思想与源码分析>一书前言的内容请看链接<深入理解SPARK:核心思想与源码分析>一书正式出版上市 <深入理解Spark:核心思想与源码分析>一书第一章的内容请看链接<第1章 环境准备> <深入理解Spark:核心思想与源码分析>一书第二章的内容请看链接<第2章 SPARK设计理念与基本架构> 由于本书的第3章内容较多,所以打算分别开辟三篇随笔分别展现. <深入理解Spark:核心思想与源码分析>一

【spring源码分析】IOC容器初始化(总结)

前言:在经过前面十二篇文章的分析,对bean的加载流程大致梳理清楚了.因为内容过多,因此需要进行一个小总结. 经过前面十二篇文章的漫长分析,终于将xml配置文件中的bean,转换成我们实际所需要的真正的bean对象. 总结 [spring源码分析]IOC容器初始化(一):主要分析了Spring是如何解析占位符以及BeanFactory的最终实现类DefaultListableBeanFactory. [spring源码分析]IOC容器初始化(二):以loadBeanDefinitions函数为切

Hadoop源码分析之Map输入

对于MapReduce的输入输出Hadoop的官网如下所示 Input and Output types of a MapReduce job: (input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output) 这里将从源码分析 input <k1,v1>->map 的过程, Mapper 基

【Canal源码分析】Canal Server的启动和停止过程

本文主要解析下canal server的启动过程,希望能有所收获. 一.序列图 1.1 启动 1.2 停止 二.源码分析 整个server启动的过程比较复杂,看图难以理解,需要辅以文字说明. 首先程序的入口在CanalLauncher的main方法中. 2.1 加载配置文件 String conf = System.getProperty("canal.conf", "classpath:canal.properties"); Properties properti

Nouveau源码分析(六):NVIDIA设备初始化之nouveau_drm_load (3)

Nouveau源码分析(六) 上一篇中我们暂时忽略了两个函数,第一个是用于创建nvif_device对应的nouveau_object的ctor函数: // /drivers/gpu/drm/nouveau/core/engine/device/base.c 488 static struct nouveau_ofuncs 489 nouveau_devobj_ofuncs = { 490 .ctor = nouveau_devobj_ctor, 491 .dtor = nouveau_devo

Nouveau源码分析(三):NVIDIA设备初始化之nouveau_drm_probe

Nouveau源码分析(三) 向DRM注册了Nouveau驱动之后,内核中的PCI模块就会扫描所有没有对应驱动的设备,然后和nouveau_drm_pci_table对照. 对于匹配的设备,PCI模块就调用对应的probe函数,也就是nouveau_drm_probe. // /drivers/gpu/drm/nouveau/nouveau_drm.c 281 static int nouveau_drm_probe(struct pci_dev *pdev, 282 const struct