大并发量socket 通信框架MINA介绍

Apache MINA(Multipurpose Infrastructure for Network Applications) 是 Apache 组织一个较新的项目,它为开发高性能和高可用性的网络应用程序提供了非常便利的框架。当前发行的 MINA 版本支持基于 Java NIO 技术的TCP/UDP 应用程序开发、串口通讯程序。

Mina 的应用层:

一个设计成熟的开源框架,总是会仅可能的减少侵入性,并在整个项目中找到合适的位置,而不应对整个项目的构架设计产生过多的影响,图 1 就是 MINA 的应用层示意图。从图中和上节的 DEMO 中我们可以看到, MINA很好的把业务代码和底层的通信隔离了开来,我们要做的仅仅是建立好监听,然后写上我们需要实现的业务逻辑就OK 了。

MINA 的内部流程:

(1) IoService :这个接口在一个线程上负责套接字的建立,拥有自己的 Selector ,监听是否有连接被建立。

(2) IoProcessor :这个接口在另一个线程上负责检查是否有数据在通道上读写,也就是说它也拥有自己的Selector ,这是与我们使用 JAVA NIO 编码时的一个不同之处,通常在 JAVA NIO 编码中,我们都是使用一个 Selector ,也就是不区分 IoService 与 IoProcessor 两个功能接口。另外, IoProcessor 也是 MINA 框架的核心组件之一 . 在 MINA 框架启动时,会用一个线程池来专门生成线程,来负责调用注册在 IoService 上的过滤器,并在过滤器链之后调用 IoHandler 。在默认情况 IoProcessor 会用N+1 个线程来轮流询问监视的端口是否有数据传送,其中 n 为 cpu 的内核个数。按一般的多线程设计概念来说,IoProcessor 的线程数是越多越好,但实际上并非如此,因为大家都知道, IO 的操作是非常占用资源的,所以项目中的 IoProcessor 的线程数应该根据实际需要来定,而这个数字可以在生成 IoAcceptor 对象时进行设定。 EgIoAcceptor acceptor = new NioSocketAcceptor( N );

(3.) IoFilter :这个接口定义一组拦截器,这些拦截器可以包括日志输出、黑名单过滤,甚至是在过滤器链中利用 AOP 写上权限控制。数据的编码( write 方向)与解码( read 方向)等功能,其中数据的 encode 与 decode 是最为重要的、也是您在使用 Mina 时最主要关注的地方。

(4.) IoHandler :这个接口负责编写业务逻辑,也就是接收、发送数据的地方。如果大家把业务处理写好,并写好业务接口,真正要用时,只需要在此处替换即可,再次见证了 MINA 分层的彻底。

其中 IoService 接口会专门起一个线程来轮询是否有新的连接产生,一旦有连接产生则通知 IoProcessor, 而IoProcessor 则起 n+1 个线程来检查连接是否有数据在上面读写。一旦有连接产生,并有数据读写,则通知 decode 或 encode ,进行报文的解码或编码,将处理后的报文再交给业务类进行业务处理。其中IoProcessor 是处理请求的分配,包括选择 Selector ,超时验证,状态记录等。总之这个类和 IoService 一起配合工作,封装了 NIO 底层的实现以及 MINA 框架内部的功能的支持 .

结合实例,并根据以上的图文讲解,我们可以很轻易的总结出利用 MINA 编程的几个大致步骤:

创建一个实现了 IoService 接口的类

设置一个实现了 IoFilter 接口的过滤器(如果有需要的情况下)

设置一个 IoHandler 接口实现的处理类,用于处理事件(必须)

对 IoService 绑定一个端口开始工作

注:这一点请特别注意,因 IoProcessor 也是相当于轮询机制,这导致在报文过长时,或其它原因导致报文不能一次传输完毕的情况下,必须保存同一连接 ( 在 MINA 中是以 IoSession 类生成的对象 ) 的上一次状态,这样才能截取到一个完成的报文,而这也是 decode( 编码器 ) 需要做的核心工作 。

