Thrift源码分析阅读(一)

Thrift -Storm篇

从Nimbus启动说起:

当用户通过命令启动nimbus时,Classloader将会找到一个称之为bytetype.storm.daemon.nimbus的一个class文件,这个是由numbis.clj文件编译而成,来看nimbus.clj这个的启动方法:

(defn -main []

(-launch (standalone-nimbus)))

(standalone-nimbus) 执行这个方法返回一个INimbus接口的实例

执行launch(INimbus)这个方法,这个方法也在nimbus.clj上定义

(defn -launch [nimbus]

;; read-storm-config 在util.clj中定义

(launch-server! (read-storm-config) nimbus))

(read-storm-config)返回配置文件信息,如在storm.yaml上的定义,得到的是一个map实例。

接着我们执行launch-server!这个方法,执行流程:

  1. 检查启动模式是否是分布式模式
  2. 创建一个service-handler的一个句柄,这个用来处理Nimbus上的各种业务逻辑,如提交一个topology, kill topology等
  3. 配置服务器,如nimbus服务端口,处理线程池大小,业务处理分发器处理业务逻辑的service-handler服务句柄。
  4. 根据配置服务器的参数,创建nimbus服务,服务是一个THsHaServer。
  5. 添加主线程shutdown时的回调函数。
  6. 开启服务。

至此,这边完成了nimbus开启服务的整个过程。

重点过程在第2步和 3 4 6步

首先我们将注意力集中在第3 4 6步骤,观察nimbus如何提供各种服务。对应的clojure代码:

(let [

第3步:

options (-> (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT)))

(THsHaServer$Args.)

(.workerThreads 64)

(.protocolFactory

(TBinaryProtocol$Factory. false true (conf NIMBUS-THRIFT-MAX-BUFFER-SIZE)))

(.processor (Nimbus$Processor. service-handler))

第4步:

server (THsHaServer.

(do (set! (. options maxReadBufferBytes)

(conf NIMBUS-THRIFT-MAX-BUFFER-SIZE)) options))]

第6步:

(.serve server)))

我们从第4步直接查看THsHaServer的构造方法:

private ExecutorService invoker;

public THsHaServer(Args args) {

super(args);

invoker = args.executorService == null ? createInvokerPool(args) : args.executorService;

}

这里的args就相当于clojure代码中的options变量,由于在options中没有设置executorService这个变量,那么将调用createInvokerPool(args)这个方法线程池。

/**

* Helper to create an invoker pool

*/

protected static ExecutorService createInvokerPool(Args options) {

int workerThreads = options.workerThreads;

int stopTimeoutVal = options.stopTimeoutVal;

TimeUnit stopTimeoutUnit = options.stopTimeoutUnit;

LinkedBlockingQueue<Runnable> queue =

new LinkedBlockingQueue<Runnable>();

ExecutorService invoker =

new ThreadPoolExecutor(workerThreads, workerThreads,

stopTimeoutVal, stopTimeoutUnit, queue);

return invoker;

}

这里就十分看清楚了,根据设置,返回的是一个为64的固定大小的线程池。

第6步,调用THsHaServer的serve方法时,

/** @inheritDoc */

@Override

public void serve() {

if (!startSelectorThread()) {

return;

}

// this will block while we serve

joinSelector();

}

我们重点关注startSelectorThread 和 joinSelector 两个方法。

  1. startSelectorThread()

    startSelectorThread()方法是父类 TNonblockingServer的一个方

    private SelectThread selectThread_;

protected boolean startSelectorThread() {

// start the selector

selectThread_ =new

SelectThread((TNonblockingServerTransport)serverTransport_);

stopped_ = false;

selectThread_.start();

return true;

} catch (IOException e) {

LOGGER.error("Failed to start selector thread!", e);

return false;

}

}

这个serverTransport_是怎么来的呢?

首先看clojure代码:

-> (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT)))

(THsHaServer$Args.)

