Apache mina -Processor线程逻辑

Processor 是AbstractPollingIoProcessor 抽象类内部的私有类。

Processor 是一个处理I/O输出输出数据流的主要循环主体。

Processor 主要工作包括:

1、 优先处理新IoSession 的逻辑处理

2、抖动处理

3、IoSession 的输入输出工作。

新IoSession 的逻辑处理

//循环 private final Queue<S> newSessions = new ConcurrentLinkedQueue<S>(); 并发队列,如果存在新IoSession就处理

/**

* Loops over the new sessions blocking queue and returns the number of

* sessions which are effectively created

*

* @return The number of new sessions

*/

private int handleNewSessions()

{

int addedSessions = 0;

//Acceptor 线程把新链接通过并发队列放入到IoProccessor线程中。IoProccessor线程优先新链接的socket

for (S session = newSessions.poll(); session != null; session = newSessions.poll()) {

if (addNow(session)) {

// A new session has been created

addedSessions++;

}

}

return addedSessions;

}

//与Channel相关联的IoSession ,设置为非阻塞模式。并且注册SelectionKey.OP_READ在Selector中。

protected void init(NioSession session) throws Exception {

SelectableChannel ch = (SelectableChannel) session.getChannel();

ch.configureBlocking(false);

session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));

}

/**

* Process a new session :

* - initialize it

* - create its chain

* - fire the CREATED listeners if any

*

* @param session The session to create

* @return true if the session has been registered

*/

private boolean addNow(S session) {

boolean registered = false;

//处理一个新socket的流程,初始化IoSession,创建一个handler chain 和创建监听器

try {

init(session);//初始化IoSession

registered = true;

//建立IoSession相关联的Chain Filter 用来处理数据的逻辑

// Build the filter chain of this session.

IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();

chainBuilder.buildFilterChain(session.getFilterChain());

//在这里处理IoSession 的创建等事件。这里为监听着模式。

// DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here

// in AbstractIoFilterChain.fireSessionOpened().

// Propagate the SESSION_CREATED event up to the chain

IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners();

listeners.fireSessionCreated(session);

} catch (Exception e) {

ExceptionMonitor.getInstance().exceptionCaught(e);

try {

destroy(session);

} catch (Exception e1) {

ExceptionMonitor.getInstance().exceptionCaught(e1);

} finally {

registered = false;

}

}

return registered;

}

//新IoSession逻辑处理完后,是对IoSession的主要逻辑处理。包括数据的读取和写入操作。

/**

* Deal with session ready for the read or write operations, or both.

*/

private void process(S session)

{

// Process Reads

//真正执行IO读写操作的地方

if (isReadable(session) && !session.isReadSuspended())

{

//执行IO读取操作

read(session);

}

// Process writes

if (isWritable(session) && !session.isWriteSuspended())

{

// add the session to the queue, if it‘s not already there

if (session.setScheduledForFlush(true))

{

flushingSessions.add(session);

}

}

}

//下面为IoProccessor 关于读取数据的主要操作。在读取过程中Apache Mina根据每次对于数据的读取量来控制下一次IoBuffer大小数量。

在I/O读取过程中,因为Java不能直接处理内存数据。容器每次都需要内堆内存申请,这样对JAVA GC压力较大。在JAVA NIO中提供了一个IoBuff是对底层内存的一种映射。

封装了对于底层内存的关联。有利于提供JAVA对于网络输入输出的处理能力。在IoProcessor 中,仅仅处理对数据的读取和写入,不处理数据的逻辑。包括分包,包结构处理。

//在IoProcessor中真正执行IO读取操作的地方

private void read(S session)

{   //IoSession有关数据输入输出都有配置。包括缓冲区大小设置等

IoSessionConfig config = session.getConfig();

//socket中可以读取字节数量

int bufferSize = config.getReadBufferSize();

IoBuffer buf = IoBuffer.allocate(bufferSize);

//判断数据包是否分包

final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();

try {

int readBytes = 0;

int ret;

try {

if (hasFragmentation) {

while ((ret = read(session, buf)) > 0) {

readBytes += ret;

if (!buf.hasRemaining()) {

break;

}

}

} else {

ret = read(session, buf);

if (ret > 0) {

readBytes = ret;

}

}

} finally {

buf.flip();

}

if (readBytes > 0)

{

//在session相关联的FilterChain中相关联的过滤链。通过预先设置的IoBUFF大小不断读取字节长度,然后交给FilterChain来处理。IoProcessor读取过程中,不处理有关数据流的逻辑。just read

IoFilterChain filterChain = session.getFilterChain();

filterChain.fireMessageReceived(buf);

buf = null;

//如果处在分包,则自动设置下一次缓冲区数据的大小

if (hasFragmentation) {

if (readBytes << 1 < config.getReadBufferSize()) {

session.decreaseReadBufferSize();

} else if (readBytes == config.getReadBufferSize()) {

session.increaseReadBufferSize();

}

}

}

if (ret < 0) {

// scheduleRemove(session);

IoFilterChain filterChain = session.getFilterChain();

filterChain.fireInputClosed();

}

} catch (Exception e) {

if (e instanceof IOException) {

if (!(e instanceof PortUnreachableException)

|| !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())

|| ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {

scheduleRemove(session);

}

}

IoFilterChain filterChain = session.getFilterChain();

filterChain.fireExceptionCaught(e);

}

}

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-10-13 01:32:48

