Thrift-0.9.3 代码实验

如果采用TThreadedSelectorServer模型的话。

整个网络IO模型基本上就是

Accept Thread Group --- Selector Thread Group --- ExecutorService

假设第2层的Selector拿到一个socket,并且读取了完整的数据,会抛给ExecutorService.

这是通过

/**

* Do the work required to read from a readable client. If the frame is

* fully read, then invoke the method call.

*/

protected void handleRead(SelectionKey key) {

FrameBuffer buffer = (FrameBuffer) key.attachment();

if (!buffer.read()) {

cleanupSelectionKey(key);

return;

}

// if the buffer‘s frame read is complete, invoke the method.

if (buffer.isFrameFullyRead()) {

if (!requestInvoke(buffer)) { ----这里

cleanupSelectionKey(key);

}

}

}

而requestInvoke是通过

/**

* We override the standard invoke method here to queue the invocation for

* invoker service instead of immediately invoking. If there is no thread

* pool, handle the invocation inline on this thread

*/

@Override

protected boolean requestInvoke(FrameBuffer frameBuffer) {

Runnable invocation = getRunnable(frameBuffer);

if (invoker != null) {

try {

invoker.execute(invocation);

return true;

} catch (RejectedExecutionException rx) {

LOGGER.warn("ExecutorService rejected execution!", rx);

return false;

}

} else {

// Invoke on the caller‘s thread

invocation.run();

return true;

}

}

通过第3层的ExecutorService执行后,如何抛给第2层?

void org.apache.thrift.server.AbstractNonblockingServer.FrameBuffer.invoke()

/**

* Actually invoke the method signified by this FrameBuffer.

*/

public void invoke() {

frameTrans_.reset(buffer_.array());

response_.reset();

try {

if (eventHandler_ != null) {

eventHandler_.processContext(context_, inTrans_, outTrans_);

}

processorFactory_.getProcessor(inTrans_).process(inProt_, outProt_);

responseReady();

return;

} catch (TException te) {

LOGGER.warn("Exception while invoking!", te);

} catch (Throwable t) {

LOGGER.error("Unexpected throwable while invoking!", t);

}

// This will only be reached when there is a throwable.

state_ = FrameBufferState.AWAITING_CLOSE;

requestSelectInterestChange();

}

--------------

/**

* After the processor has processed the invocation, whatever thread is

* managing invocations should call this method on this FrameBuffer so we

* know it‘s time to start trying to write again. Also, if it turns out that

* there actually isn‘t any data in the response buffer, we‘ll skip trying

* to write and instead go back to reading.

*/

public void responseReady() {

// the read buffer is definitely no longer in use, so we will decrement

// our read buffer count. we do this here as well as in close because

// we‘d like to free this read memory up as quickly as possible for other

// clients.

readBufferBytesAllocated.addAndGet(-buffer_.array().length);

if (response_.len() == 0) {

// go straight to reading again. this was probably an oneway method

state_ = FrameBufferState.AWAITING_REGISTER_READ;

buffer_ = null;

} else {

buffer_ = ByteBuffer.wrap(response_.get(), 0, response_.len());

// set state that we‘re waiting to be switched to write. we do this

// asynchronously through requestSelectInterestChange() because there is

// a possibility that we‘re not in the main thread, and thus currently

// blocked in select(). (this functionality is in place for the sake of

// the HsHa server.)

state_ = FrameBufferState.AWAITING_REGISTER_WRITE;

}

requestSelectInterestChange();

}

---

/**

* When this FrameBuffer needs to change its select interests and execution

* might not be in its select thread, then this method will make sure the

* interest change gets done when the select thread wakes back up. When the

* current thread is this FrameBuffer‘s select thread, then it just does the

* interest change immediately.

*/

protected void requestSelectInterestChange() {

if (Thread.currentThread() == this.selectThread_) {

changeSelectInterests();

} else {

this.selectThread_.requestSelectInterestChange(this);

}

}

===

/**

* Add FrameBuffer to the list of select interest changes and wake up the

* selector if it‘s blocked. When the select() call exits, it‘ll give the

* FrameBuffer a chance to change its interests.

*/

public void requestSelectInterestChange(FrameBuffer frameBuffer) {

synchronized (selectInterestChanges) {

selectInterestChanges.add(frameBuffer);

}

// wakeup the selector, if it‘s currently blocked.

selector.wakeup();

}

===

这样就注册到了第2层的Selector Thread Group.

第二层如何处理?

/**

* The work loop. Handles selecting (read/write IO), dispatching, and

* managing the selection preferences of all existing connections.

*/

public void run() {

try {

while (!stopped_) {

select();

processAcceptedConnections();

processInterestChanges();

}

for (SelectionKey selectionKey : selector.keys()) {

cleanupSelectionKey(selectionKey);

}

} catch (Throwable t) {

LOGGER.error("run() on SelectorThread exiting due to uncaught error", t);

} finally {

try {

selector.close();

} catch (IOException e) {

LOGGER.error("Got an IOException while closing selector!", e);

}

// This will wake up the accept thread and the other selector threads

TThreadedSelectorServer.this.stop();

}

}