在创建Args时,不停的调用父类的构造方法,同时,在创建THsHaServer时,也在不停的调用父类的构造,最终我们在TServer的构造方法里:

public abstract class TServer {

public static class Args extends AbstractServerArgs<Args> {

public Args(TServerTransport transport) {

super(transport);

}

}

public static abstract class AbstractServerArgs<T extends AbstractServerArgs<T>> {

final TServerTransport serverTransport;

public AbstractServerArgs(TServerTransport transport) {

serverTransport = transport;

}

protected TServer(AbstractServerArgs args) {

serverTransport_ = args.serverTransport;

}

也就是说serverTransport_也就是TNonblockingServerSocket的一个实例。

SelectThread 是一个Thread的子类,在其构造方法中,完成了TNonblockingServerSocket实例注册监听器的过程。

this.selector = SelectorProvider.provider().openSelector();

serverTransport.registerSelector(selector);

开启SelectThread线程后,

  1. 当有一个accept事件时。

    1. 服务端ServerSocket接收该事件,并返回一个TNonblockingSocket对象,在这个对象中封装了接收该事件的端口信息SocketChannel。并将该端口信息注册到THsHaServer的Selector对象中,并标记为可读。
    2. 创建一个FrameBuffer的实例,绑定在SelectionKey中

      clientKey = client.registerSelector(selector, SelectionKey.OP_READ)

// add this key to the map

FrameBuffer frameBuffer = new FrameBuffer(client, clientKey);

clientKey.attach(frameBuffer);

以后对应的输入输出都将发生在这个端口中。因此是TTransport的一个实现。

  1. 当有一个read事件时。

    1. 得到SelectionKey绑定的FrameBuffer对象,

      FrameBuffer buffer = (FrameBuffer)key.attachment();

    2. buffer.read()
      1. 读取framesize的大小,占四个字节。

        这里有个很有意思的控制实例读取buffer的大小,使用一个private final AtomicLong readBufferBytesAllocated = new AtomicLong(0); 成员变量来控制。这个好!

// if this frame will always be too large for this server, log the // error and close the connection.

if (frameSize > MAX_READ_BUFFER_BYTES) {

LOGGER.error("Read a frame size of " + frameSize

+ ", which is bigger than the maximum allowable buffer size for ALL connections.");

return false;

}

// if this frame will push us over the memory limit, then return.

// with luck, more memory will free up the next time around.

if (readBufferBytesAllocated.get() + frameSize > MAX_READ_BUFFER_BYTES) {

return true;

}

// increment the amount of memory allocated to read buffers

readBufferBytesAllocated.addAndGet(frameSize);

  1. 开辟一个framesize大小的bytebuffer,改变状态。

// reallocate the readbuffer as a frame-sized buffer

buffer_ = ByteBuffer.allocate(frameSize);

state_ = READING_FRAME;

  1. 读取frame
  1. 如果完全读取事件流,则调用requestInvoke(buffer)

    1. 调用THsHaServer中requestInvoke(buffer)方法,在线程池的调用句柄中执行frameBuffer.invoke()方法。此时算是一个异步调用的过程。

public void run() {

frameBuffer.invoke();

}

  1. frameBuffer.invoke()

    到此我们可以理解,我们已经完全的接收了用户的输入信息,下一步就是如何解析输入信息以及如何来处理反馈这个请求信息。

