Thrift源码学习二——Server层

Thrift 提供了如图五种模式:TSimpleServer、TNonblockingServer、THsHaServer、TThreadPoolServer、TThreadSelectorServer

??

TSimpleServer、TThreadPoolServer 属于阻塞模型

TNonblockingServer、THsHaServer、TThreadedSelectorServer 属于非阻塞模型

TServer

TServer 为抽象类

public static class Args extends AbstractServerArgs<Args> {
  public Args(TServerTransport transport) {
    super(transport);
  }
}

public static abstract class AbstractServerArgs<T extends AbstractServerArgs<T>> {
  final TServerTransport serverTransport;
  // 处理层工厂
  TProcessorFactory processorFactory;
  // 传输层工厂
  TTransportFactory inputTransportFactory = new TTransportFactory();
  TTransportFactory outputTransportFactory = new TTransportFactory();
  // 协议层工厂
  TProtocolFactory inputProtocolFactory = new TBinaryProtocol.Factory();
  TProtocolFactory outputProtocolFactory = new TBinaryProtocol.Factory();
}

TServer 定义的对外方法

/**
 * The run method fires up the server and gets things going.
 */
 public abstract void serve();
/**
 * Stop the server. This is optional on a per-implementation basis. Not
 * all servers are required to be cleanly stoppable.
 */
 public void stop() {}

stop 并不是每个服务都需要优雅的退出,所以没有定义为抽象方法

抽象方法 serve() 由具体的 TServer 实例实现

TSimpleServer

TSimpleServer 实现比较简单,是单线程阻塞模型,只适合测试开发使用

serve 方法源码分析

public void serve() {
  // 启动监听 socket
  serverTransport.listen();
  // 设置服务状态
  setServing(true);
  // 不断的等待与处理 socket 请求
  while(!stopped) {
    // accept 一个业务 socket 请求
    client = serverTransport_.accept();
    if (client != null) {
      // 通过工厂获取 server 定义的处理层、传输层和协议层
      processor = processorFactory_.getProcessor(client);
      inputTransport = inputTransportFactory_.getTransport(client);
      outputTransport = outputTransportFactory_.getTransport(client);
      inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
      outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
      if (eventHandler_ != null) {
        connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol);
      }
      // 阻塞式处理
      while (true) {
        // 处理请求事件
        if (eventHandler_ != null) {
          eventHandler_.processContext(connectionContext, inputTransport, outputTransport);
        }
        // 如果处理层为异步,则退出
        if(!processor.process(inputProtocol, outputProtocol)) {
          break;
        }
      }
    }
    // 关闭
    eventHandler_.deleteContext(connectionContext, inputProtocol, outputProtocol);
    inputTransport.close();
    outputTransport.close();
    setServing(false);
  }
}

TSimpleServer 工作图

TThreadPoolServer

ThreadPoolServer 解决了 TSimple 不支持并发和多连接的问题,引入了线程池

与 TSimple 相同,主线程负责阻塞式监听 socket,而剩下的业务处理则全部交由线程池去处理

public void serve() {
  // 主线程启动监听 socket
  serverTransport_.listen();
  // 设置服务状态
  stopped_ = false;
  setServing(true);
  // 等待并处理 socket 请求
  while (!stopped_) {
    TTransport client = serverTransport_.accept();
    // Runnable run 逻辑与 TSimpleServer 类似
    WorkerProcess wp = new WorkerProcess(client);
    int retryCount = 0;
    long remainTimeInMillis = requestTimeoutUnit.toMillis(requestTimeout);
    while(true) {
        // 交由线程池来处理
        executorService_.execute(wp);
        break;
    }
  }
  executorService_.shutdown();
  setServing(false);
}

TThreadPoolServer 的缺点:

处理能力的好坏受限于线程池的设置

TNoblockingServer

TNoblockingServer 是单线程工作,但该模式采用了 NIO,所有的 socket 被注册到 selector 中,通过一个线程循环 selector 来监控所有 socket,当有就绪的 socket 时,根据不同的请求做不同的动作(读取、写入数据或 accept 新连接)

