Hadoop RPC框架

1、RPC框架概述

1.1 RPC(Remote Procedure Call Protocol)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。

1.2 RPC通常采用客户端服务器模型,其框架主要有以下几部分

  • 通信模块:实现请求应该协议。主要分为同步方式和异步方式。
  • stub程序:客户端和服务器均包含stub程序,可以看做代理程序。使得远程函数表现的跟本地调用一样,对用户程序完全透明。
  • 调度程序:接受来自通信模块的请求消息,根据标识选择stub程序处理。并发量大一般采用线程池处理。
  • 客户程序/服务过程:请求发出者和请求的处理者。

1.3 RPC流程图

2、Hadoop RPC基本框架

2.1Hadoop RPC的使用方法见代码

服务

public interface MyBiz extends VersionedProtocol {

long PROTOCOL_VERSION = 12321443L;

String hello(String name);

}

public class MyBizImpl implements MyBiz {

@Override

public long getProtocolVersion(String arg0, long arg1) throws IOException {

return PROTOCOL_VERSION;

}

@Override

public String hello(String name) {

System. out.println( "invoked");

return "hello " + name;

}

}

服务器

public class MyServer {

public static final String SERVER_ADDRESS = "localhost";

public static final int SERVER_PORT = 12345;

public static void main(String[] args) throws IOException {

Server server = RPC. getServer(new MyBizImpl(), SERVER_ADDRESS, SERVER_PORT , new Configuration());

server.start();

}

}

客户端

public class MyClient {

public static void main(String[] args) throws IOException {

MyBiz proxy = (MyBiz) RPC. getProxy(MyBiz.class, MyBiz.PROTOCOL_VERSION,

new InetSocketAddress(MyServer. SERVER_ADDRESS,MyServer.SERVER_PORT),

new Configuration());

String result = proxy.hello( "5");

System. out.println(result);

RPC.stopProxy(proxy);

}

}

2.2 org.apache.hadoop.ipc.RPC类解析

RPC类主要包含三部分:

  • ClientCache(成员变量):根据用户提供的SocketFactory来缓存Client对象,以便重用Client对象。
  • Server(内部类):继承Server抽象类,利用反射实现了call方法,即客户端请求的方法和对应参数完成方法调用。
  • Invocation(内部类):将要调用的方法名和参数打包成可序列化的对象,方便客户端和服务器之间传递。

2.3 客户端和服务器端的关系

  • Client-NameNode之间,其中NameNode是服务器
  • Client-DataNode之间,其中DataNode是服务器
  • DataNode-NameNode之间,其中NameNode是服务器
  • DataNode-DateNode之间,其中某一个DateNode是服务器,另一个是客户端

2.4 org.apache.hadoop.ipc.Client类解析

2.4.1 Client类中主要包含:

  • Call(内部类):封装了一个RPC请求,包含5个成员变量,唯一表示id、函数调用信息param、函数返回值value、函数异常信息error、函数完成标识done。Hadoop rpc server采用异步方式处理客户端请求,使得远程过程调用的发生顺序和返回顺序无直接关系,而客户端正是通过id识别不同的函数调用。当客户端向服务器发送请求,只需填充id和param两个变量,其余3个变量由服务器端根据函数执行情况填充。
  • Connection(内部类,一个线程):是client和server之间的一个通信连接,封装了连接先关的基本信息和操作。基本信息包括:通信连接唯一标识remoteId(ConnectionId)、与Server端通信的scoket、网络输入输出流in/out、保存RPC请求的哈希表calls(Hashtable<Integer, Call>)。操作包括:addCall将一个Call对象添加到哈希表中;sendParam想服务器端发送RPC请求;receiveResponse从服务器端接收已经处理完成的RPC请求;run调用receiveResponse方法,等待返回结果。
  • ConnectionId(内部类):连接的标记(包括server地址,协议,其他一些连接的配置项信息)
  • ParallelCall(内部类):实现并行调用的请求
  • ParallelResults(内部类):并行调用的执行结果

2.4.2 Client类中主要对外通过两个接口,分别用于单个远程调用和批量远程调用。

