Thrift -Storm篇
从Nimbus启动说起:
当用户通过命令启动nimbus时,Classloader将会找到一个称之为bytetype.storm.daemon.nimbus的一个class文件,这个是由numbis.clj文件编译而成,来看nimbus.clj这个的启动方法:
(defn -main []
(-launch (standalone-nimbus)))
(standalone-nimbus) 执行这个方法返回一个INimbus接口的实例
执行launch(INimbus)这个方法,这个方法也在nimbus.clj上定义
(defn -launch [nimbus]
;; read-storm-config 在util.clj中定义
(launch-server! (read-storm-config) nimbus))
(read-storm-config)返回配置文件信息,如在storm.yaml上的定义,得到的是一个map实例。
接着我们执行launch-server!这个方法,执行流程:
- 检查启动模式是否是分布式模式
- 创建一个service-handler的一个句柄,这个用来处理Nimbus上的各种业务逻辑,如提交一个topology, kill topology等
- 配置服务器,如nimbus服务端口,处理线程池大小,业务处理分发器处理业务逻辑的service-handler服务句柄。
- 根据配置服务器的参数,创建nimbus服务,服务是一个THsHaServer。
- 添加主线程shutdown时的回调函数。
- 开启服务。
至此,这边完成了nimbus开启服务的整个过程。
重点过程在第2步和 3 4 6步
首先我们将注意力集中在第3 4 6步骤,观察nimbus如何提供各种服务。对应的clojure代码:
(let [
第3步:
options (-> (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT)))
(THsHaServer$Args.)
(.workerThreads 64)
(.protocolFactory
(TBinaryProtocol$Factory. false true (conf NIMBUS-THRIFT-MAX-BUFFER-SIZE)))
(.processor (Nimbus$Processor. service-handler))
第4步:
server (THsHaServer.
(do (set! (. options maxReadBufferBytes)
(conf NIMBUS-THRIFT-MAX-BUFFER-SIZE)) options))]
第6步:
(.serve server)))
我们从第4步直接查看THsHaServer的构造方法:
private ExecutorService invoker;
public THsHaServer(Args args) {
super(args);
invoker = args.executorService == null ? createInvokerPool(args) : args.executorService;
}
这里的args就相当于clojure代码中的options变量,由于在options中没有设置executorService这个变量,那么将调用createInvokerPool(args)这个方法线程池。
/**
* Helper to create an invoker pool
*/
protected static ExecutorService createInvokerPool(Args options) {
int workerThreads = options.workerThreads;
int stopTimeoutVal = options.stopTimeoutVal;
TimeUnit stopTimeoutUnit = options.stopTimeoutUnit;
LinkedBlockingQueue<Runnable> queue =
new LinkedBlockingQueue<Runnable>();
ExecutorService invoker =
new ThreadPoolExecutor(workerThreads, workerThreads,
stopTimeoutVal, stopTimeoutUnit, queue);
return invoker;
}
这里就十分看清楚了,根据设置,返回的是一个为64的固定大小的线程池。
第6步,调用THsHaServer的serve方法时,
/** @inheritDoc */
@Override
public void serve() {
if (!startSelectorThread()) {
return;
}
// this will block while we serve
joinSelector();
}
我们重点关注startSelectorThread 和 joinSelector 两个方法。
- startSelectorThread()
startSelectorThread()方法是父类 TNonblockingServer的一个方
private SelectThread selectThread_;
protected boolean startSelectorThread() {
// start the selector
selectThread_ =new
SelectThread((TNonblockingServerTransport)serverTransport_);
stopped_ = false;
selectThread_.start();
return true;
} catch (IOException e) {
LOGGER.error("Failed to start selector thread!", e);
return false;
}
}
这个serverTransport_是怎么来的呢?
首先看clojure代码:
-> (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT)))
(THsHaServer$Args.)
在创建Args时,不停的调用父类的构造方法,同时,在创建THsHaServer时,也在不停的调用父类的构造,最终我们在TServer的构造方法里:
public abstract class TServer {
public static class Args extends AbstractServerArgs<Args> {
public Args(TServerTransport transport) {
super(transport);
}
}
public static abstract class AbstractServerArgs<T extends AbstractServerArgs<T>> {
final TServerTransport serverTransport;
public AbstractServerArgs(TServerTransport transport) {
serverTransport = transport;
}
protected TServer(AbstractServerArgs args) {
serverTransport_ = args.serverTransport;
}
也就是说serverTransport_也就是TNonblockingServerSocket的一个实例。
SelectThread 是一个Thread的子类,在其构造方法中,完成了TNonblockingServerSocket实例注册监听器的过程。
this.selector = SelectorProvider.provider().openSelector();
serverTransport.registerSelector(selector);
开启SelectThread线程后,
- 当有一个accept事件时。
- 服务端ServerSocket接收该事件,并返回一个TNonblockingSocket对象,在这个对象中封装了接收该事件的端口信息SocketChannel。并将该端口信息注册到THsHaServer的Selector对象中,并标记为可读。
- 创建一个FrameBuffer的实例,绑定在SelectionKey中
clientKey = client.registerSelector(selector, SelectionKey.OP_READ)
// add this key to the map
FrameBuffer frameBuffer = new FrameBuffer(client, clientKey);
clientKey.attach(frameBuffer);
以后对应的输入输出都将发生在这个端口中。因此是TTransport的一个实现。
- 当有一个read事件时。
- 得到SelectionKey绑定的FrameBuffer对象,
FrameBuffer buffer = (FrameBuffer)key.attachment();
- buffer.read()
- 读取framesize的大小,占四个字节。
这里有个很有意思的控制实例读取buffer的大小,使用一个private final AtomicLong readBufferBytesAllocated = new AtomicLong(0); 成员变量来控制。这个好!
- 读取framesize的大小,占四个字节。
- 得到SelectionKey绑定的FrameBuffer对象,
// if this frame will always be too large for this server, log the // error and close the connection.
if (frameSize > MAX_READ_BUFFER_BYTES) {
LOGGER.error("Read a frame size of " + frameSize
+ ", which is bigger than the maximum allowable buffer size for ALL connections.");
return false;
}
// if this frame will push us over the memory limit, then return.
// with luck, more memory will free up the next time around.
if (readBufferBytesAllocated.get() + frameSize > MAX_READ_BUFFER_BYTES) {
return true;
}
// increment the amount of memory allocated to read buffers
readBufferBytesAllocated.addAndGet(frameSize);
- 开辟一个framesize大小的bytebuffer,改变状态。
// reallocate the readbuffer as a frame-sized buffer
buffer_ = ByteBuffer.allocate(frameSize);
state_ = READING_FRAME;
- 读取frame
- 如果完全读取事件流,则调用requestInvoke(buffer)
- 调用THsHaServer中requestInvoke(buffer)方法,在线程池的调用句柄中执行frameBuffer.invoke()方法。此时算是一个异步调用的过程。
public void run() {
frameBuffer.invoke();
}
- frameBuffer.invoke()
到此我们可以理解,我们已经完全的接收了用户的输入信息,下一步就是如何解析输入信息以及如何来处理反馈这个请求信息。
invoke代码:
TTransport inTrans = getInputTransport();
TProtocol inProt = inputProtocolFactory_.getProtocol(inTrans);
TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport());
processorFactory_.getProcessor(inTrans).process(inProt, outProt);
getInputTransport():
private TTransport getInputTransport() {
return new TMemoryInputTransport(buffer_.array());
}
将用户的输入信息交给TMemoryInputTransport 来处理
这句clojure代码:
(.protocolFactory
(TBinaryProtocol$Factory. false true (conf NIMBUS-THRIFT-MAX-BUFFER-SIZE)))
在TServer中有这样的代码:
public T protocolFactory(TProtocolFactory factory) {
this.inputProtocolFactory = factory;
this.outputProtocolFactory = factory;
return (T) this;
}
也就是inputProtocolFactory_ 是TBinaryProtocol$Factory的一个实例,
inputProtocolFactory_.getProtocol(inTrans);返回TBinaryProtocol一个实例。
processorFactory_.getProcessor(inTrans) 对应的clojure代码其实就
是:
(.processor (Nimbus$Processor. service-handler))
我们再来看看Nimbus$Processor:
public static class Processor<I extends Iface>
extends org.apache.thrift.TBaseProcessor implements org.apache.thrift.TProcessor
调用TProcess的process(inProt, outProt)方法其实调用的是Nimbus.Processor的父类TBaseProcessor的方法:
在TBaseProcessor类中,保存了一个请求与处理业务逻辑的映射关系:
Map<String,ProcessFunction<I, ? extends TBase>> processMap;
由构造方法中注入这种对应的关系,也就是action –>业务逻辑,这里由用户实现。
在TBaseProcessor方法中,首先读取action,根据action查找出对应的业务逻辑 ProcessFunction:
TMessage msg = in.readMessageBegin();
ProcessFunction fn = processMap.get(msg.name);
由上文可知,这里的in就是指TBinaryProtocol实例。
如果存在对应的业务逻辑处理器,那么调用ProcessFunction的:
fn.process(msg.seqid, in, out, iface);
我们在回过头来观察Nimbus.Processor,看他究竟注入了多少个业务逻辑.
processMap.put("submitTopology", new submitTopology());
我们就以此为例。
submitTopology类继承了ProcessFunction,调用submitTopology类的process方法其实是调用了ProcessFunction的process方法。
在ProcessFunction中,提供两个抽象交给用户实现,一是如何根据请求信息形成一个请求对象,二是如何处理这个请求细节以及以及返回什么样的结果,这个是依据项目的不同而不同(细节),但都有类似的行为:
请求返回什么样的对象,T extends TBase:
T args = getEmptyArgsInstance();
如何根据请求初始化对象:
args.read(iprot);
处理请求,以及返回什么样的结果:
TBase result = getResult(iface, args);
回到这里,这里的iface究竟是什么含义呢?我们看看clojure代码:
Nimbus$Processor. service-handler
Nimbus.Processor的构造函数 I extends Iface:
public Processor(I iface) {
super(iface,…);
}
这里的Iface是Nimbus类中定义的一个接口,里面包含了一些处理业务逻辑的细节。
而service-handler实现了Nimbus.Iface接口。
由此我们完成了处理请求的一个完整的过程,下面是TBinaryProtocol字节码格式的定义。
TBinaryProtocol的输入是一个buffer字节数组。
TBinaryProtocol 传输字节码格式,当不用匹配TBinaryProtocol的版本时:
4 byte |
4 byte |
Msg |
1 byte |
4 byte |
1byte |
2byte |
4byte |
msg |
整个frame的大小 |
开始的四个字节表示后面msg的大小,msg表示action的名字,用于事件分发操作,后面的一个字节表示msg的类型,最后4个字节表示seqid,也就是msg的编号,message begin。 |
这个区域表示用户输入的域,也就是参数,第一个字节表示域的类型,随后2个字节表示域的含义,由用户定义,4个字节表示msg的大小,最后是msg内容实体。 |
当需要匹配TBinaryProtocol的版本时,此时message begin变成:
4byte |
4byte |
msg |
4byte |
开始的四个字节表示VERSION_1 | message.type的值,其中messge.type为byte类型,随后的四个字节表示后面msg的大小,msg表示action的名字,用于事件分发操作,最后4个字节表示seqid,也就是msg的编号。 |
msg的类型有17中,定义在TType的类中。
那么,如何反馈一个请求呢?
由以上可知,当处理完业务逻辑时,我们得到TBase实例的反馈结果。
回头看FrameBuffer调用invoke()方法的这句:
TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport());
getOutputTransport():
private TTransport getOutputTransport() {
response_ = new TByteArrayOutputStream();
// TBinaryProtocol protocol;
return outputTransportFactory_.getTransport(
new TIOStreamTransport(response_));
}
outputTransportFactory_对象中,是由TNonblockingServer 的类
AbstractNonblockingServerArgs(TNonblockingServerTransport transport) {
super(transport);
transportFactory(new TFramedTransport.Factory());
}
因此getOutputTransport() 返回的是TFramedTransport这个对象,这个对象封装了TIOStreamTransport (ByteArrayOutputStream)这个实例。
因此TBinaryProtocol的输出流(response)是一个TFramedTransport这个对象。
在ProcessFunction的process方法中,我们调用
oprot.getTransport().flush();
输出时候,此时调用TFramedTransport的flush方法:
@Override
public void flush() throws TTransportException {
byte[] buf = writeBuffer_.get();
int len = writeBuffer_.len();
writeBuffer_.reset();
encodeFrameSize(len, i32buf);
transport_.write(i32buf, 0, 4);
transport_.write(buf, 0, len);
transport_.flush();
}
将frame大小信息以及信息实体写入到ByteArrayOutputStream当中。此时我们得到了结果字节流,但没有将这些信息写入到客户端中。
此时,要做的事情是将FrameBuffer的key变成可写的。于是有了write事件。
- 当有一个write事件时。
调用SocketChannel的write方法将数据写回到客户端。
trans_.write(buffer_)
trans_是怎样来的呢?
FrameBuffer的构造方法:
public FrameBuffer( final TNonblockingTransport trans,
final SelectionKey selectionKey) {
trans_ = trans;
selectionKey_ = selectionKey;
buffer_ = ByteBuffer.allocate(4);
}
也就是当selectThread_ handleAccept时,
client = (TNonblockingTransport)serverTransport.accept();
serverTransport 是TNonblockingServerSocket的一个实例,调用accept方法,最终会落在TNonblockingServerSocket的acceptImp()方法中,
在其父类TServerTransport accept方法里:
TTransport transport = acceptImpl();
在acceptImp()方法中
SocketChannel socketChannel = serverSocketChannel.accept();
TNonblockingSocket tsocket =
new TNonblockingSocket(socketChannel);
返回的是TNonblockingSocket 这个TTransport类的子类。