hbase RPCServer源码分析

前置知识: java,nio,多线程

看了几天的源码,写一些自己心得,若有错误请指出。

RPCServer的作用:负责创建listener,reader,responser,handler来处理client端的请求。

RPCServer中重要的子类有:Listener,Reader,Call,Connection,Responser

其中Reader是Listener的子类

listener负责监听client端的请求,主要做nio操作中的accept操作。

while (iter.hasNext()) {
  key = iter.next();
  iter.remove();
  try {
    if (key.isValid()) {
      if (key.isAcceptable())
        doAccept(key);
    }
  } catch (IOException ignored) {
  }
  key = null;
}

与client创建连接,生成新的channel,并将新的channel注册在reader上。

void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
  ……
  SocketChannel channel;
  ……
    Reader reader = getReader();
    try {
      reader.startAdd();
      SelectionKey readKey = reader.registerChannel(channel);  //listener接受的连接注册在reader上
      c = getConnection(channel, System.currentTimeMillis());
      readKey.attach(c);
   ……
}

reader负责处理listener传过来的channel,依次读取数据,

void doRead(SelectionKey key) throws InterruptedException {
  int count = 0;
  Connection c = (Connection)key.attachment();
  ……
  try {
    count = c.readAndProcess();
  } catch (InterruptedException ieo) {
  ……
}

这里调用Connection里面的readAndProcess()方法,这个方法的做用是读取客户端的数据,存入一个buffer字节数组中,给processRequest()方法进行处理,

processRequest方法:

protected void processRequest(byte[] buf) throws IOException, InterruptedException {
  ……
 //这里的call构造方法中的参数都是由buf中的数据解析出来的,前面省略的代码做了这部分工作
  Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
          totalRequestSize,
          traceInfo);
  //这里的scheduler是一个调度器,可以简单理解为一个线程池的控制器,它初始化时会生成默认大小的线程池,参数可由REGION_SERVER_HANDLER_COUNT来指定
  //也就是jstack文件中的handler线程,默认是30
  //dispatch方法会获取线程池中的一个线程,执行callRunner中的run()方法。run()方法的功能有:查询结果,并调用sendResponseIfReady()方法来返回数据。
  scheduler.dispatch(new CallRunner(RpcServer.this, call, userProvider));
}

call的run()方法:

public void run() {
  ……
  //查询数据,存在resultPair中
      resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,
  ……
    if (!call.isDelayed() || !call.isReturnValueDelayed()) {
      Message param = resultPair != null ? resultPair.getFirst() : null;
      CellScanner cells = resultPair != null ? resultPair.getSecond() : null;
      call.setResponse(param, cells, errorThrowable, error);
    }
  //调用Responser
    call.sendResponseIfReady();
 ……
}

其中rpcServer的call方法为:

public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
    Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
throws IOException {
  ……
   //此句进行查询
    Message result = service.callBlockingMethod(md, controller, param);
   ……
  //返回给Call对象
    return new Pair<Message, CellScanner>(result,
      controller != null? controller.cellScanner(): null);
……
}

再详细点的还没看。看了这些主要解决了以下几个疑惑:

reader的线程数在哪指定生成,handler的线程池在哪维护,监听连接请求的线程有几个?responser的线程又有几个?

listener只有一个,

listener中有一个Reader数组,默认是10,也就是说读取请求数据的连接池大小为10。