TNoblockingServer 的 serve 方法在其父类 AbstractNonblockingServer 中定义

/**
 * Begin accepting connections and processing invocations.
 * 开始接受并处理调用
 */
public void serve() {
  // start any IO threads
  // 注册一些监听 socket 的线程到 selector 中
  if (!startThreads()) {
    return;
  }
  // start listening, or exit
  // 开始监听
  if (!startListening()) {
    return;
  }
  // 设置服务状态
  setServing(true);
  // this will block while we serve
  // TNonblocking 中实现为 selectAcceptThread_.join();
  // 主线程等待 selectAcceptThread 执行完毕
  // SelectAcceptThread 的 run 方法为 select();
  // 取出一个就绪的 socket
  waitForShutdown();

  setServing(false);

  // do a little cleanup
  stopListening();
}

// SelectAcceptThread run 方法
public void run() {
  while (!stopped_) {
    select();
    processInterestChanges();
  }
  for (SelectionKey selectionKey : selector.keys()) {
    cleanupSelectionKey(selectionKey);
  }
}

// SelectAcceptThread Select 过程
private void select() {
  try {
    // wait for io events.
    // NIO 取出一个
    selector.select();
    // process the io events we received
    Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
    // 遍历就绪的 socket
    while (!stopped_ && selectedKeys.hasNext()) {
      SelectionKey key = selectedKeys.next();
      selectedKeys.remove();
      // if the key is marked Accept, then it has to be the server
      // transport.
      // accept 新 socket 并将其注册到 selector 中
      if (key.isAcceptable()) {
        handleAccept();
      } else if (key.isReadable()) {
        // deal with reads
        // 处理读数据的 socket 请求
        handleRead(key);
      } else if (key.isWritable()) {
        // deal with writes
        // 处理写数据的 socket 请求
        handleWrite(key);
      } else {
        LOGGER.warn("Unexpected state in select! " + key.interestOps());
      }
    }
  } catch (IOException e) {
    LOGGER.warn("Got an IOException while selecting!", e);
  }
}

// 接收新的连接
private void handleAccept() throws IOException {
  SelectionKey clientKey = null;
  TNonblockingTransport client = null;
  // accept the connection
  client = (TNonblockingTransport)serverTransport.accept();
  // 注册到 selector 中
  clientKey = client.registerSelector(selector, SelectionKey.OP_READ);
  // add this key to the map
  FrameBuffer frameBuffer = createFrameBuffer(client, clientKey, SelectAcceptThread.this);
  clientKey.attach(frameBuffer);
}

TNonblockingServer 模式的缺点:

其还是采用单线程顺序来完成,当业务处理比较复杂耗时,该模式的效率将会下降

TNonblockingServer 工作图:

THsHaServer

THsHaServer 是 TNoblockingServer 的子类,处理逻辑基本相同,不同的是,在处理读取请求时,THsHaServer 将处理过程交由线程池来完成,主线程直接返回进行下一次循环,提高了效率

THsHaServer 模式的缺点:

其主线程需要完成对所有 socket 的监听一级数据的写操作,当大请求量时,效率较低

TThreadedSelectorServer

TThreadedSelectorServer 是 Thrift 目前提供的最高级模式,生产环境的首选,其对 TNonblockingServer 进行了扩展

TThreadedSelectorServer 源码中一些关键的属性

public static class Args extends AbstractNonblockingServerArgs<Args> {
    // 在已接收的连接中选择线程的个数
    public int selectorThreads = 2;
    // 执行线程池 ExecutorService 的线程个数
    private int workerThreads = 5;
    // 执行请求具体任务的线程池
    private ExecutorService executorService = null;
}
// The thread handling all accepts
private AcceptThread acceptThread;
// Threads handling events on client transports
private final Set<SelectorThread> selectorThreads = new HashSet<SelectorThread>();
// This wraps all the functionality of queueing and thread pool management
// for the passing of Invocations from the selector thread(s) to the workers
// (if any).
private final ExecutorService invoker;
/**
 * 循环模式的负载均衡器,用于为新的连接选择 SelectorThread
 */
