Processor 是AbstractPollingIoProcessor 抽象类内部的私有类。
Processor 是一个处理I/O输出输出数据流的主要循环主体。
Processor 主要工作包括:
1、 优先处理新IoSession 的逻辑处理
2、抖动处理
3、IoSession 的输入输出工作。
新IoSession 的逻辑处理
//循环 private final Queue<S> newSessions = new ConcurrentLinkedQueue<S>(); 并发队列,如果存在新IoSession就处理
/**
* Loops over the new sessions blocking queue and returns the number of
* sessions which are effectively created
*
* @return The number of new sessions
*/
private int handleNewSessions()
{
int addedSessions = 0;
//Acceptor 线程把新链接通过并发队列放入到IoProccessor线程中。IoProccessor线程优先新链接的socket
for (S session = newSessions.poll(); session != null; session = newSessions.poll()) {
if (addNow(session)) {
// A new session has been created
addedSessions++;
}
}
return addedSessions;
}
//与Channel相关联的IoSession ,设置为非阻塞模式。并且注册SelectionKey.OP_READ在Selector中。
protected void init(NioSession session) throws Exception {
SelectableChannel ch = (SelectableChannel) session.getChannel();
ch.configureBlocking(false);
session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));
}
/**
* Process a new session :
* - initialize it
* - create its chain
* - fire the CREATED listeners if any
*
* @param session The session to create
* @return true if the session has been registered
*/
private boolean addNow(S session) {
boolean registered = false;
//处理一个新socket的流程,初始化IoSession,创建一个handler chain 和创建监听器
try {
init(session);//初始化IoSession
registered = true;
//建立IoSession相关联的Chain Filter 用来处理数据的逻辑
// Build the filter chain of this session.
IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();
chainBuilder.buildFilterChain(session.getFilterChain());
//在这里处理IoSession 的创建等事件。这里为监听着模式。
// DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
// in AbstractIoFilterChain.fireSessionOpened().
// Propagate the SESSION_CREATED event up to the chain
IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners();
listeners.fireSessionCreated(session);
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
try {
destroy(session);
} catch (Exception e1) {
ExceptionMonitor.getInstance().exceptionCaught(e1);
} finally {
registered = false;
}
}
return registered;
}
//新IoSession逻辑处理完后,是对IoSession的主要逻辑处理。包括数据的读取和写入操作。
/**
* Deal with session ready for the read or write operations, or both.
*/
private void process(S session)
{
// Process Reads
//真正执行IO读写操作的地方
if (isReadable(session) && !session.isReadSuspended())
{
//执行IO读取操作
read(session);
}
// Process writes
if (isWritable(session) && !session.isWriteSuspended())
{
// add the session to the queue, if it‘s not already there
if (session.setScheduledForFlush(true))
{
flushingSessions.add(session);
}
}
}
//下面为IoProccessor 关于读取数据的主要操作。在读取过程中Apache Mina根据每次对于数据的读取量来控制下一次IoBuffer大小数量。
在I/O读取过程中,因为Java不能直接处理内存数据。容器每次都需要内堆内存申请,这样对JAVA GC压力较大。在JAVA NIO中提供了一个IoBuff是对底层内存的一种映射。
封装了对于底层内存的关联。有利于提供JAVA对于网络输入输出的处理能力。在IoProcessor 中,仅仅处理对数据的读取和写入,不处理数据的逻辑。包括分包,包结构处理。
//在IoProcessor中真正执行IO读取操作的地方
private void read(S session)
{ //IoSession有关数据输入输出都有配置。包括缓冲区大小设置等
IoSessionConfig config = session.getConfig();
//socket中可以读取字节数量
int bufferSize = config.getReadBufferSize();
IoBuffer buf = IoBuffer.allocate(bufferSize);
//判断数据包是否分包
final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
try {
int readBytes = 0;
int ret;
try {
if (hasFragmentation) {
while ((ret = read(session, buf)) > 0) {
readBytes += ret;
if (!buf.hasRemaining()) {
break;
}
}
} else {
ret = read(session, buf);
if (ret > 0) {
readBytes = ret;
}
}
} finally {
buf.flip();
}
if (readBytes > 0)
{
//在session相关联的FilterChain中相关联的过滤链。通过预先设置的IoBUFF大小不断读取字节长度,然后交给FilterChain来处理。IoProcessor读取过程中,不处理有关数据流的逻辑。just read
IoFilterChain filterChain = session.getFilterChain();
filterChain.fireMessageReceived(buf);
buf = null;
//如果处在分包,则自动设置下一次缓冲区数据的大小
if (hasFragmentation) {
if (readBytes << 1 < config.getReadBufferSize()) {
session.decreaseReadBufferSize();
} else if (readBytes == config.getReadBufferSize()) {
session.increaseReadBufferSize();
}
}
}
if (ret < 0) {
// scheduleRemove(session);
IoFilterChain filterChain = session.getFilterChain();
filterChain.fireInputClosed();
}
} catch (Exception e) {
if (e instanceof IOException) {
if (!(e instanceof PortUnreachableException)
|| !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
|| ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {
scheduleRemove(session);
}
}
IoFilterChain filterChain = session.getFilterChain();
filterChain.fireExceptionCaught(e);
}
}
版权声明:本文为博主原创文章,未经博主允许不得转载。