Hadoop rpc通信

rpc是Hadoop分布式底层通信的基础,无论是client和namenode,namenode和datanode,以及yarn新框架之间的通信模式等等都是采用的rpc方式。

下面我们来概要分析一下Hadoop2的rpc。

Hadoop通信模式主要是C/S方式,及客户端和服务端的模式。

客户端采用传统的socket通信方式向服务端发送信息,并等待服务端的返回。

服务端采用reactor的模式(Java nio)的方式来处理客户端的请求并给予响应。

一、客户端到服务端的通信

下面我们先分析客户端到服务端的通信。

要先通信,就要建立连接,建立连接就要发头消息。

客户端代码在Hadoop common中的ipc包里,主要类为client.java。负责通信的内部类是Client.Connection,Connection中包括以下几个属性

private InetSocketAddress server;// 连接服务端的地址

private final ConnectionId remoteId;//connection复用,此类是为了复用连接而创建的,在client类中有一个连接池属性Hashtable<ConnectionId, Connection> connections,此属性表示如果多个客户端来自同一个remoteID连接,如果connection没有关闭,那么就复用这个connection。那么如何判断是来自同一个ConnectionId呢,见下面的代码。

/**
*ConnectionId类重写了equals方法
*
**/
@Override
    public boolean equals(Object obj) {
      if (obj == this) {
        return true;
      }
      if (obj instanceof ConnectionId) {
        ConnectionId that = (ConnectionId) obj;
       //同一个远端服务地址,即要连接同一个服务端
        return isEqual(this.address, that.address)
            && this.doPing == that.doPing
            && this.maxIdleTime == that.maxIdleTime
            && isEqual(this.connectionRetryPolicy, that.connectionRetryPolicy)
            && this.pingInterval == that.pingInterval
            //同一个远程协议,像datanode与namenode,client与       //namenode等之间通信的时候都各自有自己的协议,
//如果不是同一个协议则使用不同的连接
            && isEqual(this.protocol, that.protocol)
            && this.rpcTimeout == that.rpcTimeout
            && this.tcpNoDelay == that.tcpNoDelay
            && isEqual(this.ticket, that.ticket);
      }
      return false;
    }

private DataInputStream in;//输入

private DataOutputStream out;//输出

private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();//Call类是client的内部类,将客户端的请求,服务端的响应等信息封装成一个call类,在后面我们会详细分析此类。而calls属性是建立连接后进行的多次消息传送,也就是我们每次建立连接可能会在连接有效期间发送了多次请求。

说了这些属性的含义,那么是怎么和服务端建立连接的呢。看下面的代码解析

  private Connection getConnection(ConnectionId remoteId,
      Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth)
      throws IOException {
    //running是client的一个属性,表示客户端现在是否向服务端进行请求,如果没有running(running是一个AtomicBollean原子布尔类的对象)就是返回false
    if (!running.get()) {
      // the client is stopped
      throw new IOException("The client is stopped");
    }
    Connection connection;

    do {
      synchronized (connections) {
        //判断是否存在对应的连接没有则新建
        connection = connections.get(remoteId);
        if (connection == null) {
          connection = new Connection(remoteId, serviceClass);
          connections.put(remoteId, connection);
        }
      }
    //addCall中判断当获取取得接应该关闭了,则不能将call放到这个关闭的连接中
    } while (!connection.addCall(call));

    //进行输入输出对象初始化
    connection.setupIOstreams(fallbackToSimpleAuth);
    return connection;
  }

private synchronized boolean addCall(Call call) {
     //shouldCloseConnection也是connection类的属性,当连接异常,或者客户端要断开连接是,它返回false,说明这个连接正在回收中,不能继续使用。
      if (shouldCloseConnection.get())
        return false;
      calls.put(call.id, call);
      notify();
      return true;
    }

getConnection方法只是初始化了connection对象,并将要发送的请求call对象放入连接connection中,其实还并没有与客户端进行通信。开始通信的方法是setupIOstreams方法,此方法不仅建立与服务端通信的输入输出对象,还进行消息头的发送,判断能否与服务端进行连接,由于Hadoop有很多个版本,而且并不是每个版本之间都能进行完美通信的。所以不同版本是不能通信的,消息头就是负责这个任务的,消息头中也附带了,通信的协议,说明到底是谁和谁之间通信(是client和namenode还是datanode和namenode,还是yarn中的resourceManage 和nodemanage等等)。

//省略了部分代码
private synchronized void setupIOstreams(
        AtomicBoolean fallbackToSimpleAuth) {
      if (socket != null || shouldCloseConnection.get()) {
        return;
      }
      try {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Connecting to "+server);
        }
        if (Trace.isTracing()) {
          Trace.addTimelineAnnotation("IPC client connecting to " + server);
        }
        short numRetries = 0;
        Random rand = null;
        while (true) {
          //connection一些初始化信息,建立socket,初始socket等等操作
          setupConnection();
         //初始输入
          InputStream inStream = NetUtils.getInputStream(socket);
          //初始输出
          OutputStream outStream = NetUtils.getOutputStream(socket);
          //向服务端写消息头信息
          writeConnectionHeader(outStream);
          . . . . . .

          writeConnectionContext(remoteId, authMethod);

          //connection连接有一定的超时限制,touch方法进行时间更新将连接最新时间更新到现在。
          touch();

          if (Trace.isTracing()) {
            Trace.addTimelineAnnotation("IPC client connected to " + server);
          }

          // connection类继承自thread类,在其run方法中开始接收服务端的返回消息,详见下面run方法
          start();
          return;
        }
      } catch (Throwable t) {
        if (t instanceof IOException) {
          markClosed((IOException)t);
        } else {
          markClosed(new IOException("Couldn‘t set up IO streams", t));
        }
        //如果出现错误就关闭连接,
        close();
      }
    }