    invoke代码:

TTransport inTrans = getInputTransport();

TProtocol inProt = inputProtocolFactory_.getProtocol(inTrans);

TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport());

processorFactory_.getProcessor(inTrans).process(inProt, outProt);

getInputTransport():

private TTransport getInputTransport() {

return new TMemoryInputTransport(buffer_.array());

}

将用户的输入信息交给TMemoryInputTransport 来处理

这句clojure代码:

(.protocolFactory

(TBinaryProtocol$Factory. false true (conf NIMBUS-THRIFT-MAX-BUFFER-SIZE)))

在TServer中有这样的代码:

public T protocolFactory(TProtocolFactory factory) {

this.inputProtocolFactory = factory;

this.outputProtocolFactory = factory;

return (T) this;

}

也就是inputProtocolFactory_ 是TBinaryProtocol$Factory的一个实例,

inputProtocolFactory_.getProtocol(inTrans);返回TBinaryProtocol一个实例。

processorFactory_.getProcessor(inTrans) 对应的clojure代码其实就

是:

(.processor (Nimbus$Processor. service-handler))

我们再来看看Nimbus$Processor

public static class Processor<I extends Iface>

extends org.apache.thrift.TBaseProcessor implements org.apache.thrift.TProcessor

调用TProcess的process(inProt, outProt)方法其实调用的是Nimbus.Processor的父类TBaseProcessor的方法:

在TBaseProcessor类中,保存了一个请求与处理业务逻辑的映射关系:

Map<String,ProcessFunction<I, ? extends TBase>> processMap;

由构造方法中注入这种对应的关系,也就是action –>业务逻辑,这里由用户实现。

在TBaseProcessor方法中,首先读取action,根据action查找出对应的业务逻辑 ProcessFunction:

TMessage msg = in.readMessageBegin();

ProcessFunction fn = processMap.get(msg.name);

由上文可知,这里的in就是指TBinaryProtocol实例。

如果存在对应的业务逻辑处理器,那么调用ProcessFunction的:

fn.process(msg.seqid, in, out, iface);

我们在回过头来观察Nimbus.Processor,看他究竟注入了多少个业务逻辑.

processMap.put("submitTopology", new submitTopology());

我们就以此为例。

submitTopology类继承了ProcessFunction,调用submitTopology类的process方法其实是调用了ProcessFunction的process方法。

在ProcessFunction中,提供两个抽象交给用户实现,一是如何根据请求信息形成一个请求对象,二是如何处理这个请求细节以及以及返回什么样的结果,这个是依据项目的不同而不同(细节),但都有类似的行为:

请求返回什么样的对象,T extends TBase:

T args = getEmptyArgsInstance();

如何根据请求初始化对象:

args.read(iprot);

处理请求,以及返回什么样的结果:

TBase result = getResult(iface, args);

回到这里,这里的iface究竟是什么含义呢?我们看看clojure代码:

Nimbus$Processor. service-handler

Nimbus.Processor的构造函数 I extends Iface:

public Processor(I iface) {

super(iface,…);

}

这里的Iface是Nimbus类中定义的一个接口,里面包含了一些处理业务逻辑的细节。

而service-handler实现了Nimbus.Iface接口。

由此我们完成了处理请求的一个完整的过程,下面是TBinaryProtocol字节码格式的定义。

TBinaryProtocol的输入是一个buffer字节数组。

TBinaryProtocol 传输字节码格式,当不用匹配TBinaryProtocol的版本时:


4 byte


4 byte


Msg


1 byte


4 byte


1byte


2byte


4byte


msg


整个frame的大小


开始的四个字节表示后面msg的大小,msg表示action的名字,用于事件分发操作,后面的一个字节表示msg的类型,最后4个字节表示seqid,也就是msg的编号,message begin。


这个区域表示用户输入的域,也就是参数,第一个字节表示域的类型,随后2个字节表示域的含义,由用户定义,4个字节表示msg的大小,最后是msg内容实体。

当需要匹配TBinaryProtocol的版本时,此时message begin变成:


4byte


4byte


msg


4byte


开始的四个字节表示VERSION_1 | message.type的值,其中messge.type为byte类型,随后的四个字节表示后面msg的大小,msg表示action的名字,用于事件分发操作,最后4个字节表示seqid,也就是msg的编号。

msg的类型有17中,定义在TType的类中。

那么,如何反馈一个请求呢?

由以上可知,当处理完业务逻辑时,我们得到TBase实例的反馈结果。

回头看FrameBuffer调用invoke()方法的这句:

TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport());