private class Listener extends Thread {
……
  private Reader[] readers = null;

handler的线程池由RPCServer中的scheduler维护,默认是30,
listener监听到一个请求后,生成对应的channel发送给Reader,然后Reader会为每一个channel创建一个connection,

connection中保存了连接的信息。然后调用connection的方法来读取请求参数,并生成call对象,这时将调用scheduler,

使用handler线程池(默认30)来查询数据,(这里就开始并行了),结果存在call对象用,call对象最后再调用responser类的方法,将结果返回给client。

responser只有一个线程,它维护了一个call链表,采用非阻塞的方式(这里要说也是并行),依次将call对象送出。

大致过程就是这样

				
时间: 2024-11-08 11:54:34

hbase RPCServer源码分析的相关文章

hbase split 源码分析之split策略

在工作中接触到split,于是查看了这块的源代码,先看到了split的策略,今天就说说这个吧,后续还会有split的其他源码分析和compact相关的源码分析. 看了很多其他人的博客,很多都是转发的,原创的也都没有注明是哪个版本.其实给很多读者造成混淆,我这里是基于Hbase-0.98.13  版本作为分析的,注意:不同版本的此部分源码很可能不一样. 在这个版本中使用的split策略是IncreasingToUpperBoundRegionSplitPolicy.确切来说他是0.94版本以后的策

Hbase写入hdfs源码分析

版权声明:本文由熊训德原创文章,转载请注明出处: 文章原文链接:https://www.qcloud.com/community/article/258 来源:腾云阁 https://www.qcloud.com/community 本文档从源码角度分析了,hbase作为dfs client写入hdfs的hadoop sequence文件最终刷盘落地的过程.之前在<wal线程模型源码分析>中描述wal的写过程时说过会写入hadoop sequence文件,hbase为了保证数据的安全性,一般都

hbase(0.94) get、scan源码分析

简介 本文是需要用到hbase timestamp性质时研究源码所写.内容有一定侧重.且个人理解不算深入,如有错误请不吝指出. 如何看源码 hbase依赖很重,没有独立的client包.所以目前如果在maven中指定如下: <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase</artifactId> <version>0.94-adh3u9.9</

HBase1.0.0源码分析之请求处理流程分析以Put操作为例(二)

HBase1.0.0源码分析之请求处理流程分析以Put操作为例(二) 1.通过mutate(put)操作,将单个put操作添加到缓冲操作中,这些缓冲操作其实就是Put的父类的一个List的集合.如下: private List<Row> writeAsyncBuffer = new LinkedList<>(); writeAsyncBuffer.add(m); 当writeAsyncBuffer满了之后或者是人为的调用backgroundFlushCommits操作促使缓冲池中的

Spark SQL之External DataSource外部数据源(二)源码分析

上周Spark1.2刚发布,周末在家没事,把这个特性给了解一下,顺便分析下源码,看一看这个特性是如何设计及实现的. /** Spark SQL源码分析系列文章*/ (Ps: External DataSource使用篇地址:Spark SQL之External DataSource外部数据源(一)示例 http://blog.csdn.net/oopsoom/article/details/42061077) 一.Sources包核心 Spark SQL在Spark1.2中提供了External

hadoop源码分析解读入门

hadoop 源代码分析(一) Google 的核心竞争技术是它的计算平台.HadoopGoogle的大牛们用了下面5篇文章,介绍了它们的计算设施. Google的几篇论文 GoogleCluster:http://research.google.com/archive/googlecluster.html Chubby:http://labs.google.com/papers/chubby.html GFS:http://labs.google.com/papers/gfs.html Big

HBase1.0.0源码分析之请求处理流程分析以Put操作为例(一)

如下面的代码所示,是HBase Put操作的简单代码实例,关于代码中的Connection connection = ConnectionFactory.createConnection(conf),已近在前一篇博 HBase1.0.0源码分析之Client启动连接流程,中介绍了链接的相关流程以及所启动的服务信息. TableName tn = TableName.valueOf("test010"); try (Connection connection = ConnectionFa

第十一篇:Spark SQL 源码分析之 External DataSource外部数据源

上周Spark1.2刚发布,周末在家没事,把这个特性给了解一下,顺便分析下源码,看一看这个特性是如何设计及实现的. /** Spark SQL源码分析系列文章*/ (Ps: External DataSource使用篇地址:Spark SQL之External DataSource外部数据源(一)示例 http://blog.csdn.net/oopsoom/article/details/42061077) 一.Sources包核心 Spark SQL在Spark1.2中提供了External

SOFA 源码分析 —— 服务发布过程

前言 SOFA 包含了 RPC 框架,底层通信框架是 bolt ,基于 Netty 4,今天将通过 SOFA-RPC 源码中的例子,看看他是如何发布一个服务的. 示例代码 下面的代码在 com.alipay.sofa.rpc.quickstart.QuickStartServer 类下. ServerConfig serverConfig = new ServerConfig() .setProtocol("bolt") // 设置一个协议,默认bolt .setPort(9696)