RPC源码阅读-客户端

Hadoop版本Hadoop2.6

RPC主要分为3个部分:(1)交互协议(2)客户端(3)服务端

(2)客户端

先展示RPC客户端实例代码

public class LoginController {
    public static void main(String[] args) throws IOException {
      //获取RPC LoginServiceInterface协议接口的代理对象
        LoginServiceInterface proxy= RPC.getProxy(LoginServiceInterface.class,1L,new InetSocketAddress("localhost",10000),new Configuration());
        String msg=proxy.login("xiaoming","123123");
        System.out.println(msg);
    }
}

(1)进入上述的RPC.getProxy方法,会发现是通过获取RpcEngine接口(默认实现是WritableRpcEngine),利用WritableRpcEngine的getProxy方法获取Proxy代理,如下所示

public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
                         InetSocketAddress addr, UserGroupInformation ticket,
                         Configuration conf, SocketFactory factory,
                         int rpcTimeout, RetryPolicy connectionRetryPolicy,
                         AtomicBoolean fallbackToSimpleAuth)
    throws IOException {    

    if (connectionRetryPolicy != null) {
      throw new UnsupportedOperationException(
          "Not supported: connectionRetryPolicy=" + connectionRetryPolicy);
    }
    T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
        new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
            factory, rpcTimeout, fallbackToSimpleAuth));
    return new ProtocolProxy<T>(protocol, proxy, true);
  }

(2)上述就是客户端获取代理的过程,但是其中是如何从服务端获取通过动态代理类Invoker实现,并将代理封装成ProtocolProxy类,在本文上述的例子中,该ProtocolProxy类没有干什么,只是通过getProxy()方法将封装的代理返回给客户端

那么我们接着分析动态代理类Invoker

Invoker成员有Clinet类,并且全局变量ClientCache对Client进行缓存。

动态代理类Invoker在代理对象发送请求时会自动执行invoke()方法,如下所示:

public Object invoke(Object proxy, Method method, Object[] args)
      throws Throwable {
      long startTime = 0;
      if (LOG.isDebugEnabled()) {
        startTime = Time.now();
      }
      TraceScope traceScope = null;
      if (Trace.isTracing()) {
        traceScope = Trace.startSpan(
            method.getDeclaringClass().getCanonicalName() +
            "." + method.getName());
      }
      ObjectWritable value;
      try {
        value = (ObjectWritable)
          client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args),
            remoteId, fallbackToSimpleAuth);
      } finally {
        if (traceScope != null) traceScope.close();
      }
      if (LOG.isDebugEnabled()) {
        long callTime = Time.now() - startTime;
        LOG.debug("Call: " + method.getName() + " " + callTime);
      }
      return value.get();
    }

3、上述中动态代理通过client.call方法向服务器发送请求获取返回值。

我们还看到Invocation类封装了方法和参数,Invocation通过实现Writable实现序列化,方便数据在网络中传输,作为数据传输层,相当于VO。

因此我们接着进入Clinet类,查看call方法干了什么。

首先我们先看看Client类的结构,Client类包含了几个内部类:

Call :用于封装Invocation对象,作为VO,写到服务端,同时也用于存储从服务端返回的数据
Connection :用以处理远程连接对象。继承了Thread
ConnectionId :唯一确定一个连接

Client类中call()方法如下所示:

public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
      ConnectionId remoteId, int serviceClass,
      AtomicBoolean fallbackToSimpleAuth) throws IOException {
    final Call call = createCall(rpcKind, rpcRequest);//将传入的数据封装成call对象
    Connection connection = getConnection(remoteId, call, serviceClass,
      fallbackToSimpleAuth);//获得一个连接  
    try {
      connection.sendRpcRequest(call);                 // send the rpc request向服务端发送call对象
    } catch (RejectedExecutionException e) {
      throw new IOException("connection has been closed", e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      LOG.warn("interrupted waiting to send rpc request to server", e);
      throw new IOException(e);
    }

    boolean interrupted = false;
    synchronized (call) {
      while (!call.done) {
        try {
          call.wait();                           // wait for the result
        } catch (InterruptedException ie) {
          // save the fact that we were interrupted
          interrupted = true;
        }
      }

      if (interrupted) {
        // set the interrupt flag now that we are done waiting
        Thread.currentThread().interrupt();
      }

      if (call.error != null) {
        if (call.error instanceof RemoteException) {
          call.error.fillInStackTrace();
          throw call.error;
        } else { // local exception
          InetSocketAddress address = connection.getRemoteAddress();
          throw NetUtils.wrapException(address.getHostName(),
                  address.getPort(),
                  NetUtils.getHostname(),
                  0,
                  call.error);
        }
      } else {
        return call.getRpcResponse();
      }
    }
  }

4、从上述可以看到,rpcRequest是将方法和参数封装后的可序列号的对象,当做请求参数发送给服务端。

在上述方法中主要使用了两个类Call和Connection.

Call:封装了与服务端请求的状态,包括:

    final int id;               // call id该请求连接ID
    final int retry;           // retry count该请求重试次数
    final Writable rpcRequest;  // the serialized rpc request该请求参数
    Writable rpcResponse;       // null if rpc has error该请求的返回值
    IOException error;          // exception, null if success该请求成功标示
    final RPC.RpcKind rpcKind;      // Rpc EngineKind使用RpcEngine的类型
    boolean done;               // true when call is done该请求完成标示

Connection则是实现了与服务端建立连接,发送请求,获取数据等功能。

5、Connection类解析

Connection类继承线程类Thread.

从3步可以看到在Clinet的call()方法通过getConnection()方法获取Connection,如下所示:

可以看出Client使用connections对客户端每一个connection进行缓存,

并通过setupIOstreams()方法与服务器建立Socket连接,并创建输入输出流connection.in,connection.out,

并通过start()方法启动该线程也就是运行Connection类的run()方法,等待服务端传回数据。

因此Connection类主要通过run()方法接受数据,通过sendRpcRequest()向服务端发送请求。

private Connection getConnection(ConnectionId remoteId,
      Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth)
      throws IOException {
    if (!running.get()) {
      // the client is stopped
      throw new IOException("The client is stopped");
    }
    Connection connection;
    /* we could avoid this allocation for each RPC by having a
     * connectionsId object and with set() method. We need to manage the
     * refs for keys in HashMap properly. For now its ok.
     */
    do {
      synchronized (connections) {
        connection = connections.get(remoteId);
        if (connection == null) {
          connection = new Connection(remoteId, serviceClass);
          connections.put(remoteId, connection);
        }
      }
    } while (!connection.addCall(call));

    //we don‘t invoke the method below inside "synchronized (connections)"
    //block above. The reason for that is if the server happens to be slow,
    //it will take longer to establish a connection and that will slow the
    //entire system down.
    connection.setupIOstreams(fallbackToSimpleAuth);
    return connection;
  }

5.1 Connection 的sendRpcRequest()向服务端发送请求

public void sendRpcRequest(final Call call)
        throws InterruptedException, IOException {
      if (shouldCloseConnection.get()) {
        return;
      }

      // Serialize the call to be sent. This is done from the actual
      // caller thread, rather than the sendParamsExecutor thread,

      // so that if the serialization throws an error, it is reported
      // properly. This also parallelizes the serialization.
      //
      // Format of a call on the wire:
      // 0) Length of rest below (1 + 2)
      // 1) RpcRequestHeader  - is serialized Delimited hence contains length
      // 2) RpcRequest
      //
      // Items ‘1‘ and ‘2‘ are prepared here.
      final DataOutputBuffer d = new DataOutputBuffer();
      RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
          call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
          clientId);
      header.writeDelimitedTo(d);
      call.rpcRequest.write(d);

      synchronized (sendRpcRequestLock) {
        Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
          @Override
          public void run() {
            try {
              synchronized (Connection.this.out) {
                if (shouldCloseConnection.get()) {
                  return;
                }

                if (LOG.isDebugEnabled())
                  LOG.debug(getName() + " sending #" + call.id);

                byte[] data = d.getData();
                int totalLength = d.getLength();
                out.writeInt(totalLength); // Total Length
                out.write(data, 0, totalLength);// RpcRequestHeader + RpcRequest
                out.flush();
              }
            } catch (IOException e) {
              // exception at this point would leave the connection in an
              // unrecoverable state (eg half a call left on the wire).
              // So, close the connection, killing any outstanding calls
              markClosed(e);
            } finally {
              //the buffer is just an in-memory buffer, but it is still polite to
              // close early
              IOUtils.closeStream(d);
            }
          }
        });

        try {
          senderFuture.get();
        } catch (ExecutionException e) {
          Throwable cause = e.getCause();

          // cause should only be a RuntimeException as the Runnable above
          // catches IOException
          if (cause instanceof RuntimeException) {
            throw (RuntimeException) cause;
          } else {
            throw new RuntimeException("unexpected checked exception", cause);
          }
        }
      }
    }

