If you use the TThreadedSelectorServer model,
the overall network I/O model is basically:
Accept Thread Group --- Selector Thread Group --- ExecutorService
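Suppose a Selector in layer 2 picks up a socket and reads a complete frame from it; it then hands the frame off to the ExecutorService.
This happens in handleRead: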
/**
 * Do the work required to read from a readable client. If the frame is
 * fully read, then invoke the method call.
 */
protected void handleRead(SelectionKey key) {
  FrameBuffer buffer = (FrameBuffer) key.attachment();
  if (!buffer.read()) {
    cleanupSelectionKey(key);
    return;
  }
  // if the buffer's frame read is complete, invoke the method.
  if (buffer.isFrameFullyRead()) {
    if (!requestInvoke(buffer)) { // <-- this is the hand-off
      cleanupSelectionKey(key);
    }
  }
}
And requestInvoke is implemented as:
/**
 * We override the standard invoke method here to queue the invocation for
 * invoker service instead of immediately invoking. If there is no thread
 * pool, handle the invocation inline on this thread
 */
@Override
protected boolean requestInvoke(FrameBuffer frameBuffer) {
  Runnable invocation = getRunnable(frameBuffer);
  if (invoker != null) {
    try {
      invoker.execute(invocation);
      return true;
    } catch (RejectedExecutionException rx) {
      LOGGER.warn("ExecutorService rejected execution!", rx);
      return false;
    }
  } else {
    // Invoke on the caller's thread
    invocation.run();
    return true;
  }
}
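To make the return value of requestInvoke concrete, here is a minimal standalone sketch of the same hand-off. It is not Thrift code: the class HandOffSketch, its demo() method, and the pool sizing (one worker, one queue slot) are assumptions chosen only so that a rejection is easy to trigger with the JDK's default rejection policy.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the selector-to-worker hand-off; not Thrift code.
class HandOffSketch {

  // Same shape as requestInvoke: true if accepted (or run inline), false if rejected.
  static boolean requestInvoke(ExecutorService invoker, Runnable invocation) {
    if (invoker == null) {
      invocation.run(); // no pool: run on the caller's thread
      return true;
    }
    try {
      invoker.execute(invocation);
      return true;
    } catch (RejectedExecutionException rx) {
      return false; // the caller is expected to clean up the connection
    }
  }

  // Submits three blocking tasks to a pool with one worker and one queue slot.
  static boolean[] demo() {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1));
    CountDownLatch release = new CountDownLatch(1);
    Runnable blocking = () -> {
      try {
        release.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };
    boolean[] accepted = {
      requestInvoke(pool, blocking), // taken by the single worker
      requestInvoke(pool, blocking), // parked in the queue
      requestInvoke(pool, blocking)  // no room left: rejected
    };
    release.countDown(); // let the blocked tasks finish
    pool.shutdown();
    return accepted;
  }

  public static void main(String[] args) {
    boolean[] accepted = demo();
    System.out.println(accepted[0] + " " + accepted[1] + " " + accepted[2]);
  }
}
```

The point to take away is that a saturated pool surfaces as a false return value, which is exactly the case in which handleRead calls cleanupSelectionKey and drops the connection.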
Once the layer-3 ExecutorService has executed the call, how is the result handed back to layer 2?
void org.apache.thrift.server.AbstractNonblockingServer.FrameBuffer.invoke()
/**
 * Actually invoke the method signified by this FrameBuffer.
 */
public void invoke() {
  frameTrans_.reset(buffer_.array());
  response_.reset();
  try {
    if (eventHandler_ != null) {
      eventHandler_.processContext(context_, inTrans_, outTrans_);
    }
    processorFactory_.getProcessor(inTrans_).process(inProt_, outProt_);
    responseReady();
    return;
  } catch (TException te) {
    LOGGER.warn("Exception while invoking!", te);
  } catch (Throwable t) {
    LOGGER.error("Unexpected throwable while invoking!", t);
  }
  // This will only be reached when there is a throwable.
  state_ = FrameBufferState.AWAITING_CLOSE;
  requestSelectInterestChange();
}
--------------
/**
 * After the processor has processed the invocation, whatever thread is
 * managing invocations should call this method on this FrameBuffer so we
 * know it's time to start trying to write again. Also, if it turns out that
 * there actually isn't any data in the response buffer, we'll skip trying
 * to write and instead go back to reading.
 */
public void responseReady() {
  // the read buffer is definitely no longer in use, so we will decrement
  // our read buffer count. we do this here as well as in close because
  // we'd like to free this read memory up as quickly as possible for other
  // clients.
  readBufferBytesAllocated.addAndGet(-buffer_.array().length);
  if (response_.len() == 0) {
    // go straight to reading again. this was probably an oneway method
    state_ = FrameBufferState.AWAITING_REGISTER_READ;
    buffer_ = null;
  } else {
    buffer_ = ByteBuffer.wrap(response_.get(), 0, response_.len());
    // set state that we're waiting to be switched to write. we do this
    // asynchronously through requestSelectInterestChange() because there is
    // a possibility that we're not in the main thread, and thus currently
    // blocked in select(). (this functionality is in place for the sake of
    // the HsHa server.)
    state_ = FrameBufferState.AWAITING_REGISTER_WRITE;
  }
  requestSelectInterestChange();
}
---
/**
 * When this FrameBuffer needs to change its select interests and execution
 * might not be in its select thread, then this method will make sure the
 * interest change gets done when the select thread wakes back up. When the
 * current thread is this FrameBuffer's select thread, then it just does the
 * interest change immediately.
 */
protected void requestSelectInterestChange() {
  if (Thread.currentThread() == this.selectThread_) {
    changeSelectInterests();
  } else {
    this.selectThread_.requestSelectInterestChange(this);
  }
}
===
/**
 * Add FrameBuffer to the list of select interest changes and wake up the
 * selector if it's blocked. When the select() call exits, it'll give the
 * FrameBuffer a chance to change its interests.
 */
public void requestSelectInterestChange(FrameBuffer frameBuffer) {
  synchronized (selectInterestChanges) {
    selectInterestChanges.add(frameBuffer);
  }
  // wakeup the selector, if it's currently blocked.
  selector.wakeup();
}
===
This queues the FrameBuffer with the layer-2 Selector Thread Group.
How does layer 2 handle it?
/**
 * The work loop. Handles selecting (read/write IO), dispatching, and
 * managing the selection preferences of all existing connections.
 */
public void run() {
  try {
    while (!stopped_) {
      select();
      processAcceptedConnections();
      processInterestChanges();
    }
    for (SelectionKey selectionKey : selector.keys()) {
      cleanupSelectionKey(selectionKey);
    }
  } catch (Throwable t) {
    LOGGER.error("run() on SelectorThread exiting due to uncaught error", t);
  } finally {
    try {
      selector.close();
    } catch (IOException e) {
      LOGGER.error("Got an IOException while closing selector!", e);
    }
    // This will wake up the accept thread and the other selector threads
    TThreadedSelectorServer.this.stop();
  }
}
---
/**
 * Check to see if there are any FrameBuffers that have switched their
 * interest type from read to write or vice versa.
 */
protected void processInterestChanges() {
  synchronized (selectInterestChanges) {
    for (FrameBuffer fb : selectInterestChanges) {
      fb.changeSelectInterests();
    }
    selectInterestChanges.clear();
  }
}
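The two halves seen so far (a worker thread queues a change and calls wakeup(), then the selector thread drains the queue after select() returns) can be sketched with plain java.nio. This is a rough illustration under stated assumptions: WakeupSketch, requestChange, selectAndDrain, and the String entries are hypothetical names standing in for the selector thread, requestSelectInterestChange, processInterestChanges, and FrameBuffer.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration of the "queue a change, then wake the selector" pattern.
class WakeupSketch {
  private final Selector selector;
  // Stands in for selectInterestChanges; Strings replace FrameBuffers.
  private final Set<String> pendingChanges = new HashSet<>();

  WakeupSketch() throws IOException {
    selector = Selector.open();
  }

  // Called from a worker thread: record the change first, then wake the selector.
  void requestChange(String change) {
    synchronized (pendingChanges) {
      pendingChanges.add(change);
    }
    selector.wakeup();
  }

  // Called on the selector thread: block in select(), then drain the queued changes.
  int selectAndDrain() throws IOException {
    selector.select(); // returns once wakeup() has been called
    synchronized (pendingChanges) {
      int drained = pendingChanges.size();
      pendingChanges.clear();
      return drained;
    }
  }

  // Runs one worker against one selector loop iteration and reports the drained count.
  static int demo() {
    try {
      WakeupSketch sketch = new WakeupSketch();
      Thread worker = new Thread(() -> sketch.requestChange("fb-1"));
      worker.start();
      int drained = sketch.selectAndDrain();
      worker.join();
      sketch.selector.close();
      return drained;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("drained " + demo());
  }
}
```

The ordering matters: the change is added before wakeup() is called, so whenever select() returns because of that wakeup, the change is already visible in the queue. If wakeup() happens before the selector thread reaches select(), the next select() simply returns immediately.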
===
/**
 * Give this FrameBuffer a chance to set its interest to write, once data
 * has come in.
 */
public void changeSelectInterests() {
  if (state_ == FrameBufferState.AWAITING_REGISTER_WRITE) {
    // set the OP_WRITE interest
    selectionKey_.interestOps(SelectionKey.OP_WRITE);
    state_ = FrameBufferState.WRITING;
  } else if (state_ == FrameBufferState.AWAITING_REGISTER_READ) {
    prepareRead();
  } else if (state_ == FrameBufferState.AWAITING_CLOSE) {
    close();
    selectionKey_.cancel();
  } else {
    LOGGER.error("changeSelectInterest was called, but state is invalid (" + state_ + ")");
  }
}
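The state transitions spread across invoke(), responseReady(), and changeSelectInterests() are easier to follow when written as one small state machine. The sketch below is a simplification, not the Thrift enum: READING and CLOSED are placeholder states I made up for "back to reading" and "connection gone", and the two static methods are hypothetical names.

```java
// Hypothetical, simplified model of the FrameBuffer state transitions quoted above.
class StateSketch {
  enum State {
    READING,                 // placeholder for the read-side states
    AWAITING_REGISTER_WRITE,
    WRITING,
    AWAITING_REGISTER_READ,
    AWAITING_CLOSE,
    CLOSED                   // placeholder for "key cancelled, buffer closed"
  }

  // What the worker thread decides at the end of invoke()/responseReady().
  static State afterInvoke(int responseLength, boolean failed) {
    if (failed) {
      return State.AWAITING_CLOSE;          // a throwable escaped the processor
    }
    if (responseLength == 0) {
      return State.AWAITING_REGISTER_READ;  // probably a oneway call: nothing to write
    }
    return State.AWAITING_REGISTER_WRITE;   // a response is waiting to be written
  }

  // What the selector thread does when it processes the queued interest change.
  static State onInterestChange(State state) {
    switch (state) {
      case AWAITING_REGISTER_WRITE:
        return State.WRITING;  // OP_WRITE interest is set here
      case AWAITING_REGISTER_READ:
        return State.READING;  // prepareRead()
      case AWAITING_CLOSE:
        return State.CLOSED;   // close() plus selectionKey_.cancel()
      default:
        throw new IllegalStateException("invalid state " + state);
    }
  }

  public static void main(String[] args) {
    System.out.println(onInterestChange(afterInvoke(128, false)));
    System.out.println(onInterestChange(afterInvoke(0, false)));
    System.out.println(onInterestChange(afterInvoke(0, true)));
  }
}
```

The real code logs an error instead of throwing for an unexpected state; the exception here only keeps the sketch short.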
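This is where interest in the write event is registered.
How does the data then get written out? The selector thread's run() loop calls select() on every iteration: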
/**
 * Select and process IO events appropriately: If there are existing
 * connections with data waiting to be read, read it, buffering until a
 * whole frame has been read. If there are any pending responses, buffer
 * them until their target client is available, and then send the data.
 */
private void select() {
  try {
    // wait for io events.
    selector.select();
    // process the io events we received
    Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
    while (!stopped_ && selectedKeys.hasNext()) {
      SelectionKey key = selectedKeys.next();
      selectedKeys.remove();
      // skip if not valid
      if (!key.isValid()) {
        cleanupSelectionKey(key);
        continue;
      }
      if (key.isReadable()) {
        // deal with reads
        handleRead(key);
      } else if (key.isWritable()) {
        // deal with writes
        handleWrite(key);
      } else {
        LOGGER.warn("Unexpected state in select! " + key.interestOps());
      }
    }
  } catch (IOException e) {
    LOGGER.warn("Got an IOException while selecting!", e);
  }
}
===
/**
 * Let a writable client get written, if there's data to be written.
 */
protected void handleWrite(SelectionKey key) {
  FrameBuffer buffer = (FrameBuffer) key.attachment();
  if (!buffer.write()) {
    cleanupSelectionKey(key);
  }
}
===
/**
 * Give this FrameBuffer a chance to write its output to the final client.
 */
public boolean write() {
  if (state_ == FrameBufferState.WRITING) {
    try {
      if (trans_.write(buffer_) < 0) {
        return false;
      }
    } catch (IOException e) {
      LOGGER.warn("Got an IOException during write!", e);
      return false;
    }
    // we're done writing. now we need to switch back to reading.
    if (buffer_.remaining() == 0) {
      prepareRead();
    }
    return true;
  }
  LOGGER.error("Write was called, but state is invalid (" + state_ + ")");
  return false;
}
===
/**
 * Perform a nonblocking write of the data in buffer;
 */
public int write(ByteBuffer buffer) throws IOException {
  return socketChannel_.write(buffer);
}
===
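This is where the data is actually written.
Now consider this problem:
what happens if the client closes the connection before the write?
Testing shows: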
/**
 * Give this FrameBuffer a chance to write its output to the final client.
 */
public boolean write() {
  if (state_ == FrameBufferState.WRITING) {
    try {
      if (trans_.write(buffer_) < 0) {
        return false;
      }
    } catch (IOException e) {
      LOGGER.warn("Got an IOException during write!", e);
      return false;
    }
    // we're done writing. now we need to switch back to reading.
    if (buffer_.remaining() == 0) {
      prepareRead();
    }
    return true;
  }
  LOGGER.error("Write was called, but state is invalid (" + state_ + ")");
  return false;
}
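In my test an exception is thrown here (the IOException caught in the try block),
and write() returns false.
It returns false, and then what?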
/**
 * Let a writable client get written, if there's data to be written.
 */
protected void handleWrite(SelectionKey key) {
  FrameBuffer buffer = (FrameBuffer) key.attachment();
  if (!buffer.write()) {
    cleanupSelectionKey(key);
  }
}
That signals an error, so the key is cleaned up.
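The failure path above (the transport throws, write() returns false, the caller cleans up) can be reproduced without a real socket. In this sketch the Transport interface, the two lambda transports, and the "kept" / "cleaned up" strings are all hypothetical; the IOException is thrown by hand to imitate a peer that has already closed the connection.

```java
import java.io.IOException;
import java.nio.ByteBuffer;

// Hypothetical model of the write failure path; no real sockets involved.
class WriteFailureSketch {
  // Stands in for the nonblocking transport's write(ByteBuffer).
  interface Transport {
    int write(ByteBuffer buffer) throws IOException;
  }

  // Same contract as FrameBuffer.write(): false means "give up on this connection".
  static boolean write(Transport trans, ByteBuffer buffer) {
    try {
      if (trans.write(buffer) < 0) {
        return false;
      }
    } catch (IOException e) {
      return false; // e.g. the peer closed the connection before we wrote
    }
    return true;
  }

  // Same shape as handleWrite(): a failed write leads to cleanup.
  static String handleWrite(Transport trans, ByteBuffer buffer) {
    return write(trans, buffer) ? "kept" : "cleaned up";
  }

  public static void main(String[] args) {
    // A transport that accepts everything it is given.
    Transport healthy = b -> {
      int n = b.remaining();
      b.position(b.limit());
      return n;
    };
    // A transport whose peer is gone: every write fails.
    Transport closedPeer = b -> {
      throw new IOException("simulated: connection closed by peer");
    };
    System.out.println(handleWrite(healthy, ByteBuffer.wrap(new byte[] {1, 2, 3})));
    System.out.println(handleWrite(closedPeer, ByteBuffer.wrap(new byte[] {1, 2, 3})));
  }
}
```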
===
/**
 * Do connection-close cleanup on a given SelectionKey.
 */
protected void cleanupSelectionKey(SelectionKey key) {
  // remove the records from the two maps
  FrameBuffer buffer = (FrameBuffer) key.attachment();
  if (buffer != null) {
    // close the buffer
    buffer.close();
  }
  // cancel the selection key
  key.cancel();
}
The key is cancelled, the buffer attached to it is closed, and nothing of the connection remains.
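To check what key.cancel() does at the java.nio level, here is a small sketch that uses a Pipe instead of a socket so it runs without any network setup. The class CancelSketch and its demo() method are hypothetical names; the calls themselves are standard java.nio.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical demo of SelectionKey.cancel(); a Pipe replaces the client socket.
class CancelSketch {

  // Returns {valid before cancel, valid after cancel, still in key set after a select}.
  static boolean[] demo() {
    try {
      Selector selector = Selector.open();
      Pipe pipe = Pipe.open();
      pipe.source().configureBlocking(false); // required before register()
      SelectionKey key =
          pipe.source().register(selector, SelectionKey.OP_READ, "attachment");

      boolean validBefore = key.isValid();
      key.cancel();
      boolean validAfter = key.isValid();

      // Cancelled keys are removed from the key set during the next selection.
      selector.selectNow();
      boolean stillRegistered = selector.keys().contains(key);

      pipe.source().close();
      pipe.sink().close();
      selector.close();
      return new boolean[] {validBefore, validAfter, stillRegistered};
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    boolean[] r = demo();
    System.out.println(r[0] + " " + r[1] + " " + r[2]);
  }
}
```

The key becomes invalid as soon as cancel() is called, which is why the select() loop checks key.isValid() before dispatching, and the selector drops it from its key set on the next selection operation.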
Time to clock off!