run方法不仅仅是对消息头发送出的信息的响应,他是对当前连接在有效期间所有请求的响应的一个接收端。

public void run() {
      if (LOG.isDebugEnabled())
        LOG.debug(getName() + ": starting, having connections "
            + connections.size());

      try {
       //waitForWork方法判断当前连接是否处于工作状态,
        while (waitForWork()) {//wait here for work - read or close connection
          //接受消息
          receiveRpcResponse();
        }
      } catch (Throwable t) {
        // This truly is unexpected, since we catch IOException in receiveResponse
        // -- this is only to be really sure that we don‘t leave a client hanging
        // forever.
        LOG.warn("Unexpected error reading responses on connection " + this, t);
        markClosed(new IOException("Error reading responses", t));
      }
      //connection已经关闭,进行连接回收,包括输入输出的回收将连接从连接池中清除等
      close();

      if (LOG.isDebugEnabled())
        LOG.debug(getName() + ": stopped, remaining connections "
            + connections.size());
    }
//接收服务端返回的信息
 private void receiveRpcResponse() {
      if (shouldCloseConnection.get()) {
        return;
      }
      touch();

      try {
        //对返回消息的处理,分布式消息的处理方式有很多种,一种是定长格式,一种是不定长,定长方式很容易理解,不定长中包含了消息的长度,在消息头处,则可以容易的读出消息准确长度,并进行处理。
        int totalLen = in.readInt();
        RpcResponseHeaderProto header =
            RpcResponseHeaderProto.parseDelimitedFrom(in);
        checkResponse(header);

        int headerLen = header.getSerializedSize();
        headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);
        //每个连接中有很多个call,call类中有一个callId的属性,类似于mac地址在对应的集群中是唯一的,从而能让客户端和服务端能够准去的处理请求。
        int callId = header.getCallId();
        if (LOG.isDebugEnabled())
          LOG.debug(getName() + " got value #" + callId);
        //获取正在处理的call
        Call call = calls.get(callId);
        //处理状态,RpcStatusProto是一个枚举类,有三种状态成功,错误,连接关闭。
        RpcStatusProto status = header.getStatus();
        if (status == RpcStatusProto.SUCCESS) {
         //通过反射方式获取返回的消息值
          Writable value = ReflectionUtils.newInstance(valueClass, conf);
          value.readFields(in);                 // read value
         //处理完成后将call从calls中删除掉
          calls.remove(callId);
          //将返回值放到client的结果值中
          call.setRpcResponse(value);

          // verify that length was correct
          // only for ProtobufEngine where len can be verified easily
          if (call.getRpcResponse() instanceof ProtobufRpcEngine.RpcWrapper) {
            ProtobufRpcEngine.RpcWrapper resWrapper =
                (ProtobufRpcEngine.RpcWrapper) call.getRpcResponse();
            if (totalLen != headerLen + resWrapper.getLength()) {
              throw new RpcClientException(
                  "RPC response length mismatch on rpc success");
            }
          }
        } else { // Rpc Request failed
          // Verify that length was correct
          if (totalLen != headerLen) {
            throw new RpcClientException(
                "RPC response length mismatch on rpc error");
          }

          final String exceptionClassName = header.hasExceptionClassName() ?
                header.getExceptionClassName() :
                  "ServerDidNotSetExceptionClassName";
          final String errorMsg = header.hasErrorMsg() ?
                header.getErrorMsg() : "ServerDidNotSetErrorMsg" ;
          final RpcErrorCodeProto erCode =
                    (header.hasErrorDetail() ? header.getErrorDetail() : null);
          if (erCode == null) {
             LOG.warn("Detailed error code not set by server on rpc error");
          }
          RemoteException re =
              ( (erCode == null) ?
                  new RemoteException(exceptionClassName, errorMsg) :
              new RemoteException(exceptionClassName, errorMsg, erCode));
          if (status == RpcStatusProto.ERROR) {
            calls.remove(callId);
            call.setException(re);
          } else if (status == RpcStatusProto.FATAL) {
            // Close the connection
            markClosed(re);
          }
        }
      } catch (IOException e) {
        markClosed(e);
      }
    }
//此方法是call中的
public synchronized void setRpcResponse(Writable rpcResponse) {
     //将结果值放到返回值中
     this.rpcResponse = rpcResponse;
     //当前call已处理完毕,
      callComplete();
    }