getOutputTransport():

private TTransport getOutputTransport() {

response_ = new TByteArrayOutputStream();

// TBinaryProtocol protocol;

return outputTransportFactory_.getTransport(

new TIOStreamTransport(response_));

}

outputTransportFactory_对象中,是由TNonblockingServer 的类

AbstractNonblockingServerArgs(TNonblockingServerTransport transport) {

super(transport);

transportFactory(new TFramedTransport.Factory());

}

因此getOutputTransport() 返回的是TFramedTransport这个对象,这个对象封装了TIOStreamTransport (ByteArrayOutputStream)这个实例。

因此TBinaryProtocol的输出流(response)是一个TFramedTransport这个对象。

在ProcessFunction的process方法中,我们调用

oprot.getTransport().flush();

输出时候,此时调用TFramedTransport的flush方法:

@Override

public void flush() throws TTransportException {

byte[] buf = writeBuffer_.get();

int len = writeBuffer_.len();

writeBuffer_.reset();

encodeFrameSize(len, i32buf);

transport_.write(i32buf, 0, 4);

transport_.write(buf, 0, len);

transport_.flush();

}

将frame大小信息以及信息实体写入到ByteArrayOutputStream当中。此时我们得到了结果字节流,但没有将这些信息写入到客户端中。

此时,要做的事情是将FrameBuffer的key变成可写的。于是有了write事件。

  1. 当有一个write事件时。

    调用SocketChannel的write方法将数据写回到客户端。

    trans_.write(buffer_)

trans_是怎样来的呢?

FrameBuffer的构造方法:

public FrameBuffer( final TNonblockingTransport trans,

final SelectionKey selectionKey) {

trans_ = trans;

selectionKey_ = selectionKey;

buffer_ = ByteBuffer.allocate(4);

}

也就是当selectThread_ handleAccept时,

client = (TNonblockingTransport)serverTransport.accept();

serverTransport 是TNonblockingServerSocket的一个实例,调用accept方法,最终会落在TNonblockingServerSocket的acceptImp()方法中,

在其父类TServerTransport accept方法里:

TTransport transport = acceptImpl();

在acceptImp()方法中

SocketChannel socketChannel = serverSocketChannel.accept();

TNonblockingSocket tsocket =

new TNonblockingSocket(socketChannel);

返回的是TNonblockingSocket 这个TTransport类的子类。

时间: 2024-11-13 23:06:34

Thrift源码分析阅读(一)的相关文章

Thrift源码分析阅读(二)

Thrift总结 总体结构: Server: ServerSocket监听请求,请求到达时,读取请求数据. 根据请求数据创建一个InputTransport,创建OutputTransport 根据InputTransport和OutputTransport创建相应的input protocol 和 output protocol. 依据protocol 创建processor,在processor中完成业务分发操作. 将请求信息结果数据写回客户端. Transport: 提供了一个读取/写入网

AtomicInteger 源码分析阅读

? 序 阅读java源码可能是每一个java程序员的必修课,只有知其所以然,才能更好的使用java,写出更优美的程序,阅读java源码也为我们后面阅读java框架的源码打下了基础.阅读源代码其实就像再看一篇长篇推理小说一样,不能急于求成,需要慢慢品味才行.这一系列的文章,记录了我阅读源码的收获与思路,读者也可以借鉴一下,也仅仅是借鉴,问渠那得清如许,绝知此事要躬行!要想真正的成为大神,还是需要自己亲身去阅读源码而不是看几篇分析源码的博客就可以的. 正文 最近在看JAVA1.8线程池源码的时候,发

Thrift源码分析(一)-- 基本概念