---

/**

* Check to see if there are any FrameBuffers that have switched their

* interest type from read to write or vice versa.

*/

protected void processInterestChanges() {

synchronized (selectInterestChanges) {

for (FrameBuffer fb : selectInterestChanges) {

fb.changeSelectInterests();

}

selectInterestChanges.clear();

}

}

===

/**

* Give this FrameBuffer a chance to set its interest to write, once data

* has come in.

*/

public void changeSelectInterests() {

if (state_ == FrameBufferState.AWAITING_REGISTER_WRITE) {

// set the OP_WRITE interest

selectionKey_.interestOps(SelectionKey.OP_WRITE);

state_ = FrameBufferState.WRITING;

} else if (state_ == FrameBufferState.AWAITING_REGISTER_READ) {

prepareRead();

} else if (state_ == FrameBufferState.AWAITING_CLOSE) {

close();

selectionKey_.cancel();

} else {

LOGGER.error("changeSelectInterest was called, but state is invalid (" + state_ + ")");

}

}

这里就注册了写事件。

然后如何写出去?

/**

* The work loop. Handles selecting (read/write IO), dispatching, and

* managing the selection preferences of all existing connections.

*/

public void run() {

try {

while (!stopped_) {

select();

processAcceptedConnections();

processInterestChanges();

}

for (SelectionKey selectionKey : selector.keys()) {

cleanupSelectionKey(selectionKey);

}

} catch (Throwable t) {

LOGGER.error("run() on SelectorThread exiting due to uncaught error", t);

} finally {

try {

selector.close();

} catch (IOException e) {

LOGGER.error("Got an IOException while closing selector!", e);

}

// This will wake up the accept thread and the other selector threads

TThreadedSelectorServer.this.stop();

}

}

===

/**

* Select and process IO events appropriately: If there are existing

* connections with data waiting to be read, read it, buffering until a

* whole frame has been read. If there are any pending responses, buffer

* them until their target client is available, and then send the data.

*/

private void select() {

try {

// wait for io events.

selector.select();

// process the io events we received

Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();

while (!stopped_ && selectedKeys.hasNext()) {

SelectionKey key = selectedKeys.next();

selectedKeys.remove();

// skip if not valid

if (!key.isValid()) {

cleanupSelectionKey(key);

continue;

}

if (key.isReadable()) {

// deal with reads

handleRead(key);

} else if (key.isWritable()) {

// deal with writes

handleWrite(key);

} else {

LOGGER.warn("Unexpected state in select! " + key.interestOps());

}

}

} catch (IOException e) {

LOGGER.warn("Got an IOException while selecting!", e);

}

}

===

/**

* Let a writable client get written, if there‘s data to be written.

*/

protected void handleWrite(SelectionKey key) {

FrameBuffer buffer = (FrameBuffer) key.attachment();

if (!buffer.write()) {

cleanupSelectionKey(key);

}

}

===

/**

* Give this FrameBuffer a chance to write its output to the final client.

*/

public boolean write() {

if (state_ == FrameBufferState.WRITING) {

try {

if (trans_.write(buffer_) < 0) {

return false;

}

} catch (IOException e) {

LOGGER.warn("Got an IOException during write!", e);

return false;

}

// we‘re done writing. now we need to switch back to reading.

if (buffer_.remaining() == 0) {

prepareRead();

}

return true;

}

LOGGER.error("Write was called, but state is invalid (" + state_ + ")");

return false;

}

===

/**

* Perform a nonblocking write of the data in buffer;

*/

public int write(ByteBuffer buffer) throws IOException {

return socketChannel_.write(buffer);

}

===

这里就写数据了。

如果碰到这样的问题:

写之前,客户端主动关闭了连接怎么办?

经过测试:

/**

* Give this FrameBuffer a chance to write its output to the final client.

*/

public boolean write() {

if (state_ == FrameBufferState.WRITING) {

try {

if (trans_.write(buffer_) < 0) {

return false;

}

} catch (IOException e) {

LOGGER.warn("Got an IOException during write!", e);

return false;

}

// we‘re done writing. now we need to switch back to reading.

if (buffer_.remaining() == 0) {

prepareRead();

}

return true;

}

LOGGER.error("Write was called, but state is invalid (" + state_ + ")");

return false;

}

这里会抛出异常。

然后返回false.

返回false,然后呢?

/**

* Let a writable client get written, if there‘s data to be written.

*/

protected void handleWrite(SelectionKey key) {

FrameBuffer buffer = (FrameBuffer) key.attachment();

if (!buffer.write()) {

cleanupSelectionKey(key);

}

}

表明出错了,清理此key.

===

/**

* Do connection-close cleanup on a given SelectionKey.

*/

protected void cleanupSelectionKey(SelectionKey key) {

// remove the records from the two maps

FrameBuffer buffer = (FrameBuffer) key.attachment();

if (buffer != null) {

// close the buffer

buffer.close();

}

// cancel the selection key

key.cancel();

}

清除key,清除key对应的缓存,一切不复存在。