//此方法是call中的
protected synchronized void callComplete() {
     //done=true表示此call已经处理完成
      this.done = true;
     //在处理call的时候采用的是同步处理方案,所有处理完后要唤醒其中一个还未处理的call
      notify();                                 // notify caller
    }

未完待续。。。。。。

时间: 2024-08-13 08:05:07

Hadoop rpc通信的相关文章

Hadoop RPC通信Client客户端的流程分析

Hadoop的RPC的通信与其他系统的RPC通信不太一样,作者针对Hadoop的使用特点,专门的设计了一套RPC框架,这套框架个人感觉还是有点小复杂的.所以我打算分成Client客户端和Server服务端2个模块做分析.如果你对RPC的整套流程已经非常了解的前提下,对于Hadoop的RPC,你也一定可以非常迅速的了解的.OK,下面切入正题. Hadoop的RPC的相关代码都在org.apache.hadoop.ipc的包下,首先RPC的通信必须遵守许多的协议,其中最最基本的协议即使如下: /**

Hadoop RPC通信Server端的流程分析

前2天刚刚小小的分析下Client端的流程,走的还是比较通顺的,但是RPC的服务端就显然没有那么简单了,毕竟C-S这种模式的,压力和重点都是放在Server端的,所以我也只能做个大概的分析,因为里面细节的东西太多,我也不可能理清所有细节,但是我会集合源代码把主要的流程理理清.如果读者想进一步学习的话,可自行查阅源码. Server服务端和Client客户端在某些变量的定义上还是一致的,比如服务端也有Call,和Connection,这个很好理解,Call回调,和Connection连接是双向的.

Hadoop RPC通信机制

客户端与服务端都要实现同一个接口Bizable,客户端得到服务端实例代码对象的方法. 服务端需要绑定相关的IP地址.端口. 1.在这里,我们使用Hadoop提供的工具类RPC.Builder,下面就是服务端相关代码 public class RPCServer implements Bizable{ //服务端实现sayHi接口,为客户端提供接口 public String sayHi(String name) { return "Hi~" + name; } public stati

Hadoop RPC远程过程调用源码解析及实例

什么是RPC? 1.RPC(Remote Procedure Call)远程过程调用,它允许一台计算机程序远程调用另外一台计算机的子程序,而不用去关心底层的网络通信细节,对我们来说是透明的.经常用于分布式网络通信中. 2.Hadoop的进程间交互都是通过RPC来进行的,比如Namenode与Datanode之间,Jobtracker与Tasktracker之间等. RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据.在OSI网络通信模型中, RPC跨越了传输层和应用层

Hadoop学习&lt;四&gt;--HDFS的RPC通信原理总结

这里先写下自己学习RPC的笔记总结,下面将详细介绍学习过程: RPC(remote procedure call) 不同java进程间的对象方法的调用. 一方称作服务端(server),一方称作客户端(client). server端提供对象,供客户端调用的,被调用的对象的方法的执行发生在server端. RPC是hadoop框架运行的基础. 通过rpc小例子获得的认识? 1. 服务端提供的对象必须是一个接口,接口extends VersioinedProtocal 2. 客户端能够的对象中的方

Hadoop之——RPC通信实例

转载请注明出处:http://blog.csdn.net/l1028386804/article/details/45922715 一. RPC(remote procedure call) 不同java进程间的对象方法的调用. 一方称作服务端(server),一方称作客户端(client). server端提供对象,供客户端调用的,被调用的对象的方法的执行发生在server端. RPC是hadoop框架运行的基础. 二.通过rpc小例子获得的认识 1.服务端提供的对象必须是一个接口,接口ext

Hadoop RPC简单例子

jdk中已经提供了一个RPC框架-RMI,但是该PRC框架过于重量级并且可控之处比较少,所以Hadoop RPC实现了自定义的PRC框架. 同其他RPC框架一样,Hadoop RPC分为四个部分: (1)序列化层:Clent与Server端通信传递的信息采用了Hadoop里提供的序列化类或自定义的Writable类型: (2)函数调用层:Hadoop RPC通过动态代理以及java反射实现函数调用: (3)网络传输层:Hadoop RPC采用了基于TCP/IP的socket机制: (4)服务器端

Hadoop RPC

第一部分:什么是RPC RPC (Remote Procedure Call Protocol) – 远程过程协议调用 .通过 RPC 我们可以从网络上的计算机请求服务,而不需要了 解底层网络协议. Hadoop 底层的交互都是通过 rpc 进行的.例 如: datanode 和 namenode . tasktracker 和 jobtracker . secondary namenode 和 namenode 之间的通信都是通过 rpc 实 现的. RPC 模式 RPC 采用客户机 / 服务

RPC与Hadoop RPC机制

一.什么是RPC? (1)Remote Procdure call ,远程方法调用,它允许一台计算机程序远程调用另外一台计算机的子程序,而不用去关心底层的网络通信细节,对我们来说是透明的.经常用于分布式网络通信中. (2)Hadoop的进程间交互都死通过RPC来进行的,比如Namenode与Datanode直接,Jobtracker与Tasktracker之间等. 流程: (1)RPC采用了C/S的模式: (2)Client端发送一个带有参数的请求信息到Server: (3)Server接收到这