我所在的公司使用Thrift作为基础通信组件,相当一部分的RPC服务基于Thrift框架.公司的日UV在千万级别,Thrift很好地支持了高并发访问,并且Thrift相对简单地编程模型也提高了服务地开发效率. Thrift源于Facebook, 目前已经作为开源项目提交给了Apahce.Thrift解决了Facebook各系统的大数据量传输通信和内部不同语言环境的跨平台调用. Thrift的官方网站: http://thrift.apache.org/ 作为一个高性能的RPC框架,Thrift的

Thrift源码分析(四)-- 方法调用模型分析

RPC调用本质上就是一种网络编程,客户端向服务器发送消息,服务器拿到消息之后做后续动作.只是RPC这种消息比较特殊,它封装了方法调用,包括方法名,方法参数.服务端拿到这个消息之后,解码消息,然后要通过方法调用模型来完成实际服务器端业务方法的调用. 这篇讲讲Thrfit的方法调用模型.Thrift的方法调用模型很简单,就是通过方法名和实际方法实现类的注册完成,没有使用反射机制,类加载机制. 和方法调用相关的几个核心类: 1. 自动生成的Iface接口,是远程方法的顶层接口 2. 自动生成的Proc

Thrift源码分析(二)-- 协议和编解码

协议和编解码是一个网络应用程序的核心问题之一,客户端和服务器通过约定的协议来传输消息(数据),通过特定的格式来编解码字节流,并转化成业务消息,提供给上层框架调用. Thrift的协议比较简单,它把协议和编解码整合在了一起.抽象类TProtocol定义了协议和编解码的顶层接口.个人感觉采用抽象类而不是接口的方式来定义顶层接口并不好,TProtocol关联了一个TTransport传输对象,而不是提供一个类似getTransport()的接口,导致抽象类的扩展性比接口差. TProtocol主要做了

Thrift源码分析(六)-- Transport传输层分析

RPC作为一种特殊的网络编程,会封装一层传输层来支持底层的网络通信.Thrift使用了Transport来封装传输层,但Transport不仅仅是底层网络传输,它还是上层流的封装. 关于Transport的设计,从架构上看,IO流和网络流都是IO的范畴,用一个统一的接口来抽象并无不可,但是个人感觉看Thrift的代码时,都用的Transport来表示流,不知道是普通IO流还是底层的网络流.还不如用Java的方式,把普通IO和网络接口用不同抽象隔离,至少代码逻辑比较清晰 废话不多说,看看Trasp

Thrift源码分析(五)-- FrameBuffer类分析

FrameBuffer是Thrift NIO服务器端的一个核心组件,它一方面承担了NIO编程中的缓冲区的功能,另一方面还承担了RPC方法调用的职责. FrameBufferState定义了FrameBuffer作为缓冲区的读写状态 private enum FrameBufferState { // in the midst of reading the frame size off the wire // 读Frame消息头,实际是4字节表示Frame长度  READING_FRAME_SIZ

Thrift源码分析(三)-- IDL和生成代码分析

IDL是很多RPC框架用来支持跨语言环境调用的一个服务描述组件,一般都是采用文本格式来定义. 更多IDL的思考查看<理解WSDL, IDL> Thrift的不同版本定义IDL的语法也不太相同,这里使用Thrift-0.8.0这个版本来介绍Java下的IDL定义 1. namespace 定义包名 2. struct 定义服务接口的参数,返回值使用到的类结构.如果接口的参数都是基本类型,则不需要定义struct 3. service 定义接口 一个简单的例子,IDL文件以.thrift为后缀名.

Flume NG源码分析(五)使用ThriftSource通过RPC方式收集日志

上一篇说了利用ExecSource从本地日志文件异步的收集日志,这篇说说采用RPC方式同步收集日志的方式.笔者对Thrift比较熟悉,所以用ThriftSource来介绍RPC的日志收集方式. 整体的结构图如下: 1. ThriftSource包含了一个Thrift Server,以及一个Thrift Service服务的实现.这里的Thrift Service是由ThriftSourceProtocol定义 2. 应用程序调用Thrift Service的客户端,以RPC的方式将日志发送到Th