hadoop rpc principle

一.RPC协议

在分析协议之前,我觉得我们很有必要先搞清楚协议是什么。下面我就谈一点自己的认识吧。如果你学过java的网络编程,你一定知道:当客户端发送一个字节
给服务端时,服务端必须也要有一个读字节的方法在阻塞等待;反之亦然。
这种我把它称为底层的通信协议。可是对于一个大型的网络通信系统来说,很显然这种说法的协议粒度太小,不方便我们理解整个网络通信的流程及架构,所以我造
了个说法:架构层次的协议。通俗一点说,就是我把某些接口和接口中的方法称为协议,客户端和服务端只要实现这些接口中的方法就可以进行通信了,从这个角度
来说,架构层次协议的说法就可以成立了(注:如果从架构层次的协议来分析系统,我们就先不要太在意方法的具体实现,呵呵,我相信你懂得~)。

Hadoop的RPC机制正是采用了这种“架构层次的协议”,有一整套作为协议的接口。如图:

下面就几个重点的协议介绍一下吧:

VersionedProtocol
:它是所有RPC协议接口的父接口,其中只有一个方法:getProtocolVersion()

(1)HDFS相关

ClientDatanodeProtocol
:一个客户端和datanode之间的协议接口,用于数据块恢复
ClientProtocol
:client与Namenode交互的接口,所有控制流的请求均在这里,如:创建文件、删除文件等;
DatanodeProtocol
: Datanode与Namenode交互的接口,如心跳、blockreport等;
NamenodeProtocol
:SecondaryNode与Namenode交互的接口。

(2)Mapreduce相关

InterDatanodeProtocol
:Datanode内部交互的接口,用来更新block的元数据;
InnerTrackerProtocol
:TaskTracker与JobTracker交互的接口,功能与DatanodeProtocol相似;
JobSubmissionProtocol
:JobClient与JobTracker交互的接口,用来提交Job、获得Job等与Job相关的操作;
TaskUmbilicalProtocol
:Task中子进程与母进程交互的接口,子进程即map、reduce等操作,母进程即TaskTracker,该接口可以回报子进程的运行状态(词汇扫盲: umbilical 脐带的, 关系亲密的) 。

一下子罗列了这么多的协议,有些人可能要问了,hadoop是怎么使用它们的
呢?呵呵,不要着急哦,其实本篇博客所分析的是hadoop的RPC机制底层的具体实现,而这些协议却是应用层上的东西,比如hadoop是怎么样保持
“心跳”的啊。所以在我的下一篇博客:源码级分析hadoop的心跳机制中会详细说明以上协议是怎样被使用的。尽请期待哦~。现在就开始我们的RPC源码
之旅吧•••

二.ipc.RPC源码分析

ipc.RPC类中有一些内部类,为了大家对RPC类有个初步的印象,就先罗列几个我们感兴趣的分析一下吧:

Invocation
:用于封装方法名和参数,作为数据传输层,相当于VO吧。
ClientCache
:用于存储client对象,用socket factory作为hash key,存储结构为hashMap <SocketFactory, Client>。
Invoker
:是动态代理中的调用实现类,继承了InvocationHandler.
Server
:是ipc.Server的实现类。

从以上的分析可以知道,Invocation类仅作为VO,ClientCache类只是作为缓存,而Server类用于服务端的处理,他们都和客户端的数据流和业务逻辑没有关系。现在就只剩下Invoker类了。如果你对动态代理(参考:
http://weixiaolu.iteye.com/blog/1477774

)比较了解的话,你一下就会想到,我们接下来去研究的就是RPC.Invoker类中的invoke()方法了。代码如下:

代码一:

Java代码  

  1. public Object invoke(Object proxy, Method method, Object[] args)
  2. throws Throwable {
  3. •••
  4. ObjectWritable value = (ObjectWritable)
  5. client.call(new Invocation(method, args), remoteId);
  6. •••
  7. return value.get();
  8. }

呵呵,如果你发现这个invoke()方法实现的有些奇怪的话,那你就对了。一
般我们看到的动态代理的invoke()方法中总会有 method.invoke(ac, arg); 
这句代码。而上面代码中却没有,这是为什么呢?其实使用 method.invoke(ac, arg);
是在本地JVM中调用;而在hadoop中,是将数据发送给服务端,服务端将处理的结果再返回给客户端,所以这里的invoke()方法必然需要进行网络
通信。而网络通信就是下面的这段代码实现的:

代码二:

Java代码  

  1. ObjectWritable value = (ObjectWritable)
  2. client.call(new Invocation(method, args), remoteId);

Invocation类在这里封装了方法名和参数,充当VO。其实这里网络通信
只是调用了Client类的call()方法。那我们接下来分析一下ipc.Client源码吧。不过在分析ipc.Client源码之前,为了不让我们
像盲目的苍蝇一样乱撞,我想先确定一下我们分析的目的是什么,我总结出了三点需要解决的问题:

1. 客户端和服务端的连接是怎样建立的?
2. 客户端是怎样给服务端发送数据的?
3. 客户端是怎样获取服务端的返回数据的?

基于以上三个问题,我们开始吧!!!

三.ipc.Client源码分析

同样,为了对Client类有个初步的了解,我们也先罗列几个我们感兴趣的内部类:

Call
:用于封装Invocation对象,作为VO,写到服务端,同时也用于存储从服务端返回的数据
Connection
:用以处理远程连接对象。继承了Thread
ConnectionId
:唯一确定一个连接

问题1:客户端和服务端的连接是怎样建立的?

下面我们来看看Client类中的cal()方法吧:

代码三:

Java代码  

  1. public Writable call(Writable param, ConnectionId remoteId)
  2. throws InterruptedException, IOException {
  3. Call call = new Call(param);       //将传入的数据封装成call对象
  4. Connection connection = getConnection(remoteId, call);   //获得一个连接
  5. connection.sendParam(call);     // 向服务端发送call对象
  6. boolean interrupted = false;
  7. synchronized (call) {
  8. while (!call.done) {
  9. try {
  10. call.wait(); // 等待结果的返回,在Call类的callComplete()方法里有notify()方法用于唤醒线程
  11. } catch (InterruptedException ie) {
  12. // 因中断异常而终止,设置标志interrupted为true
  13. interrupted = true;
  14. }
  15. }
  16. if (interrupted) {
  17. Thread.currentThread().interrupt();
  18. }
  19. if (call.error != null) {
  20. if (call.error instanceof RemoteException) {
  21. call.error.fillInStackTrace();
  22. throw call.error;
  23. } else { // 本地异常
  24. throw wrapException(remoteId.getAddress(), call.error);
  25. }
  26. } else {
  27. return call.value; //返回结果数据
  28. }
  29. }
  30. }

具体代码的作用我已做了注释,所以这里不再赘述。但到目前为止,你依然不知道RPC机制底层的网络连接是怎么建立的。呵呵,那我们只好再去深究了,分析代码后,我们会发现和网络通信有关的代码只会是下面的两句了:

代码四:

Java代码  

  1. Connection connection = getConnection(remoteId, call);   //获得一个连接
  2. connection.sendParam(call);      // 向服务端发送call对象

先看看是怎么获得一个到服务端的连接吧,下面贴出ipc.Client类中的getConnection()方法。

代码五:

Java代码  

  1. private Connection getConnection(ConnectionId remoteId,
  2. Call call)
  3. throws IOException, InterruptedException {
  4. if (!running.get()) {
  5. // 如果client关闭了
  6. throw new IOException("The client is stopped");
  7. }
  8. Connection connection;
  9. //如果connections连接池中有对应的连接对象,就不需重新创建了;如果没有就需重新创建一个连接对象。
  10. //但请注意,该//连接对象只是存储了remoteId的信息,其实还并没有和服务端建立连接。
  11. do {
  12. synchronized (connections) {
  13. connection = connections.get(remoteId);
  14. if (connection == null) {
  15. connection = new Connection(remoteId);
  16. connections.put(remoteId, connection);
  17. }
  18. }
  19. } while (!connection.addCall(call)); //将call对象放入对应连接中的calls池,就不贴出源码了
  20. //这句代码才是真正的完成了和服务端建立连接哦~
  21. connection.setupIOstreams();
  22. return connection;
  23. }

如果你还有兴趣继续分析下去,那我们就一探建立连接的过程吧,下面贴出Client.Connection类中的setupIOstreams()方法:

代码六:

Java代码  

  1. private synchronized void setupIOstreams() throws InterruptedException {
  2. •••
  3. try {
  4. •••
  5. while (true) {
  6. setupConnection();  //建立连接
  7. InputStream inStream = NetUtils.getInputStream(socket);     //获得输入流
  8. OutputStream outStream = NetUtils.getOutputStream(socket);  //获得输出流
  9. writeRpcHeader(outStream);
  10. •••
  11. this.in = new DataInputStream(new BufferedInputStream
  12. (new PingInputStream(inStream)));   //将输入流装饰成DataInputStream
  13. this.out = new DataOutputStream
  14. (new BufferedOutputStream(outStream));   //将输出流装饰成DataOutputStream
  15. writeHeader();
  16. // 跟新活动时间
  17. touch();
  18. //当连接建立时,启动接受线程等待服务端传回数据,注意:Connection继承了Tread
  19. start();
  20. return;
  21. }
  22. } catch (IOException e) {
  23. markClosed(e);
  24. close();
  25. }
  26. }

再有一步我们就知道客户端的连接是怎么建立的啦,下面贴出Client.Connection类中的setupConnection()方法:

代码七:

Java代码  

  1. private synchronized void setupConnection() throws IOException {
  2. short ioFailures = 0;
  3. short timeoutFailures = 0;
  4. while (true) {
  5. try {
  6. this.socket = socketFactory.createSocket(); //终于看到创建socket的方法了
  7. this.socket.setTcpNoDelay(tcpNoDelay);
  8. •••
  9. // 设置连接超时为20s
  10. NetUtils.connect(this.socket, remoteId.getAddress(), 20000);
  11. this.socket.setSoTimeout(pingInterval);
  12. return;
  13. } catch (SocketTimeoutException toe) {
  14. /* 设置最多连接重试为45次。
  15. * 总共有20s*45 = 15 分钟的重试时间。
  16. */
  17. handleConnectionFailure(timeoutFailures++, 45, toe);
  18. } catch (IOException ie) {
  19. handleConnectionFailure(ioFailures++, maxRetries, ie);
  20. }
  21. }
  22. }

终于,我们知道了客户端的连接是怎样建立的了,其实就是创建一个普通的
socket进行通信。呵呵,那服务端是不是也是创建一个ServerSocket进行通信的呢?呵呵,先不要急,到这里我们只解决了客户端的第一个问
题,下面还有两个问题没有解决呢,我们一个一个地来解决吧。

问题2:客户端是怎样给服务端发送数据的?

我们回顾一下代码四吧。第一句为了完成连接的建立,我们已经分析完毕;而第二句是为了发送数据,呵呵,分析下去,看能不能解决我们的问题呢。下面贴出Client.Connection类的sendParam()方法吧:

代码八:

Java代码  

  1. public void sendParam(Call call) {
  2. if (shouldCloseConnection.get()) {
  3. return;
  4. }
  5. DataOutputBuffer d=null;
  6. try {
  7. synchronized (this.out) {
  8. if (LOG.isDebugEnabled())
  9. LOG.debug(getName() + " sending #" + call.id);
  10. //创建一个缓冲区
  11. d = new DataOutputBuffer();
  12. d.writeInt(call.id);
  13. call.param.write(d);
  14. byte[] data = d.getData();
  15. int dataLength = d.getLength();
  16. out.writeInt(dataLength);        //首先写出数据的长度
  17. out.write(data, 0, dataLength); //向服务端写数据
  18. out.flush();
  19. }
  20. } catch(IOException e) {
  21. markClosed(e);
  22. } finally {
  23. IOUtils.closeStream(d);
  24. }
  25. }

其实这就是java io的socket发送数据的一般过程哦,没有什么特别之处。到这里问题二也解决了,来看看问题三吧。

问题3:客户端是怎样获取服务端的返回数据的?

我们再回顾一下代码六吧。代码六中,当连接建立时会启动一个线程用于处理服务端返回的数据,我们看看这个处理线程是怎么实现的吧,下面贴出Client.Connection类和Client.Call类中的相关方法吧:

代码九:

Java代码  

  1. 方法一:
  2. public void run() {
  3. •••
  4. while (waitForWork()) {
  5. receiveResponse();  //具体的处理方法
  6. }
  7. close();
  8. •••
  9. }
  10. 方法二:
  11. private void receiveResponse() {
  12. if (shouldCloseConnection.get()) {
  13. return;
  14. }
  15. touch();
  16. try {
  17. int id = in.readInt();                    // 阻塞读取id
  18. if (LOG.isDebugEnabled())
  19. LOG.debug(getName() + " got value #" + id);
  20. Call call = calls.get(id);    //在calls池中找到发送时的那个对象
  21. int state = in.readInt();     // 阻塞读取call对象的状态
  22. if (state == Status.SUCCESS.state) {
  23. Writable value = ReflectionUtils.newInstance(valueClass, conf);
  24. value.readFields(in);           // 读取数据
  25. //将读取到的值赋给call对象,同时唤醒Client等待线程,贴出setValue()代码方法三
  26. call.setValue(value);
  27. calls.remove(id);               //删除已处理的call
  28. } else if (state == Status.ERROR.state) {
  29. •••
  30. } else if (state == Status.FATAL.state) {
  31. •••
  32. }
  33. } catch (IOException e) {
  34. markClosed(e);
  35. }
  36. }
  37. 方法三:
  38. public synchronized void setValue(Writable value) {
  39. this.value = value;
  40. callComplete();   //具体实现
  41. }
  42. protected synchronized void callComplete() {
  43. this.done = true;
  44. notify();         // 唤醒client等待线程
  45. }

代码九完成的功能主要是:启动一个处理线程,读取从服务端传来的call对象,将call对象读取完毕后,唤醒client处理线程。就这么简单,客户端就获取了服务端返回的数据了哦~。客户端的源码分析就到这里了哦,下面我们来分析Server端的源码吧。

四.ipc.Server源码分析

同样,为了让大家对ipc.Server有个初步的了解,我们先分析一下它的几个内部类吧:

Call
:用于存储客户端发来的请求
Listener
: 监听类,用于监听客户端发来的请求,同时Listener内部还有一个静态类,Listener.Reader,当监听器监听到用户请求,便让Reader读取用户请求。
Responder
:响应RPC请求类,请求处理完毕,由Responder发送给请求客户端。
Connection
:连接类,真正的客户端请求读取逻辑在这个类中。
Handler
:请求处理类,会循环阻塞读取callQueue中的call对象,并对其进行操作。

如果你看过ipc.Server的源码,你会发现其实ipc.Server是一
个abstract修饰的抽象类。那随之而来的问题就是:hadoop是怎样初始化RPC的Server端的呢?这个问题着实也让我想了好长时间。不过后
来我想到Namenode初始化时一定初始化了RPC的Sever端,那我们去看看Namenode的初始化源码吧:

1. 初始化Server

代码十:

Java代码  

  1. private void initialize(Configuration conf) throws IOException {
  2. •••
  3. // 创建 rpc server
  4. InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);
  5. if (dnSocketAddr != null) {
  6. int serviceHandlerCount =
  7. conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
  8. DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
  9. //获得serviceRpcServer
  10. this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(),
  11. dnSocketAddr.getPort(), serviceHandlerCount,
  12. false, conf, namesystem.getDelegationTokenSecretManager());
  13. this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
  14. setRpcServiceServerAddress(conf);
  15. }
  16. //获得server
  17. this.server = RPC.getServer(this, socAddr.getHostName(),
  18. socAddr.getPort(), handlerCount, false, conf, namesystem
  19. .getDelegationTokenSecretManager());
  20. •••
  21. this.server.start();  //启动 RPC server   Clients只允许连接该server
  22. if (serviceRpcServer != null) {
  23. serviceRpcServer.start();  //启动 RPC serviceRpcServer 为HDFS服务的server
  24. }
  25. startTrashEmptier(conf);
  26. }

查看Namenode初始化源码得知:RPC的server对象是通过ipc.RPC类的getServer()方法获得的。下面咱们去看看ipc.RPC类中的getServer()源码吧:

代码十一:

Java代码  

  1. public static Server getServer(final Object instance, final String bindAddress, final int port,
  2. final int numHandlers,
  3. final boolean verbose, Configuration conf,
  4. SecretManager<? extends TokenIdentifier> secretManager)
  5. throws IOException {
  6. return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager);
  7. }

这时我们发现getServer()是一个创建Server对象的工厂方法,但
创建的却是RPC.Server类的对象。哈哈,现在你明白了我前面说的“RPC.Server是ipc.Server的实现类”了吧。不过
RPC.Server的构造函数还是调用了ipc.Server类的构造函数的,因篇幅所限,就不贴出相关源码了。

2. 运行Server

如代码十所示,初始化Server后,Server端就运行起来了,看看ipc.Server的start()源码吧:

代码十二:

Java代码  

  1. /** 启动服务 */
  2. public synchronized void start() {
  3. responder.start();  //启动responder
  4. listener.start();   //启动listener
  5. handlers = new Handler[handlerCount];
  6. for (int i = 0; i < handlerCount; i++) {
  7. handlers[i] = new Handler(i);
  8. handlers[i].start();   //逐个启动Handler
  9. }
  10. }

3. Server处理请求

1)建立连接

分析过ipc.Client源码后,我们知道Client端的底层通信直接采用了阻塞式IO编程,当时我们曾做出猜测:Server端是不是也采用了阻塞
式IO。现在我们仔细地分析一下吧,如果Server端也采用阻塞式IO,当连接进来的Client端很多时,势必会影响Server端的性能。
hadoop的实现者们考虑到了这点,所以他们采用了java  NIO来实现Server端,java NIO可参考博客:
http://weixiaolu.iteye.com/blog/1479656
。那Server端采用java NIO是怎么建立连接的呢?分析源码得知,Server端采用Listener监听客户端的连接,下面先分析一下Listener的构造函数吧:

代码十三:

Java代码  

  1. public Listener() throws IOException {
  2. address = new InetSocketAddress(bindAddress, port);
  3. // 创建ServerSocketChannel,并设置成非阻塞式
  4. acceptChannel = ServerSocketChannel.open();
  5. acceptChannel.configureBlocking(false);
  6. // 将server socket绑定到本地端口
  7. bind(acceptChannel.socket(), address, backlogLength);
  8. port = acceptChannel.socket().getLocalPort();
  9. // 获得一个selector
  10. selector= Selector.open();
  11. readers = new Reader[readThreads];
  12. readPool = Executors.newFixedThreadPool(readThreads);
  13. //启动多个reader线程,为了防止请求多时服务端响应延时的问题
  14. for (int i = 0; i < readThreads; i++) {
  15. Selector readSelector = Selector.open();
  16. Reader reader = new Reader(readSelector);
  17. readers[i] = reader;
  18. readPool.execute(reader);
  19. }
  20. // 注册连接事件
  21. acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
  22. this.setName("IPC Server listener on " + port);
  23. this.setDaemon(true);
  24. }

在启动Listener线程时,服务端会一直等待客户端的连接,下面贴出Server.Listener类的run()方法:

代码十四:

Java代码  

  1. public void run() {
  2. •••
  3. while (running) {
  4. SelectionKey key = null;
  5. try {
  6. selector.select();
  7. Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
  8. while (iter.hasNext()) {
  9. key = iter.next();
  10. iter.remove();
  11. try {
  12. if (key.isValid()) {
  13. if (key.isAcceptable())
  14. doAccept(key);     //具体的连接方法
  15. }
  16. } catch (IOException e) {
  17. }
  18. key = null;
  19. }
  20. } catch (OutOfMemoryError e) {
  21. •••
  22. }

下面贴出Server.Listener类中doAccept ()方法中的关键源码吧:

代码十五:

Java代码  

  1. void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {
  2. Connection c = null;
  3. ServerSocketChannel server = (ServerSocketChannel) key.channel();
  4. SocketChannel channel;
  5. while ((channel = server.accept()) != null) { //建立连接
  6. channel.configureBlocking(false);
  7. channel.socket().setTcpNoDelay(tcpNoDelay);
  8. Reader reader = getReader();  //从readers池中获得一个reader
  9. try {
  10. reader.startAdd(); // 激活readSelector,设置adding为true
  11. SelectionKey readKey = reader.registerChannel(channel);//将读事件设置成兴趣事件
  12. c = new Connection(readKey, channel, System.currentTimeMillis());//创建一个连接对象
  13. readKey.attach(c);   //将connection对象注入readKey
  14. synchronized (connectionList) {
  15. connectionList.add(numConnections, c);
  16. numConnections++;
  17. }
  18. •••
  19. } finally {
  20. //设置adding为false,采用notify()唤醒一个reader,其实代码十三中启动的每个reader都使
  21. //用了wait()方法等待。因篇幅有限,就不贴出源码了。
  22. reader.finishAdd();
  23. }
  24. }
  25. }

当reader被唤醒,reader接着执行doRead()方法。

2)接收请求

下面贴出Server.Listener.Reader类中的doRead()方法和Server.Connection类中的readAndProcess()方法源码:

代码十六:

Java代码  

  1. 方法一:
  2. void doRead(SelectionKey key) throws InterruptedException {
  3. int count = 0;
  4. Connection c = (Connection)key.attachment();  //获得connection对象
  5. if (c == null) {
  6. return;
  7. }
  8. c.setLastContact(System.currentTimeMillis());
  9. try {
  10. count = c.readAndProcess();    // 接受并处理请求
  11. } catch (InterruptedException ieo) {
  12. •••
  13. }
  14. •••
  15. }
  16. 方法二:
  17. public int readAndProcess() throws IOException, InterruptedException {
  18. while (true) {
  19. •••
  20. if (!rpcHeaderRead) {
  21. if (rpcHeaderBuffer == null) {
  22. rpcHeaderBuffer = ByteBuffer.allocate(2);
  23. }
  24. //读取请求头
  25. count = channelRead(channel, rpcHeaderBuffer);
  26. if (count < 0 || rpcHeaderBuffer.remaining() > 0) {
  27. return count;
  28. }
  29. // 读取请求版本号
  30. int version = rpcHeaderBuffer.get(0);
  31. byte[] method = new byte[] {rpcHeaderBuffer.get(1)};
  32. •••
  33. data = ByteBuffer.allocate(dataLength);
  34. }
  35. // 读取请求
  36. count = channelRead(channel, data);
  37. if (data.remaining() == 0) {
  38. •••
  39. if (useSasl) {
  40. •••
  41. } else {
  42. processOneRpc(data.array());//处理请求
  43. }
  44. •••
  45. }
  46. }
  47. return count;
  48. }
  49. }