下班!

时间: 2024-10-04 10:56:28

Thrift-0.9.3 代码实验的相关文章

【翻】Android Design Support Library 的 代码实验——几行代码,让你的 APP 变得花俏

译者地址:[翻]Android Design Support Library 的 代码实验--几行代码,让你的 APP 变得花俏 原文:Codelab for Android Design Support Library used in I/O Rewind Bangkok session--Make your app fancy with few lines of code 原文项目 demo: Lab-Android-DesignLibrary 双语对照地址: [翻-双语]Android D

Android Design Support Library 的 代码实验——几行代码,让你的 APP 变得花俏

译者地址:[翻]Android Design Support Library 的 代码实验——几行代码,让你的 APP 变得花俏 原文:Codelab for Android Design Support Library used in I/O Rewind Bangkok session----Make your app fancy with few lines of code 原文项目 demo: Lab-Android-DesignLibrary 双语对照地址: [翻-双语]Android

【转】【翻】Android Design Support Library 的 代码实验——几行代码,让你的 APP 变得花俏

转自:http://mrfufufu.github.io/android/2015/07/01/Codelab_Android_Design_Support_Library.html [翻]Android Design Support Library 的 代码实验——几行代码,让你的 APP 变得花俏 Jul 1, 2015 译者地址:[翻]Android Design Support Library 的 代码实验——几行代码,让你的 APP 变得花俏 原文:Codelab for Androi

阿里云抢月饼代码实验,自己写着玩的

阿里4名工程师因刷月饼被开除.软件抢票.优惠秒杀...各种抢. 但是我还不知道怎么抢,所以我就模拟了一下抢月饼的流程. ps:自己不是专业前端,纯属娱乐!!! 1 <!DOCTYPE html> 2 <html> 3 <head> 4 <meta charset="utf-8"> 5 <title>阿里云抢月饼代码实验</title> 6 <script src="jquery-1.11.3.min

ubuntu thrift 0.9.3编译安装

Table of Contents 1. 下载thrift源代码 2. 编译并安装 3. 运行测试程序 4. 安装 1 下载thrift源代码 git clone https://git-wip-us.apache.org/repos/asf/thrift.git thrift git checkout 0.9.3 2 编译并安装 安装依赖 apt-get install automake apt-get install libssl-dev apt-get install byacc apt-

Android4.0图库Gallery2代码分析(二) 数据管理和数据加载

Android4.0图库Gallery2代码分析(二) 数据管理和数据加载 2012-09-07 11:19 8152人阅读 评论(12) 收藏 举报 代码分析android相册优化工作 Android4.0图库Gallery2代码分析(二) 数据管理和数据加载 一 图库数据管理 Gallery2的数据管理 DataManager(职责:管理数据源)- MediaSource(职责:管理数据集) - MediaSet(职责:管理数据项).DataManager中初始化所有的数据源(LocalSo

7、Cocos2dx 3.0游戏开发找小三之3.0版本的代码风格

重开发者的劳动成果,转载的时候请务必注明出处:http://blog.csdn.net/haomengzhu/article/details/27691337 Cocos2d-x代码风格 前面我们已经多次提到 Cocos2d-x 源自于 Cocos2d-iPhone.Cocos2d-iPhone 是一个十分出色的游戏引擎,许多优秀的 iOS平面游戏都基于 Cocos2d-iPhone 开发,而它的实现语言是 Objective-C.因此,Cocos2d-x 也就沿袭了 Objective-C 的

Xcode8.0 在编辑代码时提示警告 Implicit conversion loses integer precision: &#39;NSInteger&#39; (aka &#39;long&#39;) to

Implicit conversion loses integer precision: 'NSInteger' (aka 'long') to -Wno-shorten-64-to-32 Xcode8.0 在编辑代码时提示警告 Implicit conversion loses integer precision: 'NSInteger' (aka 'long') to

PHP Zend Studio9.0怎么把代码搞成和服务器端的同步(就是直接在服务器端修改)

Zend Studio 可以直接通过Remote System的方式直接连接服务器端的代码,就是可以直接修改服务器端的代码,不过修改的时间小心点,修改就会立即生效的. 选择Remote Systems 后,在下面就会出现下图 把这个tab移动到左边,并在左侧区域点击右键,出现下图,选择 new connection... 输入IP后 next 点 My Home会出现让你输入密码的对话框 属于密码就出现服务器端的 目录结构了,注意不要乱删除东西哦,,,, 这个时候你发现你打开的.php文件是以记

7、Cocos2dx 3.0游戏开发找小三之3.0版本号的代码风格

重开发人员的劳动成果,转载的时候请务必注明出处:http://blog.csdn.net/haomengzhu/article/details/27691337 Cocos2d-x代码风格 前面我们已经多次提到 Cocos2d-x 源自于 Cocos2d-iPhone.Cocos2d-iPhone 是一个十分出色的游戏引擎,很多优秀的 iOS平面游戏都基于 Cocos2d-iPhone 开发,而它的实现语言是 Objective-C.因此,Cocos2d-x 也就沿袭了 Objective-C