5.2 Connection 的run()获取服务端返回的数据

可以看到通过receiveRpcResponse()方法通过之前建立的输入流in获取服务器传来的数据,并将数据value传给call数据对象call.setRpcResponse(value);,

在call.setRpcResponse(value)方法中通过callComplete()将call数据对象设置成已完成,并通过notify()唤醒该call对象。

在Client的call()方法中,检测到call对象已完成后,就将call对象中的响应数据返回给调用者。

至此,一个完整的RPC远程过程调用的过程就完成了。

public void run() {
      if (LOG.isDebugEnabled())
        LOG.debug(getName() + ": starting, having connections "
            + connections.size());

      try {
        while (waitForWork()) {//wait here for work - read or close connection循环等待获取服务端数据
          receiveRpcResponse();//获取服务端数据的具体实现
        }
      } catch (Throwable t) {
        // This truly is unexpected, since we catch IOException in receiveResponse
        // -- this is only to be really sure that we don‘t leave a client hanging
        // forever.
        LOG.warn("Unexpected error reading responses on connection " + this, t);
        markClosed(new IOException("Error reading responses", t));
      }

      close();

      if (LOG.isDebugEnabled())
        LOG.debug(getName() + ": stopped, remaining connections "
            + connections.size());
    }
private void receiveRpcResponse() {
      if (shouldCloseConnection.get()) {
        return;
      }
      touch();

      try {
        int totalLen = in.readInt();
        RpcResponseHeaderProto header =
            RpcResponseHeaderProto.parseDelimitedFrom(in);
        checkResponse(header);

        int headerLen = header.getSerializedSize();
        headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);

        int callId = header.getCallId();
        if (LOG.isDebugEnabled())
          LOG.debug(getName() + " got value #" + callId);

        Call call = calls.get(callId);
        RpcStatusProto status = header.getStatus();
        if (status == RpcStatusProto.SUCCESS) {
          Writable value = ReflectionUtils.newInstance(valueClass, conf);
          value.readFields(in);                 // read value
          calls.remove(callId);
          call.setRpcResponse(value);

          // verify that length was correct
          // only for ProtobufEngine where len can be verified easily
          if (call.getRpcResponse() instanceof ProtobufRpcEngine.RpcWrapper) {
            ProtobufRpcEngine.RpcWrapper resWrapper =
                (ProtobufRpcEngine.RpcWrapper) call.getRpcResponse();
            if (totalLen != headerLen + resWrapper.getLength()) {
              throw new RpcClientException(
                  "RPC response length mismatch on rpc success");
            }
          }
        } else { // Rpc Request failed
          // Verify that length was correct
          if (totalLen != headerLen) {
            throw new RpcClientException(
                "RPC response length mismatch on rpc error");
          }

          final String exceptionClassName = header.hasExceptionClassName() ?
                header.getExceptionClassName() :
                  "ServerDidNotSetExceptionClassName";
          final String errorMsg = header.hasErrorMsg() ?
                header.getErrorMsg() : "ServerDidNotSetErrorMsg" ;
          final RpcErrorCodeProto erCode =
                    (header.hasErrorDetail() ? header.getErrorDetail() : null);
          if (erCode == null) {
             LOG.warn("Detailed error code not set by server on rpc error");
          }
          RemoteException re =
              ( (erCode == null) ?
                  new RemoteException(exceptionClassName, errorMsg) :
              new RemoteException(exceptionClassName, errorMsg, erCode));
          if (status == RpcStatusProto.ERROR) {
            calls.remove(callId);
            call.setException(re);
          } else if (status == RpcStatusProto.FATAL) {
            // Close the connection
            markClosed(re);
          }
        }
      } catch (IOException e) {
        markClosed(e);
      }
    }
时间: 2024-08-05 21:42:55

RPC源码阅读-客户端的相关文章

RPC源码阅读-交互协议