public Writable call(Writable param, ConnectionId remoteId)  throws InterruptedException, IOException

public Writable call(Writable param, InetSocketAddress addr,  Class<?> protocol, UserGroupInformation ticket,

int rpcTimeout, Configuration conf)  throws InterruptedException, IOException

2.4.3 调用流程分析,当调用call函数执行某个远程方法时,有以下几个步骤:

1)创建一个Connection对象,并将远程方法调用信息封装成Call对象,放到Connection对象中的哈希表中;

2)调用Connection类中的sendRpcRequest()方法将当前Call对象发送给Server端;

3)Server端处理完RPC请求后,将结果通过网络返回给Client端,Client端通过receiveRpcResponse()函数获取结果;

4)Client检查结果处理状态(成功还是失败),并将对应Call对象从哈希表中删除。

2.4.4 一个Client包含多个连接,private Hashtable<ConnectionId, Connection> connections = new Hashtable<ConnectionId, Connection>();

2.5 org.apache.hadoop.ipc.Server类解析

2.5.1 背景

Hadoop采用了Master/Slave结构,其中Master是整个系统的单点,如NameNode或JobTracker,这是制约系统性能和可扩展性的最关键因素之一;而Master通过ipc.Server接收并处理所有Slave发送的请求,这就要求ipc.Server 将高并发和可扩展性作为设计目标。为此,ipc.Server采用了很多提高并发处理能力的技术,主要包括线程池、事件驱动和Reactor设计模式等,这些技术均采用了JDK自带的库实现,这里重点分析它是如何利用Reactor设计模式提高整体性能的。

2.5.2 reactor设计模式

Reactor是并发编程中的一种基于事件驱动的设计模式,它具有以下两个特点:通过派发/分离I/O操作事件提高系统的并发性能;提供了粗粒度的并发控制,使用单线程实现,避免了复杂的同步处理。典型的Reactor实现原理如图所示。

典型的Reactor模式中主要包括以下几个角色。

  • Reactor:I/O事件的派发者。
  • Acceptor:接受来自Client的连接,建立与Client对应的Handler,并向Reactor注册此Handler。
  • Handler:与一个Client通信的实体,并按一定的过程实现业务的处理。Handler内部往往会有更进一步的层次划分,用来抽象诸如read、decode、compute、encode和send等过程。在Reactor模式中,业务逻辑被分散的I/O事件所打破,所以Handler需要有适当的机制在所需的信息还不全(读到一半)的时候保存上下文,并在下一次I/O事件到来的时候(另一半可读)能继续上次中断的处理。
  • Reader/Sender:为了加速处理速度,Reactor模式往往构建一个存放数据处理线程的线程池,这样数据读出后,立即扔到线程池中等待后续处理即可。为此,Reactor模式一般分离Handler中的读和写两个过程,分别注册成单独的读事件和写事件,并由对应的Reader和Sender线程处理。

2.5.3 java nio代码实例

package com.sohu.tv.nio;

import java.io.IOException;

import java.net.InetSocketAddress;

import java.nio.ByteBuffer;

import java.nio.channels.SelectionKey;

import java.nio.channels.Selector;

import java.nio.channels.ServerSocketChannel;

import java.nio.channels.SocketChannel;

import java.util.Iterator;

/**

* NIO服务端

* @author 小路

*/