protected static class SelectorThreadLoadBalancer {}
  1. AcceptThread 线程对象,用于监听 socket 的新连接
  2. 多个 SelectorThread 线程对象,用于处理 socket 的读写操作
  3. 一个负载均衡对象 SelectorThreadLoadBalancer,用于决定将 AcceptThread 接收到的 socket 请求分配给哪个 SelectorThread 线程
  4. SelectorThread 线程执行过读写操作后,通过 ExecutorService 线程池来完成此次调用的具体执行

SelectorThread 对象源码解析

/**
 * 多个 SelectorThread 负责处理 socket 的 I/O 操作
 */
protected class SelectorThread extends AbstractSelectThread {
  /**
   * The work loop. Handles selecting (read/write IO), dispatching, and
   * managing the selection preferences of all existing connections.
   * 选择(处理 socket 的网络读写 IO),分配和管理现有连接
   */
  public void run() {
    while (!stopped_) {
      select();
    }
  }
  private void select() {
    // process the io events we received
    Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
    while (!stopped_ && selectedKeys.hasNext()) {
      SelectionKey key = selectedKeys.next();
      selectedKeys.remove();
      // skip if not valid
      if (!key.isValid()) {
        cleanupSelectionKey(key);
        continue;
      }
      if (key.isReadable()) {
        // deal with reads
        handleRead(key);
      } else if (key.isWritable()) {
        // deal with writes
        handleWrite(key);
      } else {
        LOGGER.warn("Unexpected state in select! " + key.interestOps());
      }
    }
  }
}

AcceptThread 对象源码解析

/**
 * 在服务器传输中选择线程(监听 socket 请求)并向 IO 选择器(SelectorThread)提供新连接
 */
protected class AcceptThread extends Thread {
  // The listen socket to accept on
  private final TNonblockingServerTransport serverTransport;
  private final Selector acceptSelector;
  // 负载均衡器,决定将连接分配给哪个 SelectorThread
  private final SelectorThreadLoadBalancer threadChooser;
  public void run() {
    while (!stopped_) {
      select();
    }
  }
  private void select() {
    // process the io events we received
    Iterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator();
    while (!stopped_ && selectedKeys.hasNext()) {
      SelectionKey key = selectedKeys.next();
      selectedKeys.remove();
      // 处理接收的新情求
      if (key.isAcceptable()) {
        handleAccept();
      } else {
        LOGGER.warn("Unexpected state in select! " + key.interestOps());
      }
    }
  }
  /**
   * Accept a new connection.
   */
  private void handleAccept() {
    final TNonblockingTransport client = doAccept();
    if (client != null) {
      // 从负载均衡器中,获取 SelectorThread 线程
      final SelectorThread targetThread = threadChooser.nextThread();
      if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) {
        doAddAccept(targetThread, client);
      } else {
        // FAIR_ACCEPT
        invoker.submit(new Runnable() {
          public void run() {
            // 将选择到的线程和连接放入 线程池 处理
            // 用 targetThread 线程取处理一个给接受的链接 client,如果新连接的队列处于满的状态,则将处于阻塞状态
            doAddAccept(targetThread, client);
          }
        });
      }
    }
  }
  private TNonblockingTransport doAccept() {
    return (TNonblockingTransport) serverTransport.accept();
  }
  // 用 targetThread 线程取处理一个给接受的链接 client,如果新连接的队列处于满的状态,则将处于阻塞状态
  private void doAddAccept(SelectorThread thread, TNonblockingTransport client) {
    if (!thread.addAcceptedConnection(client)) {
      client.close();
    }
  }
}

TThreadedSelectorServer 工作图

参考资料

原文地址:https://www.cnblogs.com/zhengbin/p/8525274.html

时间: 2024-11-06 07:33:59

Thrift源码学习二——Server层的相关文章

Dubbo源码学习(二)

