NIO框架之MINA源代码解析(一):背景

??

“你们的agent占了好多系统的port。把我们的非常多业务系统都给整死了,给我们造成了非常大的损失。要求你们的相关领导下周过来道歉”?? --?? 来自我们的一个客户。

?怎么可能呢,我们都不相信,我们的agent仅仅占一个port啊!

事实胜过雄辩。经过查证。确实是因为我们的agent占了好多系统的port。我看了一下日志。基本把系统可用的port占完了!

watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvY2hhb2ZhbndlaQ==/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center" />

为什么呢?MINA框架私自开的!

因为我们的agent端使用了NIO通信框架MINA,但并没有使用好,造成了这一差点儿毁灭行的灾难。

还是先看代码吧。

/**
 * 异步发送消息
 * @param agent
 * @param request
 */
public void sendMessageToAgent(Agent agent, HyRequest request) {
	IoSession session = null;
	IoConnector connector=null;
	long startTime = System.currentTimeMillis();
	try {
		// 创建一个非堵塞的客户端程序
		 connector = new NioSocketConnector();
		// 设置链接超时时间
		connector.setConnectTimeoutMillis(connectTimeoutMillis);

		ObjectSerializationCodecFactory objsCodec = new ObjectSerializationCodecFactory();
		objsCodec.setDecoderMaxObjectSize(DEFAULTDECODER);
		objsCodec.setEncoderMaxObjectSize(DEFAULTDECODER);
		ProtocolCodecFilter codecFilter = new ProtocolCodecFilter(
				objsCodec);
		// 数据转换。编码设置
		connector.getFilterChain()
				.addLast("codec", codecFilter);
		// 消息
		connector.setHandler(clientHandler);

		SocketAddress socketAddress = new InetSocketAddress(
				agent.getIpAddr(), agent.getAgentPort());
		ConnectFuture future = connector.connect(socketAddress);
		future.awaitUninterruptibly();
		session = future.getSession();
		String json = mapper.writeValueAsString(request);
		session.write(json);

		long endTime = System.currentTimeMillis();

		logerr.debug("send-time:" + (endTime - startTime));

	} catch (Exception e) {
		logerr.error("host:" + agent.getIpAddr() + ", AgentPORT:" + agent.getAgentPort()
				+ ", 连接异常..."+e.getMessage());
		clientHandler.handlerConnectError(agent, request);

	}
}
public class MinaClientHandler extends IoHandlerAdapter {
	// 日志
	private Logger log = Logger.getLogger(getClass());

	private MinaResponseProcesser minaResponseProcesser;

	ObjectMapper mapper=null;

	@Override
	public void messageReceived(IoSession session, Object message)
			throws Exception {
		String msg = message.toString();
		log.info("receive message from " + session.getRemoteAddress().toString() + ",message:" + message);
		if(null == mapper){
			 mapper = new ObjectMapper();
		}
		//请求消息转换为HyResponse对象
		HyResponse response = mapper.readValue(msg, HyResponse.class);
		String remoteIp= ((InetSocketAddress)session.getRemoteAddress()).getAddress().getHostAddress();
		response.setRemoteIp(remoteIp);
		HyRequest request = minaResponseProcesser.processResponse(response);
		if(request == null){
			//关闭当前session
			closeSessionByServer(session,response);
		}else{
			session.write(mapper.writeValueAsString(request));
		}
	}
}

上面的逻辑就是,当要发送一个消息时,创建一个新的connector,并获取一个session发送消息后直接返回,在MinaClientHandler类的messageReceived里面处理接受到的响应数据,并进行业务处理。最后假设不须要再次发送请求,则关闭当前session。

事实上出现本文一開始的问题就是在这里造成的。

在出现我们的agent占用大量port后,我们这边的project人员就迅速定位到了这个问题,并非常快修复了。但修复并不理想,但修复过后的代码。

/**
 * 异步发送消息
 * @param agent
 * @param request
 */