public class NIOServer {

//通道管理器

private Selector selector;

/**

* 获得一个ServerSocket通道,并对该通道做一些初始化的工作

* @param port  绑定的端口号

* @throws IOException

*/

public void initServer(int port) throws IOException {

// 获得一个ServerSocket通道

ServerSocketChannel serverChannel = ServerSocketChannel.open();

// 设置通道为非阻塞

serverChannel.configureBlocking(false);

// 将该通道对应的ServerSocket绑定到port端口

serverChannel.socket().bind(new InetSocketAddress(port));

// 获得一个通道管理器

this.selector = Selector.open();

//将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_ACCEPT事件,注册该事件后,

//当该事件到达时,selector.select()会返回,如果该事件没到达selector.select()会一直阻塞。

serverChannel.register(selector, SelectionKey.OP_ACCEPT);

}

/**

* 采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理

* @throws IOException

*/

@SuppressWarnings("unchecked")

public void listen() throws IOException {

System.out.println("服务端启动成功!");

// 轮询访问selector

while (true) {

//当注册的事件到达时,方法返回;否则,该方法会一直阻塞

selector.select();

// 获得selector中选中的项的迭代器,选中的项为注册的事件

Iterator ite = this.selector.selectedKeys().iterator();

while (ite.hasNext()) {

SelectionKey key = (SelectionKey) ite.next();

// 删除已选的key,以防重复处理

ite.remove();

// 客户端请求连接事件

if (key.isAcceptable()) {

ServerSocketChannel server = (ServerSocketChannel) key

.channel();

// 获得和客户端连接的通道

SocketChannel channel = server.accept();

// 设置成非阻塞

channel.configureBlocking(false);

//在这里可以给客户端发送信息哦

channel.write(ByteBuffer.wrap(new String("向客户端发送了一条信息").getBytes()));

//在和客户端连接成功之后,为了可以接收到客户端的信息,需要给通道设置读的权限。

channel.register(this.selector, SelectionKey.OP_READ);

// 获得了可读的事件

} else if (key.isReadable()) {

read(key);

}

}

}

}

/**

* 处理读取客户端发来的信息 的事件

* @param key

* @throws IOException

*/

public void read(SelectionKey key) throws IOException{

// 服务器可读取消息:得到事件发生的Socket通道

SocketChannel channel = (SocketChannel) key.channel();

// 创建读取的缓冲区

ByteBuffer buffer = ByteBuffer.allocate(10);

channel.read(buffer);

byte[] data = buffer.array();

String msg = new String(data).trim();

System.out.println("服务端收到信息:"+msg);

ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());

channel.write(outBuffer);// 将消息回送给客户端

}

/**

* 启动服务端测试

* @throws IOException

*/

public static void main(String[] args) throws IOException {

NIOServer server = new NIOServer();

server.initServer(8000);

server.listen();

}

}

package com.sohu.tv.nio;

import java.io.IOException;

import java.net.InetSocketAddress;

import java.nio.ByteBuffer;

import java.nio.channels.SelectionKey;

import java.nio.channels.Selector;

import java.nio.channels.SocketChannel;

import java.util.Iterator;

/**

* NIO客户端

* @author 小路

*/

public class NIOClient {

//通道管理器

private Selector selector;

/**

* 获得一个Socket通道,并对该通道做一些初始化的工作

* @param ip 连接的服务器的ip

* @param port  连接的服务器的端口号

* @throws IOException

*/

public void initClient(String ip,int port) throws IOException {

// 获得一个Socket通道

SocketChannel channel = SocketChannel.open();

// 设置通道为非阻塞

channel.configureBlocking(false);

// 获得一个通道管理器

this.selector = Selector.open();

// 客户端连接服务器,其实方法执行并没有实现连接,需要在listen()方法中调

//用channel.finishConnect();才能完成连接

channel.connect(new InetSocketAddress(ip,port));

//将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_CONNECT事件。

channel.register(selector, SelectionKey.OP_CONNECT);

}

/**

* 采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理

* @throws IOException

*/

@SuppressWarnings("unchecked")

public void listen() throws IOException {

// 轮询访问selector

while (true) {

selector.select();

// 获得selector中选中的项的迭代器

Iterator ite = this.selector.selectedKeys().iterator();

while (ite.hasNext()) {

SelectionKey key = (SelectionKey) ite.next();

// 删除已选的key,以防重复处理

ite.remove();

// 连接事件发生

if (key.isConnectable()) {

SocketChannel channel = (SocketChannel) key

.channel();

// 如果正在连接,则完成连接

if(channel.isConnectionPending()){

channel.finishConnect();

}

// 设置成非阻塞

channel.configureBlocking(false);

//在这里可以给服务端发送信息哦

channel.write(ByteBuffer.wrap(new String("向服务端发送了一条信息").getBytes()));

//在和服务端连接成功之后,为了可以接收到服务端的信息,需要给通道设置读的权限。

channel.register(this.selector, SelectionKey.OP_READ);

// 获得了可读的事件

} else if (key.isReadable()) {

read(key);

}

}

}

}

/**

* 处理读取服务端发来的信息 的事件

* @param key

* @throws IOException

*/

public void read(SelectionKey key) throws IOException{

//和服务端的read方法一样

}

/**

* 启动客户端测试

* @throws IOException

*/

public static void main(String[] args) throws IOException {

NIOClient client = new NIOClient();

client.initClient("localhost",8000);

client.listen();

}

}