Mina 的使用:

//Mina TCP 服务端 
package com.mina.test;

import java.util.Date;

import org.apache.mina.core.service.IoHandlerAdapter; 
import org.apache.mina.core.session.IdleStatus; 
import org.apache.mina.core.session.IoSession;

public class TimeServerHandler extends IoHandlerAdapter { 
@Override 
public void sessionCreated(IoSession session) throws Exception { 
System.out.println("服务端与客户端创建连接..."); 
super.sessionCreated(session); 
}

@Override 
public void sessionOpened(IoSession session) throws Exception { 
System.out.println("服务端与客户端连接打开..."); 
super.sessionOpened(session); 
}

@Override 
public void sessionClosed(IoSession session) throws Exception { 
System.out.println("服务端与客户端连接关闭..."); 
super.sessionClosed(session); 
}

@Override 
public void messageSent(IoSession session, Object message) throws Exception { 
System.out.println("服务端发送信息成功..."+message.toString()); 
super.messageSent(session, message); 
}

public void exceptionCaught(IoSession session, Throwable cause) 
throws Exception { 
System.out.println("服务端发送异常..."+cause.getMessage()); 
cause.printStackTrace(); 
}

public void messageReceived(IoSession session, Object message) 
throws Exception { 
String strMsg = message.toString(); 
System.out.println("服务端接收到的数据为: "+strMsg); 
if (strMsg.trim().equalsIgnoreCase("quit")) { 
session.close(); 
return; 
}
Date date = new Date(); 
session.write(date.toString()); 
}

public void sessionIdle(IoSession session, IdleStatus status) 
throws Exception { 
System.out.println("服务端进入空闲状态... " + session.getIdleCount(status)); 

}

package com.mina.test;

import java.io.IOException; 
import java.net.InetSocketAddress; 
import java.nio.charset.Charset;

import org.apache.mina.core.service.IoAcceptor; 
import org.apache.mina.core.session.IdleStatus; 
import org.apache.mina.filter.codec.ProtocolCodecFilter; 
import org.apache.mina.filter.codec.textline.TextLineCodecFactory; 
import org.apache.mina.filter.logging.LoggingFilter; 
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

public class MinaTimeServer {

private static final int PORT = 6488;

public static void main(String[] args) throws IOException {

//监听即将到来的TCP连接,建立监控器 
IoAcceptor acceptor = new NioSocketAcceptor(); 
//设置拦截器 
acceptor.getFilterChain().addLast("logger", new LoggingFilter()); 
acceptor.getFilterChain().addLast( 
"codec", 
new ProtocolCodecFilter(new TextLineCodecFactory(Charset 
.forName("GBK")))); 
//设置处理类 
acceptor.setHandler(new TimeServerHandler()); 
    //设置配置 
acceptor.getSessionConfig().setReadBufferSize(2048); 
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);

//绑定的监听端口,可多次绑定,也可同时绑定多个。 
acceptor.bind(new InetSocketAddress(PORT)); 
System.out.println("服务端启动成功......端口号为: "+PORT);


}

//Mina TCP客户端 
package com.mina.test;

import org.apache.mina.core.buffer.IoBuffer; 
import org.apache.mina.core.service.IoHandlerAdapter; 
import org.apache.mina.core.session.IdleStatus; 
import org.apache.mina.core.session.IoSession; 
import org.apache.mina.proxy.utils.ByteUtilities;

public class TimeClientHandler extends IoHandlerAdapter {

@Override 
public void messageReceived(IoSession iosession, Object message) 
throws Exception { 
IoBuffer bbuf = (IoBuffer) message; 
byte[] byten = new byte[bbuf.limit()]; 
bbuf.get(byten, bbuf.position(), bbuf.limit()); 
System.out.println("客户端收到消息" + ByteUtilities.asHex(byten)); 

@Override 
public void exceptionCaught(IoSession session, Throwable cause) 
throws Exception { 
System.out.println("客户端异常"); 
super.exceptionCaught(session, cause); 

@Override 
public void messageSent(IoSession iosession, Object obj) throws Exception { 
System.out.println("客户端消息发送"); 
super.messageSent(iosession, obj); 

@Override 
public void sessionClosed(IoSession iosession) throws Exception { 
System.out.println("客户端会话关闭"); 
super.sessionClosed(iosession); 

@Override 
public void sessionCreated(IoSession iosession) throws Exception { 
System.out.println("客户端会话创建"); 
super.sessionCreated(iosession); 

@Override 
public void sessionIdle(IoSession iosession, IdleStatus idlestatus) 
throws Exception { 
System.out.println("客户端会话休眠"); 
super.sessionIdle(iosession, idlestatus); 

@Override 
public void sessionOpened(IoSession iosession) throws Exception { 
System.out.println("客户端会话打开"); 
super.sessionOpened(iosession); 
}

}

package com.mina.test;

import java.net.InetSocketAddress;

import org.apache.mina.core.buffer.IoBuffer; 
import org.apache.mina.core.future.ConnectFuture; 
import org.apache.mina.core.service.IoConnector; 
import org.apache.mina.core.session.IoSession; 
import org.apache.mina.transport.socket.nio.NioSocketConnector;

public class MinaTimeClient {

private static final int PORT = 6488; 
private static IoConnector connector; 
private static IoSession session;

public static void main(String[] args) throws Exception {

TimeClientHandler clientHandler=new TimeClientHandler(); 
connector = new NioSocketConnector(); 
//设置处理类 
connector.setHandler(clientHandler); 
ConnectFuture connFuture = connector.connect(new InetSocketAddress("localhost", PORT)); 
connFuture.awaitUninterruptibly(); 
session = connFuture.getSession(); 
clientHandler.sessionOpened(session); 
System.out.println("TCP 客户端启动");

for(int j=0;j<5;j++){ // 发送两遍 
byte[] bts = new byte[20]; 
for (int i = 0; i < 20; i++) { 
bts[i] = (byte) i; 

IoBuffer buffer = IoBuffer.allocate(20); 
// 自动扩容 
buffer.setAutoExpand(true); 
// 自动收缩 
buffer.setAutoShrink(true); 
buffer.put(bts); 
buffer.flip(); 
session.write(buffer); 
Thread.sleep(2000); 

// 关闭会话,待所有线程处理结束后 
connector.dispose(true);

}

}

时间: 2024-10-15 00:48:27

大并发量socket 通信框架MINA介绍的相关文章

处理大并发量订单处理的 KafKa部署总结

处理大并发量订单处理的 KafKa部署总结 今天要介绍的是消息中间件KafKa,应该说是一个很牛的中间件吧,背靠Apache 与很多有名的中间件搭配起来用效果更好哦 ,为什么不用RabbitMQ,因为公司需要它. 网上已经有很多怎么用和用到哪的内容,但结果很多人都倒在了入门第一步 环境都搭不起来,可谓是从了解到放弃,所以在此特记录如何在linux环境搭建,windows中配置一样,只是启动运行bat文件. 想要用它就先必须了解它能做什么及能做到什么程度,先看看它是什么吧. 当今社会各种应用系统诸

关于.NET大数据量大并发量的数据连接池管理

转自:http://www.cnblogs.com/virusswb/archive/2010/01/08/1642055.html 我以前对.NET连接池的认识是错误的,原来以为在web.config中设置了连接池,每次发起的数据库连接也还是会是新的,每个sql请求就是一个连接,需要打开和关闭.因此就想设计一个连接池,然后保持固定的连接数,需要数据库连接就从连接池中取出来一个给请求用,用完毕就设置连接空闲,等待下次请求.这样看来是多余的,ADO.NET已经为我们提供这样的连接池管理,每个连接字

RPC通信框架——RCF介绍(替换COM)

阅读目录 RPC通信框架 为什么选择RCF 简单的性能测试 参考资料 总结 现有的软件中用了大量的COM接口,导致无法跨平台,当然由于与Windows结合的太紧密,还有很多无法跨平台的地方.那么为了实现跨平台,支持Linux系统,以及后续的分布式,首要任务是去除COM接口. 在对大量框架进行调研后,决定使用RCF替换COM接口. 回到顶部 RPC通信框架 CORBA ICE Thrift zeromq dbus RCF YAMI4 TAO 回到顶部 为什么选择RCF 经过各项对比,认为: RCF

RPC通信框架&mdash;&mdash;RCF介绍

现有的软件中用了大量的COM接口,导致无法跨平台,当然由于与Windows结合的太紧密,还有很多无法跨平台的地方.那么为了实现跨平台,支持Linux系统,以及后续的分布式,首要任务是去除COM接口. 在对大量框架进行调研后,决定使用RCF替换COM接口. RPC通信框架 CORBA ICE Thrift zeromq dbus RCF YAMI4 TAO 为什么选择RCF 经过各项对比,认为: RCF的使用方式与现有的COM接口方式非常类似,在开发上可以更快速.更容易的替换COM,并且可以少犯错

我是如何处理大并发量订单处理的 KafKa部署总结

今天要介绍的是消息中间件KafKa,应该说是一个很牛的中间件吧,背靠Apache 与很多有名的中间件搭配起来用效果更好哦 ,为什么不用RabbitMQ,因为公司需要它. 网上已经有很多怎么用和用到哪的内容,但结果很多人都倒在了入门第一步 环境都搭不起来,可谓是从了解到放弃,所以在此特记录如何在linux环境搭建,windows中配置一样,只是启动运行bat文件. 想要用它就先必须了解它能做什么及能做到什么程度,先看看它是什么吧. 当今社会各种应用系统诸如商业.社交.搜索.浏览等像信息工厂一样不断

.NET开源高性能Socket通信中间件Helios介绍及演示

一:Helios是什么 Helios是一套高性能的Socket通信中间件,使用C#编写.Helios的开发受到Netty的启发,使用非阻塞的事件驱动模型架构来实现高并发高吞吐量.Helios为我们大大的简化了Socket编程,它已经为我们处理好了高并发情况下的解包,粘包,buffer管理等等. GitHub:https://github.com/helios-io/helios/ 二:Helios的特点 1.Powerful APIs Takes the complexity out of so

大数据量下高并发同步的讲解(转)

文章转自:http://blog.csdn.net/xcw931924821/article/details/52475742 *************************************************************************************************************************************************************************************** 对于

大数据量下高并发同步的讲解(不看,保证你后悔)

对于我们开发的网站,如果网站的访问量非常大的话,那么我们就需要考虑相关的并发访问问题了.而并发问题是绝大部分的程序员头疼的问题, 但话又说回来了,既然逃避不掉,那我们就坦然面对吧~今天就让我们一起来研究一下常见的并发和同步吧. 为了更好的理解并发和同步,我们需要先明白两个重要的概念:同步和异步    1.同步和异步的区别和联系          所谓同步,可以理解为在执行完一个函数或方法之后,一直等待系统返回值或消息,这时程序是出于阻塞的,只有接收到 返回的值或消息后才往下执行其它的命令. 异步

大数据量下高并发同步的讲解(不看,保证你后悔!)

偶然的机会在网上看到了这篇blog,觉得作者写得挺不错的(虽然自己并没有怎么看懂...),所以就转来跟大家分享分享吧~~~ 对于我们开发的网站,如果网站的访问量非常大的话,那么我们就需要考虑相关的并发访问问题了.而并发问题是绝大部分的程序员头疼的问题, 但话又说回来了,既然逃避不掉,那我们就坦然面对吧~今天就让我们一起来研究一下常见的并发和同步吧. 为了更好的理解并发和同步,我们需要先明白两个重要的概念:同步和异步    1.同步和异步的区别和联系          所谓同步,可以理解为在执行完