@Adaptive注解 在上一篇ExtensionLoader的博客中记录了,有两种扩展点,一种是普通的扩展实现,另一种就是自适应的扩展点,即@Adaptive注解的实现类. @Documented @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.TYPE, ElementType.METHOD}) public @interface Adaptive { String[] value() default {}; } @Adapt

python 协程库gevent学习--gevent源码学习(二)

在进行gevent源码学习一分析之后,我还对两个比较核心的问题抱有疑问: 1. gevent.Greenlet.join()以及他的list版本joinall()的原理和使用. 2. 关于在使用monkey_patchall()之后隐式切换的问题. 下面我将继续通过分析源码及其行为来加以理解和掌握. 1. 关于gevent.Greenlet.join()(以下简称join)先来看一个例子: import gevent def xixihaha(msg): print(msg) gevent.sl

Thrift 源码学习一

Thrift 客户端与服务端的交互图 源码结构 传输层 TTransport: TTransport:客户端传输层抽象基础类,read.write.flush.close 等方法 TSocket 与 TNonBlockingSocket:分别是基于 BIO 和 NIO 客户端传输类 TServerSocket 与 TNonBlockingServerSocket:分别是基于 BIO 和 NIO 服务端传输类 TZlibTransport: TSaslClientTransport 与 TSasl

[spring源码学习]二、IOC源码——配置文件读取

一.环境准备 对于学习源码来讲,拿到一大堆的代码,脑袋里肯定是嗡嗡的,所以从代码实例进行跟踪调试未尝不是一种好的办法,此处,我们准备了一个小例子: package com.zjl; public class Person { private String name; public String getName() { return name; } public void setName(String name) { this.name = name; } public void sayHello

Bottle 框架源码学习 二

上一篇简单分析了route的基本用法 本篇分析一下run函数的运行原理 def run(app=None, server='wsgiref', host='127.0.0.1', port=8080,         interval=1, reloader=False, quiet=False, plugins=None,         debug=None, **kargs):          if NORUN: return     if reloader and not os.env

Nmap 源码学习二 整体架构

目录功能: docs :相关文档 libdnet-stripped :开源网络接口库 liblinear:开源大型线性分类库 liblua:开源Lua脚本语言库 libnetutil:基本的网络函数 libpcap:开源抓包库 libpcre:开源正则表达式库 macosx:xcode项目文件 mswin32:vs项目文件 nbase:Nmap封装的基础使用函数库 ncat:netcat网络工具,由Nmap实现 ndiff:比较Nmap扫描结果的实用命令 nmap-update:负责Nmap更新

jQuery源码学习(二)

回调对象Callbacks 回调对象Callbacks就是用来管理回调函数队列的. 参数说明 它提供几个便捷的处理参数 - once: 确保这个回调列表只执行一次 - memory: 保持以前的值,将添加到这个列表的后面的最新的值立即执行调用任何回调 - unique: 确保一次只能添加一个回调(所以在列表中没有重复的回调). - stopOnFalse: 当一个回调返回false 时中断调用 once和stopOnFalse作用于fire memory和unique作用于add once在源码

Python 源码学习二(SocketServer)

SocketServer这个模块中定义的类比较多,但是设计比较清晰,我们以TCPServer为主线分析,先脉络再细节. 总体脉络 将相关类分为两组,如图: 服务器相关(上) BaseServer是server基础类,定义server的基本处理运行与request处理机制,TCPServer直接继承它. Request处理类RequestHandler(下) BaseRequestHandler是request处理的基础类,TCPServer的request处理类StreamRequestHand

lua_gc 源码学习二

普及下常识:GC 是 garbage collector 资源回收器: 初期的 Lua GC 采取的是 stop the world 的实现.一旦产生 gc 就需要期待全部 gc 流程走完.lua 自己是个很精简的体系,但不代表处理的数据量也必然很小. 从 Lua 5.1 入手下手,GC 的实现改成分步的.固然照旧是 stop the world ,可是,每个步骤均可以分阶段执行.这样,屡次搁浅的时间较小.随之,这部门的代码也相对于纷乱了.分步执行最关键的问题是需要处理在 GC 的步骤之间,如果