2.5.4 server处理流程

ipc.Server的主要功能是接收来自客户端的RPC请求,经过调用相应的函数获取结果后,返回给对应的客户端。为此,ipc.Server被划分成3个阶段:接收请求、处理请求和返回结果。

(1)接收请求

该阶段主要任务是接收来自各个客户端的RPC请求,并将它们封装成固定的格式(Call类)放到一个共享队列(callQueue)中,以便进行后续处理。该阶段内部又分为建立连接和接收请求两个子阶段,分别由Listener和Reader两种线程完成。

整个Server只有一个Listener线程,统一负责监听来自客户端的连接请求,一旦有新的请求到达,它会采用轮询的方式从线程池中选择一个Reader线程进行处理,而Reader线程可同时存在多个,它们分别负责接收一部分客户端连接的RPC请求,至于每个Reader线程负责哪些客户端连接,完全由Listener决定,当前Listener只是采用了简单的轮询分配机制。

Listener和Reader线程内部各自包含一个Selector对象,分别用于监听SelectionKey.OP_ACCEPT和SelectionKey.OP_READ事件。对于Listener线程,主循环的实现体是监听是否有新的连接请求到达,并采用轮询策略选择一个Reader线程处理新连接;对于Reader线程,主循环的实现体是监听(它负责的那部分)客户端连接中是否有新的RPC请求到达,并将新的RPC请求封装成Call对象,放到共享队列callQueue中。

(2)处理请求

该阶段主要任务是从共享队列callQueue中获取Call对象,执行对应的函数调用,并将结果返回给客户端,这全部由Handler线程完成。

Server端可同时存在多个Handler线程,它们并行从共享队列中读取Call对象,经执行对应的函数调用后,将尝试着直接将结果返回给对应的客户端。但考虑到某些函数调用返回结果很大或者网络速度过慢,可能难以将结果一次性发送到客户端,此时Handler将尝试着将后续发送任务交给Responder线程。

(3)返回结果

前面提到,每个Handler线程执行完函数调用后,会尝试着将执行结果返回给客户端,但对于特殊情况,比如函数调用返回结果过大或者网络异常情况(网速过慢),会将发送任务交给Responder线程。

Server端仅存在一个Responder线程,它的内部包含一个Selector对象,用于监听SelectionKey.OP_WRITE事件。当Handler没能将结果一次性发送到客户端时,会向该Selector对象注册SelectionKey.OP_WRITE事件,进而由Responder线程采用异步方式继续发送未发送完成的结果。

时间: 2024-11-03 05:28:59

Hadoop RPC框架的相关文章

Hadoop RPC机制-原理篇

RPC是Hadoop的基础组件,提供分布式环境下的对象调用功能.之前用了三天时间分析与测试RPC,目的是想弄清楚它的整个运行机制. 概括的说,RPC采用客户机/服务器模式.请求程序就是一个客户机,而服务提供程序就是一个服务器.首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息.在服务器端,进程保持睡眠状态直到调用信息的到达为止.当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用

Hadoop学习笔记—3.Hadoop RPC机制的使用

一.RPC基础概念 1.1 RPC的基础概念 RPC,即Remote Procdure Call,中文名:远程过程调用: (1)它允许一台计算机程序远程调用另外一台计算机的子程序,而不用去关心底层的网络通信细节,对我们来说是透明的.因此,它经常用于分布式网络通信中. RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据.在OSI网络通信模型中,RPC跨越了传输层和应用层.RPC使得开发包括网络分布式多程序在内的应用程序更加容易. (2)Hadoop的进程间交互都是通过R