RPC主要分为3个部分:(1)交互协议(2)客户端(3)服务端 (1)交互协议 协议:把某些接口和接口中的方法称为协议,客户端和服务端只要实现这些接口中的方法就可以进行通信了 Hadoop RPC中VersionedProtocol是所有协议的父类,只定义了两个方法. package org.apache.hadoop.ipc; import java.io.IOException; /** * Superclass of all protocols that use Hadoop RPC. *

Hadoop RPC源码阅读-服务端Server

RPC服务端的实例代码: public class Starter { public static void main(String[] args) throws IOException { RPC.Builder build = new RPC.Builder(new Configuration()); build.setBindAddress("localhost").setPort(10000).setProtocol(LoginServiceInterface.class).s

Flume-NG源码阅读之AvroSink

org.apache.flume.sink.AvroSink是用来通过网络来传输数据的,可以将event发送到RPC服务器(比如AvroSource),使用AvroSink和AvroSource可以组成分层结构.它继承自AbstractRpcSink  extends AbstractSink implements Configurable这跟其他的sink一样都得extends AbstractSink implements Configurable,所以重点也在confgure.start.

【Java】【Flume】Flume-NG源码阅读之AvroSink

org.apache.flume.sink.AvroSink是用来通过网络来传输数据的,可以将event发送到RPC服务器(比如AvroSource),使用AvroSink和AvroSource可以组成分层结构.它继承自AbstractRpcSink  extends AbstractSink implements Configurable这跟其他的sink一样都得extends AbstractSink implements Configurable,所以重点也在confgure.start.

【Dubbo源码阅读系列】之远程服务调用(上)

今天打算来讲一讲 Dubbo 服务远程调用.笔者在开始看 Dubbo 远程服务相关源码的时候,看的有点迷糊.后来慢慢明白 Dubbo 远程服务的调用的本质就是动态代理模式的一种实现.本地消费者无须知道远程服务具体的实现,消费者和提供者通过代理类来进行交互!! 一.JAVA 动态代理 简单看一段代码回顾一下动态代理: public class MyInvocationHandler implements InvocationHandler{ private Object object; publi

CI框架源码阅读笔记3 全局函数Common.php

从本篇开始,将深入CI框架的内部,一步步去探索这个框架的实现.结构和设计. Common.php文件定义了一系列的全局函数(一般来说,全局函数具有最高的加载优先权,因此大多数的框架中BootStrap引导文件都会最先引入全局函数,以便于之后的处理工作). 打开Common.php中,第一行代码就非常诡异: if ( ! defined('BASEPATH')) exit('No direct script access allowed'); 上一篇(CI框架源码阅读笔记2 一切的入口 index

如何阅读Java源码 阅读java的真实体会

刚才在论坛不经意间,看到有关源码阅读的帖子.回想自己前几年,阅读源码那种兴奋和成就感(1),不禁又有一种激动. 源码阅读,我觉得最核心有三点:技术基础+强烈的求知欲+耐心. 说到技术基础,我打个比方吧,如果你从来没有学过Java,或是任何一门编程语言如C++,一开始去啃<Core Java>,你是很难从中吸收到营养的,特别是<深入Java虚拟机>这类书,别人觉得好,未必适合现在的你. 虽然Tomcat的源码很漂亮,但我绝不建议你一开始就读它.我文中会专门谈到这个,暂时不展开. 强烈

Memcache-Java-Client-Release源码阅读(之七)

一.主要内容 本章节的主要内容是介绍Memcache Client的Native,Old_Compat,New_Compat三个Hash算法的应用及实现. 二.准备工作 1.服务器启动192.168.0.106:11211,192.168.0.106:11212两个服务端实例. 2.示例代码: String[] servers = { "192.168.0.106:11211", "192.168.0.106:11212" }; SockIOPool pool =

Netty源码阅读(一) ServerBootstrap启动

Netty源码阅读(一) ServerBootstrap启动 转自我的Github Netty是由JBOSS提供的一个java开源框架.Netty提供异步的.事件驱动的网络应用程序框架和工具,用以快速开发高性能.高可靠性的网络服务器和客户端程序.本文讲会对Netty服务启动的过程进行分析,主要关注启动的调用过程,从这里面进一步理解Netty的线程模型,以及Reactor模式. 这是我画的一个Netty启动过程中使用到的主要的类的概要类图,当然是用到的类比这个多得多,而且我也忽略了各个类的继承关系