3)获得call对象

下面贴出Server.Connection类中的processOneRpc()方法和processData()方法的源码。

代码十七:

Java代码  

  1. 方法一:
  2. private void processOneRpc(byte[] buf) throws IOException,
  3. InterruptedException {
  4. if (headerRead) {
  5. processData(buf);
  6. } else {
  7. processHeader(buf);
  8. headerRead = true;
  9. if (!authorizeConnection()) {
  10. throw new AccessControlException("Connection from " + this
  11. + " for protocol " + header.getProtocol()
  12. + " is unauthorized for user " + user);
  13. }
  14. }
  15. }
  16. 方法二:
  17. private void processData(byte[] buf) throws  IOException, InterruptedException {
  18. DataInputStream dis =
  19. new DataInputStream(new ByteArrayInputStream(buf));
  20. int id = dis.readInt();      // 尝试读取id
  21. Writable param = ReflectionUtils.newInstance(paramClass, conf);//读取参数
  22. param.readFields(dis);
  23. Call call = new Call(id, param, this);  //封装成call
  24. callQueue.put(call);   // 将call存入callQueue
  25. incRpcCount();  // 增加rpc请求的计数
  26. }

4)处理call对象

你还记得Server类中还有个Handler内部类吗?呵呵,对call对象的处理就是它干的。下面贴出Server.Handler类中run()方法中的关键代码:

代码十八:

Java代码  

  1. while (running) {
  2. try {
  3. final Call call = callQueue.take(); //弹出call,可能会阻塞
  4. •••
  5. //调用ipc.Server类中的call()方法,但该call()方法是抽象方法,具体实现在RPC.Server类中
  6. value = call(call.connection.protocol, call.param, call.timestamp);
  7. synchronized (call.connection.responseQueue) {
  8. setupResponse(buf, call,
  9. (error == null) ? Status.SUCCESS : Status.ERROR,
  10. value, errorClass, error);
  11. •••
  12. //给客户端响应请求
  13. responder.doRespond(call);
  14. }
  15. }

5)返回请求

下面贴出Server.Responder类中的doRespond()方法源码:

代码十九:

Java代码  

  1. 方法一:
  2. void doRespond(Call call) throws IOException {
  3. synchronized (call.connection.responseQueue) {
  4. call.connection.responseQueue.addLast(call);
  5. if (call.connection.responseQueue.size() == 1) {
  6. // 返回响应结果,并激活writeSelector
  7. processResponse(call.connection.responseQueue, true);
  8. }
  9. }
  10. }
时间: 2024-10-08 02:11:52

hadoop rpc principle的相关文章

Hadoop RPC简单例子