RPC框架研究(二)Hadoop源代码-1

报名了阿里中间件性能大赛,我来说是一个全新的挑战.一切从空白学起,比赛的过程也是学习的过程 是的.想让自己学好.给自己报一个比赛吧~ 就像当初学围棋,也是报了围棋比赛,为了不至于输的太慘.一个星期里学了好多东西 第二天 Hadoop源代码-1 小雨 天真的以为学了Java回调机制后就能够把原来的RPC框架改为异步调用了,结果对着代码一下午都没想出要怎么去改,怎么入手. 于是决定研究一下Hadoop的源代码,看看别人是怎么实现RPC的,这也是我第一次研究源代码,曾经都是仅仅管用.无论怎样实现. 使

3 weekend110的hadoop中的RPC框架实现机制 + hadoop中的RPC应用实例demo

hadoop中的RPC框架实现机制 RPC是Remotr Process Call, 进程间的远程过程调用,不是在一个jvm里. 即,Controller拿不到Service的实例对象. hadoop中的RPC应用实例demo 在windows是调用端,在linux里是服务端. 在这里,需要LoginServiceinterface.java 停止 出错误了,很明显. 这是个很好的思考题?

Hadoop 源代码分析(五)RPC 框架

介绍完org.apache.hadoop.io 以后,我们开始来分析org.apache.hadoop.rpc.RPC 采用客户机/服务器模式.请求程序就是一个客户机,而服务提供程序就是一个服务器.当我们讨论HDFS 的,通信可能发生在: Client-NameNode 之间,其中NameNode 是服务器 Client-DataNode 之间,其中DataNode 是服务器 DataNode-NameNode 之间,其中NameNode 是服务器 DataNode-DateNode 之间,其中

Hadoop的RPC框架介绍

为什么会引入RPC: RPC采用客户机/服务器模式.请求程序就是一个客户机,而服务提供程序就是一个服务器.当我们讨论HDFS的,通信可能发生在: Client-NameNode之间,其中NameNode是服务器 Client-DataNode之间,其中DataNode是服务器 DataNode-NameNode之间,其中NameNode是服务器 DataNode-DateNode之间,其中某一个DateNode是服务器,另一个是客户端 如果我们考虑Hadoop的Map/Reduce以后,这些系统

Hadoop RPC简单例子

jdk中已经提供了一个RPC框架-RMI,但是该PRC框架过于重量级并且可控之处比较少,所以Hadoop RPC实现了自定义的PRC框架. 同其他RPC框架一样,Hadoop RPC分为四个部分: (1)序列化层:Clent与Server端通信传递的信息采用了Hadoop里提供的序列化类或自定义的Writable类型: (2)函数调用层:Hadoop RPC通过动态代理以及java反射实现函数调用: (3)网络传输层:Hadoop RPC采用了基于TCP/IP的socket机制: (4)服务器端

初见akka-02:rpc框架

1.RPC:简单点说,就是多线程之间的通信,我们今天用了scala以及akka 来简单的实现了 rpc框架的一些简单的内容,一脸包括了,心跳,间隔时间, 注册以及一些问题, 模式匹配的一些东西,虽然比较简单,但是属于麻雀虽小,五脏俱全 这个里面一共有有四个文件: Master.scala RemoteMessage.scala Worker.scala WorkerInfo Master.scala package cn.wj.rpc import akka.actor.{Actor, Acto

Hadoop RPC通信Client客户端的流程分析

Hadoop的RPC的通信与其他系统的RPC通信不太一样,作者针对Hadoop的使用特点,专门的设计了一套RPC框架,这套框架个人感觉还是有点小复杂的.所以我打算分成Client客户端和Server服务端2个模块做分析.如果你对RPC的整套流程已经非常了解的前提下,对于Hadoop的RPC,你也一定可以非常迅速的了解的.OK,下面切入正题. Hadoop的RPC的相关代码都在org.apache.hadoop.ipc的包下,首先RPC的通信必须遵守许多的协议,其中最最基本的协议即使如下: /**