public void sendMessageToAgent(Agent agent, HyRequest request) {
	IoSession session = null;
	IoConnector connector=null;
	long startTime = System.currentTimeMillis();
	try {
		// 创建一个非堵塞的客户端程序
		 connector = new NioSocketConnector();
		// 设置链接超时时间
		connector.setConnectTimeoutMillis(connectTimeoutMillis);

		ObjectSerializationCodecFactory objsCodec = new ObjectSerializationCodecFactory();
		objsCodec.setDecoderMaxObjectSize(DEFAULTDECODER);
		objsCodec.setEncoderMaxObjectSize(DEFAULTDECODER);
		ProtocolCodecFilter codecFilter = new ProtocolCodecFilter(
				objsCodec);
		// 数据转换,编码设置
		connector.getFilterChain()
				.addLast("codec", codecFilter);
		// 消息
		connector.setHandler(clientHandler);

		SocketAddress socketAddress = new InetSocketAddress(
				agent.getIpAddr(), agent.getAgentPort());
		ConnectFuture future = connector.connect(socketAddress);
		future.awaitUninterruptibly();
		session = future.getSession();
		String json = mapper.writeValueAsString(request);
		session.write(json);
		// 等待断开连接
		session.getCloseFuture().awaitUninterruptibly();
		long endTime = System.currentTimeMillis();

		logerr.debug("send-time:" + (endTime - startTime));
		//connector.dispose();
	} catch (Exception e) {
		logerr.error("host:" + agent.getIpAddr() + ", AgentPORT:" + agent.getAgentPort()
				+ ", 连接异常..."+e.getMessage());
		clientHandler.handlerConnectError(agent, request);

	}finally{
		if(null!=session){
			session.close(true);
			session=null;
		}
		if(null !=connector){
			connector.dispose();
		}
	}
}

仅仅改了一个地方。就是在发送完消息后,加了一个等待断开连接语句和finally语句块-关闭session和connector。

尽管不会出现程序占用大量的系统port这个问题。但会造成另外一个问题-当有一个消息队列须要异步调用上面语句发送消息时,有原来的异步(发送完直接返回,相当于高速并发发送)变成伪异步(发送完消息后并等待消息返回处理后返回,相当于顺序处理队列里面的消息)。

上面的改动并非我们想要的结果,但至少修复了占用大量port的问题。

因为怀着想彻底修复这个问题的想法,我想还是深入了解一下MINA源代码吧。

原文地址:https://www.cnblogs.com/zhchoutai/p/8447912.html

时间: 2024-07-30 05:06:41

NIO框架之MINA源代码解析(一):背景的相关文章

NIO框架之MINA源代码解析(二):mina核心引擎

NIO框架之MINA源代码解析(一):背景 MINA的底层还是利用了jdk提供了nio功能,mina仅仅是对nio进行封装.包含MINA用的线程池都是jdk直接提供的. MINA的server端主要有accept.processor.session三部分组成的.当中accept主要负责在指定的port监听.若有新连接则建立一个新的session.processor则负责处理session相应的发送数据和接收数据并调用上层处理:session则缓存当前连接数据. MINA採用了线程懒启动的技术,即

NIO框架之MINA源码解析(五):NIO超级陷阱和使用同步IO与MINA通信