jdk中已经提供了一个RPC框架-RMI,但是该PRC框架过于重量级并且可控之处比较少,所以Hadoop RPC实现了自定义的PRC框架. 同其他RPC框架一样,Hadoop RPC分为四个部分: (1)序列化层:Clent与Server端通信传递的信息采用了Hadoop里提供的序列化类或自定义的Writable类型: (2)函数调用层:Hadoop RPC通过动态代理以及java反射实现函数调用: (3)网络传输层:Hadoop RPC采用了基于TCP/IP的socket机制: (4)服务器端

Hadoop RPC

第一部分:什么是RPC RPC (Remote Procedure Call Protocol) – 远程过程协议调用 .通过 RPC 我们可以从网络上的计算机请求服务,而不需要了 解底层网络协议. Hadoop 底层的交互都是通过 rpc 进行的.例 如: datanode 和 namenode . tasktracker 和 jobtracker . secondary namenode 和 namenode 之间的通信都是通过 rpc 实 现的. RPC 模式 RPC 采用客户机 / 服务

hadoop rpc 学习

Hadoop rpc 学习(版本2.6.0) 客户端部分 1.生成动态代理类(org.apache.hadoop.ipc.WritableRpcEngine.class 311行) 2.代理类执行任务入口(org.apache.hadoop.ipc.WritableRpcEngine.class 245行) 3.hadoop rpc client 给服务端发送请求 调用(org.apache.hadoop.ipc.Client.class 998行) public void sendRpcReq

Hadoop RPC通信Client客户端的流程分析

Hadoop的RPC的通信与其他系统的RPC通信不太一样,作者针对Hadoop的使用特点,专门的设计了一套RPC框架,这套框架个人感觉还是有点小复杂的.所以我打算分成Client客户端和Server服务端2个模块做分析.如果你对RPC的整套流程已经非常了解的前提下,对于Hadoop的RPC,你也一定可以非常迅速的了解的.OK,下面切入正题. Hadoop的RPC的相关代码都在org.apache.hadoop.ipc的包下,首先RPC的通信必须遵守许多的协议,其中最最基本的协议即使如下: /**

hadoop rpc基础

第一部分: hadoop rpc基础 RPC,远程程序调用,分布式计算中C/S模型的一个应用实例. 同其他RPC框架一样,Hadoop分为四个部分: 序列化层:支持多种框架实现序列化与反序列化 函数调用层:利用java反射与动态代理实现 网络传输层:基于TCP/IP的Socket机制 服务的处理框架:基于Reactor模式的事件驱动IO模型 Hadoop RPC主要对外提供2种接口 public static ProtocolProxy getProxy/waitForProxy: 构造一个客户

Hadoop RPC远程过程调用源码解析及实例

什么是RPC? 1.RPC(Remote Procedure Call)远程过程调用,它允许一台计算机程序远程调用另外一台计算机的子程序,而不用去关心底层的网络通信细节,对我们来说是透明的.经常用于分布式网络通信中. 2.Hadoop的进程间交互都是通过RPC来进行的,比如Namenode与Datanode之间,Jobtracker与Tasktracker之间等. RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据.在OSI网络通信模型中, RPC跨越了传输层和应用层

RPC与Hadoop RPC机制

一.什么是RPC? (1)Remote Procdure call ,远程方法调用,它允许一台计算机程序远程调用另外一台计算机的子程序,而不用去关心底层的网络通信细节,对我们来说是透明的.经常用于分布式网络通信中. (2)Hadoop的进程间交互都死通过RPC来进行的,比如Namenode与Datanode直接,Jobtracker与Tasktracker之间等. 流程: (1)RPC采用了C/S的模式: (2)Client端发送一个带有参数的请求信息到Server: (3)Server接收到这

Hadoop RPC protocol description--转

原文地址:https://spotify.github.io/snakebite/hadoop_rpc.html Snakebite currently implements the following protocol in snakebite.channel.SocketRpcChannel to communicate with the NameNode. Connection The Hadoop RPC protocol works as described below. On con

每天收获一点点------Hadoop RPC机制的使用

一.RPC基础概念 1.1 RPC的基础概念 RPC,即Remote Procdure Call,中文名:远程过程调用: (1)它允许一台计算机程序远程调用另外一台计算机的子程序,而不用去关心底层的网络通信细节,对我们来说是透明的.因此,它经常用于分布式网络通信中. RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据.在OSI网络通信模型中,RPC跨越了传输层和应用层.RPC使得开发包括网络分布式多程序在内的应用程序更加容易. (2)Hadoop的进程间交互都是通过R