Learning Hadoop in the Footsteps of Others — the Server in ipc

1、An abstract IPC service.  IPC calls take a single {@link Writable} as a parameter, and return a {@link Writable} as their value.  A service runs on a port and is defined by a parameter class and a value class.
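The "parameter class / value class" contract comes down to Hadoop's Writable serialization. Below is a minimal sketch of that round trip. Since Hadoop itself is not assumed on the classpath, the Writable interface is declared locally with the same two methods, and LongParam is a made-up illustrative type, not a real Hadoop class:

```java
import java.io.*;

// Local stand-in for org.apache.hadoop.io.Writable (same two methods);
// Hadoop is not assumed on the classpath here.
interface Writable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// An illustrative LongWritable-like parameter/value type.
class LongParam implements Writable {
    long value;
    LongParam() {}
    LongParam(long v) { value = v; }
    public void write(DataOutput out) throws IOException { out.writeLong(value); }
    public void readFields(DataInput in) throws IOException { value = in.readLong(); }
}

public class WritableRoundTrip {
    // Serialize a Writable to bytes, as the IPC layer does before sending.
    static byte[] toBytes(Writable w) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            w.write(new DataOutputStream(buf));
            return buf.toByteArray();
        } catch (IOException e) { throw new UncheckedIOException(e); }
    }

    // Rebuild the value on the receiving side.
    static long fromBytes(byte[] bytes) {
        try {
            LongParam p = new LongParam();
            p.readFields(new DataInputStream(new ByteArrayInputStream(bytes)));
            return p.value;
        } catch (IOException e) { throw new UncheckedIOException(e); }
    }

    public static void main(String[] args) {
        System.out.println(fromBytes(toBytes(new LongParam(42L)))); // prints 42
    }
}
```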

2、There are six inner classes: ExceptionsHandler, Call, Listener, Responder, Connection, and Handler

3、The class has many internal fields, scattered throughout the source:

private final boolean authorize;
private boolean isSecurityEnabled;
private ExceptionsHandler exceptionsHandler = new ExceptionsHandler();

private String bindAddress;
  private int port;                               // port we listen on
  private int handlerCount;                       // number of handler threads
  private int readThreads;                        // number of read threads
  private Class<? extends Writable> paramClass;   // class of call parameters
  private int maxIdleTime;                        // the maximum idle time after which a client may be disconnected
  private int thresholdIdleConnections;           // the number of idle connections after which we will start cleaning up idle connections
  int maxConnectionsToNuke;                       // the max number of connections to nuke during a cleanup
  protected RpcInstrumentation rpcMetrics;
  private Configuration conf;
  private SecretManager<TokenIdentifier> secretManager;

private int maxQueueSize;
  private final int maxRespSize;
  private int socketSendBufferSize;
  private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm

volatile private boolean running = true;         // true while server runs
  private BlockingQueue<Call> callQueue; // queued calls

private List<Connection> connectionList =
    Collections.synchronizedList(new LinkedList<Connection>());
  //maintain a list of client connections
  private Listener listener = null;
  private Responder responder = null;
  private int numConnections = 0;
  private Handler[] handlers = null;
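The fields above describe a classic producer-consumer arrangement: Listener/Reader threads decode requests and enqueue them on callQueue, and Handler threads drain it. A stripped-down sketch of that flow — Call and process here are simplified stand-ins, not the real Server internals:

```java
import java.util.concurrent.*;

// Sketch of the Server's core flow: readers enqueue calls, handlers drain them.
public class CallQueueSketch {
    // Stand-in for Server.Call: just the parameter and a slot for the response.
    static class Call {
        final String param;
        volatile String response;
        Call(String param) { this.param = param; }
    }

    // Stand-in for the actual RPC dispatch a Handler performs.
    static String process(Call c) { return "echo:" + c.param; }

    public static void main(String[] args) throws Exception {
        BlockingQueue<Call> callQueue = new LinkedBlockingQueue<>(100); // cf. maxQueueSize
        // Handler thread: takes calls off the queue, like Handler.run() does in a loop.
        Thread handler = new Thread(() -> {
            try {
                Call c = callQueue.take();
                c.response = process(c);
            } catch (InterruptedException ignored) {}
        });
        handler.start();
        Call call = new Call("hello");
        callQueue.put(call);   // Listener/Reader side: queue the decoded call
        handler.join();
        System.out.println(call.response); // prints echo:hello
    }
}
```

The real Handler loop also serializes the result into Call.response and hands it to the Responder; this sketch only shows the queue hand-off.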

4、This Call is different from the Call inside Client.

A call queued for handling.

private Writable param;                       // the parameter passed
private Connection connection;                // connection to client
private long timestamp;     // the time received when response is null ; the time served when response is not null
private ByteBuffer response;                      // the response for this call
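A small sketch of the timestamp convention stated in the comment above — it holds the receive time until a response is attached, then the serve time. The class here is an illustrative stand-in, not the real Server.Call:

```java
import java.nio.ByteBuffer;

// Sketch of the server-side Call's timestamp convention: receive time while
// response is null, serve time once a response has been attached.
public class ServerCallSketch {
    static class Call {
        ByteBuffer response;   // null until the handler produces a result
        long timestamp;        // receive time, then serve time
        Call(long receivedAt) { timestamp = receivedAt; }
        void setResponse(ByteBuffer resp, long servedAt) {
            response = resp;
            timestamp = servedAt;   // now means "time served"
        }
    }

    public static void main(String[] args) {
        Call c = new Call(1000L);
        System.out.println(c.timestamp);              // prints 1000 (received)
        c.setResponse(ByteBuffer.allocate(4), 2500L);
        System.out.println(c.timestamp);              // prints 2500 (served)
    }
}
```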

5、Connection

Reads calls from a connection and queues them for handling.

private boolean rpcHeaderRead = false; // if initial rpc header is read
  private boolean headerRead = false;  //if the connection header that follows version is read.

private SocketChannel channel;
  private ByteBuffer data;
  private ByteBuffer dataLengthBuffer;
  private LinkedList<Call> responseQueue;
  private volatile int rpcCount = 0; // number of outstanding rpcs
  private long lastContact;
  private int dataLength;
  private Socket socket;// Cache the remote host & port info so that even if the socket is disconnected, we can say where it used to connect to.
  private String hostAddress;
  private int remotePort;
  private InetAddress addr;
  ConnectionHeader header = new ConnectionHeader();
  Class<?> protocol;
  boolean useSasl;
  SaslServer saslServer;
  private AuthMethod authMethod;
  private boolean saslContextEstablished;
  private boolean skipInitialSaslHandshake;
  private ByteBuffer rpcHeaderBuffer;
  private ByteBuffer unwrappedData;
  private ByteBuffer unwrappedDataLengthBuffer;
 
  UserGroupInformation user = null;
  public UserGroupInformation attemptingUser = null; // user name before auth

// Fake 'call' for failed authorization response
  private final int AUTHROIZATION_FAILED_CALLID = -1;
  private final Call authFailedCall =
    new Call(AUTHROIZATION_FAILED_CALLID, null, this);
  private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();
  // Fake 'call' for SASL context setup
  private static final int SASL_CALLID = -33;
  private final Call saslCall = new Call(SASL_CALLID, null, this);
  private final ByteArrayOutputStream saslResponse = new ByteArrayOutputStream();
 
  private boolean useWrap = false;
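The dataLengthBuffer/data pair implements simple length-prefixed framing: 4 bytes of length, then that many payload bytes. A self-contained sketch of the decode step — readFrame is a made-up name; the real logic in Connection.readAndProcess is more involved because it must tolerate partial reads across select() wakeups:

```java
import java.nio.ByteBuffer;

// Sketch of how a Connection consumes one call off the wire: first a 4-byte
// length (cf. dataLengthBuffer), then exactly that many payload bytes (cf. data).
public class FramedReadSketch {
    // Decode one length-prefixed frame from a buffer of received bytes.
    static byte[] readFrame(ByteBuffer wire) {
        ByteBuffer dataLengthBuffer = ByteBuffer.allocate(4);
        while (dataLengthBuffer.hasRemaining()) dataLengthBuffer.put(wire.get());
        dataLengthBuffer.flip();
        int dataLength = dataLengthBuffer.getInt();  // cf. the dataLength field
        byte[] data = new byte[dataLength];
        wire.get(data);
        return data;
    }

    public static void main(String[] args) {
        byte[] payload = {10, 20, 30};
        ByteBuffer wire = ByteBuffer.allocate(4 + payload.length);
        wire.putInt(payload.length);
        wire.put(payload);
        wire.flip();
        System.out.println(readFrame(wire).length); // prints 3
    }
}
```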

6、ExceptionsHandler manages Exception groups for special handling e.g., terse exception group for concise logging messages

7、Handler: handles queued calls.

8、Listens on the socket. Creates jobs for the handler threads

Listener contains an inner class, Reader.

Its fields are:

private ServerSocketChannel acceptChannel = null; //the accept channel
private Selector selector = null; //the selector that we use for the server
private Reader[] readers = null;
private int currentReader = 0;
private InetSocketAddress address; //the address we bind at
private Random rand = new Random();
private long lastCleanupRunTime = 0; //the last time when a cleanup connection (for idle connections) ran
private long cleanupInterval = 10000; //the minimum interval between two cleanup runs
private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);
private ExecutorService readPool;
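The readers array plus the currentReader index suggest how accepted connections are distributed: round-robin over the Reader pool. A sketch of that selection — the names are illustrative, and the real Listener hands each Reader a channel to register with its own selector rather than a string:

```java
// Sketch of how Listener spreads accepted connections across its Reader pool:
// simple round-robin over the readers array via a currentReader cursor.
public class ReaderRoundRobin {
    private final String[] readers;   // stand-ins for the Reader threads
    private int currentReader = 0;

    ReaderRoundRobin(int readThreads) {  // cf. ipc.server.read.threadpool.size
        readers = new String[readThreads];
        for (int i = 0; i < readThreads; i++) readers[i] = "reader-" + i;
    }

    // Pick the next reader, wrapping around the array.
    String nextReader() {
        String r = readers[currentReader];
        currentReader = (currentReader + 1) % readers.length;
        return r;
    }

    public static void main(String[] args) {
        ReaderRoundRobin pool = new ReaderRoundRobin(2);
        System.out.println(pool.nextReader()); // prints reader-0
        System.out.println(pool.nextReader()); // prints reader-1
        System.out.println(pool.nextReader()); // prints reader-0 (wrapped)
    }
}
```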

9、Responder

Sends responses of RPC back to clients

Its fields are:

private Selector writeSelector;
    private int pending;         // connections waiting to register
   
    final static int PURGE_INTERVAL = 900000; // 15mins
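Roughly every PURGE_INTERVAL the Responder walks the pending responses and discards calls that have been stuck too long, judging by Call.timestamp. A simplified sketch of that purge pass — purgeOld is a made-up name, and the real code also deals with the write selector and connection cleanup:

```java
import java.util.Iterator;
import java.util.LinkedList;

// Sketch of the Responder's periodic purge: drop calls whose timestamp is
// older than PURGE_INTERVAL, so dead clients can't pin responses forever.
public class PurgeSketch {
    static final int PURGE_INTERVAL = 900000; // 15 mins, as in the source

    static class Call {
        final long timestamp;   // cf. Call.timestamp: time served once queued here
        Call(long t) { timestamp = t; }
    }

    // Remove every call last touched before (now - PURGE_INTERVAL); returns count.
    static int purgeOld(LinkedList<Call> responseQueue, long now) {
        int purged = 0;
        Iterator<Call> it = responseQueue.iterator();
        while (it.hasNext()) {
            if (now - it.next().timestamp > PURGE_INTERVAL) {
                it.remove();
                purged++;
            }
        }
        return purged;
    }

    public static void main(String[] args) {
        LinkedList<Call> q = new LinkedList<>();
        q.add(new Call(0L));           // stale: queued 1,000,000 ms ago
        q.add(new Call(1_000_000L));   // fresh: queued just now
        System.out.println(purgeOld(q, 1_000_000L)); // prints 1
    }
}
```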

Time: 2024-10-10 03:02:56