1.NIO超级陷阱 之所以说NIO超级陷阱,就是因为我在本系列开头的那句话,因为使用缺陷导致客户业务系统瘫痪.当然,我对这个问题进行了很深的追踪,包括对MINA源码的深入了解,但其实之所以会出现这个问题,它的根不是MINA的原因,而是JDK底层的问题. JDK底层在实现nio时,为了能够唤醒等待在io上的线程,在windows平台使用了两个端口建立连接发消息实现.看如下代码: [java] view plain copy print? public class NIOTest { @Test p

NIO框架之MINA源码解析(二):mina核心引擎

MINA的底层还是利用了jdk提供了nio功能,mina只是对nio进行封装,包括MINA用的线程池都是jdk直接提供的. MINA的server端主要有accept.processor.session三部分组成的.其中accept主要负责在指定的端口监听,若有新连接则建立一个新的session:processor则负责处理session对应的发送数据和接收数据并调用上层处理:session则缓存当前连接数据. MINA采用了线程懒启动的技术,即最少启动线程,在MINA server启动的时候,

NIO框架之MINA源码解析(一):背景

?? "你们的agent占了好多系统的端口,把我们的很多业务系统都给整死了,给我们造成了很大的损失,要求你们的相关领导下周过来道歉"   --   来自我们的一个客户. 怎么可能呢,我们都不相信,我们的agent只占一个端口啊! 事实胜过雄辩,经过查证,确实是由于我们的agent占了好多系统的端口,我看了一下日志,基本把系统可用的端口占完了! 为什么呢?MINA框架私自开的! 由于我们的agent端使用了NIO通信框架MINA,但并没有使用好,造成了这一几乎毁灭行的灾难. 还是先看代码

NIO框架之MINA源码解析(三):底层通信与责任链模式应用

本文主要介绍下在mina中责任链模式的应用以及mina对于数据读写的处理. 在mina中,对数据的读操作是在processor类里面触发的,收到新消息后就触发读数据链去处理新消息直到自己的业务逻辑代码(IoHandler). 在mina中,数据的写(write)和发(send)差别相对较大,mina中的写消息最终的结果只是把要写的消息经过写数据链处理后的最终结果放在了一个缓存中,并把当前session标记为可发. 数据的发送就是传统中我们所说的发消息,就是把写消息最终处理的结果发送到客户端,待发

NIO框架之MINA源码解析(四):粘包与断包处理及编码与解码

1.粘包与段包 粘包:指TCP协议中,发送方发送的若干包数据到接收方接收时粘成一包,从接收缓冲区看,后一包数据的头紧接着前一包数据的尾.造成的可能原因: 发送端需要等缓冲区满才发送出去,造成粘包 接收方不及时接收缓冲区的包,造成多个包接收 断包:也就是数据不全,比如包太大,就把包分解成多个小包,多次发送,导致每次接收数据都不全. 2.消息传输的格式 消息长度+消息头+消息体  即前N个字节用于存储消息的长度,用于判断当前消息什么时候结束. 消息头+消息体    即固定长度的消息,前几个字节为消息

NIO框架之MINA源码解析(转)

http://blog.csdn.net/column/details/nio-mina-source.html http://blog.csdn.net/chaofanwei/article/details/38848085 http://blog.csdn.net/chaofanwei/article/details/38871115 http://blog.csdn.net/chaofanwei/article/details/38920963 http://blog.csdn.net/c

【原创】NIO框架入门(一):服务端基于Netty4的UDP双向通信Demo演示

申明:本文由作者基于日常实践整理,希望对初次接触MINA.Netty的人有所启发.如需与作者交流,见文签名,互相学习. 学习交流 更多学习资料:点此进入 推荐 移动端即时通讯交流: 215891622 推荐 前言 NIO框架的流行,使得开发大并发.高性能的互联网服务端成为可能.这其中最流行的无非就是MINA和Netty了,MINA目前的主要版本是MINA2.而Netty的主要版本是Netty3和Netty4(Netty5已经被取消开发了:详见此文). 本文将演示的是一个基于Netty4的UDP服

【原创】NIO框架入门(四):Android与MINA2、Netty4的跨平台UDP双向通信实战

概述 本文演示的是一个Android客户端程序,通过UDP协议与两个典型的NIO框架服务端,实现跨平台双向通信的完整Demo. 当前由于NIO框架的流行,使得开发大并发.高性能的互联网服务端成为可能.这其中最流行的无非就是MINA和Netty了,MINA目前的主要版本是MINA2.而Netty的主要版本是Netty3和Netty4(Netty5已经被取消开发了:详见此文). 本文中,服务端将分别用MINA2和Netty4进行实现,但在你实际的项目中服务端实现只需选其一就行了.本文中的Demo同时