Apache mina -Processor线程逻辑的相关文章

Apache Mina

初步接触RPC通信框架,目前有很多优秀的RPC框架,今天我参考该博文:http://www.cnblogs.com/xuekyo/archive/2013/03/06/2945826.html 学习了Aapche Mina通信框架.博主介绍的非常详细,包括Mina的源码流程,这里通过阅读博主的文章进行了学习记录,方便以后需要时使用. Apache Mina 是一个网络应用程序框架,用来帮助用户简单地开发高性能和高可扩展性的网络应用程序.它提供了一个通过Java NIO在不同的传输例如TCP/IP

Apache Mina(一)

Apache Mina是一个能够帮助用户开发高性能和高伸缩性网络应用程序的框架.它通过Java nio技术基于TCP/IP和UDP/IP协议提供了抽象的.事件驱动的.异步的API. Mina包的简介: org.apache.mina.core.buffer 用于缓冲区的IoBuffer org.apache.mina.core.serviceorg.apache.mina.transport.* 用于提供连接的service org.apache.mina.core.session 用于提供两端

网络通信框架Apache MINA

Apache MINA(Multipurpose Infrastructure for Network Applications) 是 Apache 组织一个较新的项目,它为开发高性能和高可用性的网络应用程序提供了非常便利的框架.当前发行的 MINA 版本支持基于 Java NIO 技术的TCP/UDP 应用程序开发.串口通讯程序. Mina 的应用层: 一个设计成熟的开源框架,总是会仅可能的减少侵入性,并在整个项目中找到合适的位置,而不应对整个项目的构架设计产生过多的影响,图 1 就是 MIN

Mina的线程模型

在Mina的NIO模式中有三种I/O工作线程(这三种线程模型只在NIOSocket中有效,在NIO数据包和虚拟管道中没有,也不需要配置): IoAcceptor/IoConnector线程 IoProcessor线程 IoHandler线程 一.Acceptor  thread 该线程的作用是接收客户端的连接,并将客户端的连接导入到I/O processor线程模型中.所谓的I/O processor线程模型就是Mina的I/O processor thread.Acceptor thread在

Apache Mina 2.x 框架+源码分析

源码下载 http://www.apache.org/dyn/closer.cgi/mina/mina/2.0.9/apache-mina-2.0.9-src.tar.gz 整体架构 核心过程(IoAcceptor 与 IoConnector通讯) 客户端: 1)通过SocketConnector同服务器端建立连接. 2)链接建立之后I/O的读写交给了I/O Processor线程,I/O Processor是多线程的. 3)通过I/O Processor读取的数据经过IoFilterChain

Apache Mina框架实践

1.为什么要用Apache Mina框架 ApacheMina Server 是一个网络通信应用框架,Mina 可以帮助我们快速开发高性能.高扩展性的网络通信应用,Mina 提供了事件驱动.异步(Mina 的异步IO 默认使用的是JAVANIO 作为底层支持)操作的编程模型. 2.ApacheMina框架使用 Mina的执行流程: > IoService:这个接口在一个线程上负责套接字的建立,拥有自己的Selector,监听是否有连接被建立. > IoProcessor:这个接口在另一个线程上

Apache mina流程分析

Apache mina工作流介绍 apache mina的整体工作流程包含了几个重要的概念和组件,分别是IoService,IoProcessor,IoHandler和IoFilter,在弄清楚整体的运作流程之前需要先介绍下各个组件各自的作用. IoService 这个是mina请求接受器(Acceptor)以及连接器(Connector)的一个抽象的父类,作用就是提供连接和接受请求的服务. IoProcessor 请求处理器,负责请求的处理工作,包括监听事件的更改,filterChain的建立

Apache Mina开发手册之二

Apache Mina开发手册之二 作者:chszs,转载需注明.博客主页:http://blog.csdn.net/chszs 四.NIO概述 NIO API是Java 1.4版引入的,NIO的意思是非阻塞的I/O通信.要知道Mina的NIO是基于NIO-1开发的,而在JDK 7中引入了NIO-2的库,但Mina还没有从NIO-2中获得各方面的提升,因此Mina还是基于NIO-1的.虽然Oracle官方是把NIO的N作为New的解释,但业界普遍把这个N解释为Non-Blocking. Mina

图解Apache Mina

Apache MINA 是一个用于简化开发构建高性能.高可扩展的网络应用框架.通过JAVA NIO在各种传输协议(如:TCP/IP.UDP/IP)上提供抽象的事件驱动异步API Apache MINA可以称为:NIO框架库.服务端客户端框架库.一个网络套接字库 特性 为各种传输协议提供统一API 提供类似servlet filter的过滤链路支持 可定制化线程模型 开箱即用的 SSL · TLS · StartTLS 超载保护和传输流量控制 易于集成(如:与Spring集成) 可平滑过渡到Net