项目积累——Mina

继续上一篇,这篇主要讲通过mina往B端发送消息,并接受消息,mina是一个网络通信框架,封装了javaNIO,简单易用,网上有很多关于他的介绍,在此不赘述了。

如上篇所介绍,完成功能,需要五个类:

PoolListener:监听,用来在系统启动的时候创建连接;

SessionPool:连接池;

SendHandler:处理类;

CharsetEncoder:编码;

CharsetDecoder:解码:

B为我们提供了6个端口,每个端口可建立3个长连接,因此,在系统时,就要创建长连接,下面是一个监听类:

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

/**
 * 初始化连接
 * @author yuanfubiao
 *
 */
public class PoolListener implements ServletContextListener {

	@Override
	public void contextDestroyed(ServletContextEvent sce) {

	}

	@Override
	public void contextInitialized(ServletContextEvent sce) {
		String nds_ip = sce.getServletContext().getInitParameter("nds_ip");
		String nds_ports = sce.getServletContext().getInitParameter("nds_ports");
		SessionPool pool = new SessionPool();
		try {

			pool.init(nds_ip, nds_ports);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

下面是监听配置,是配置在web.xml中:

    <display-name>Apache-Axis2</display-name>
    <context-param>
    	<param-name>nds_ip</param-name>
    	<param-value>XX.XXX.XXX.XXX</param-value>
    </context-param>
    <context-param>
    	<param-name>nds_ports</param-name>
    	<param-value>12210,12211,12212,12213,12214,12215</param-value>
    </context-param>
    <listener>
    	<listener-class>cn.net.easyway.nds.PoolListener</listener-class>
    </listener>

下面是自己维护的一个连接池,同样使用并发包中的ConcurrentHashMap实现,他也是线程安全的,代码如下:

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;

public class SessionPool {

	private static Log logger = LogFactory.getLog(SessionPool.class);
	private static int connNum = 0;
	private static String ip = null;
	private static Map<String,Integer> connNumPorts = new HashMap<String, Integer>();
	private static ConcurrentHashMap<String, IoSession> pool = new ConcurrentHashMap<String, IoSession>();

	/**
	 * 初始化:读取配置文件,创建长连接
	 * @throws Exception
	 */
	public void init(String nds_ip,String nds_ports) throws Exception{

		String[] ports = nds_ports.split(",");
		ip = nds_ip;

		for(int i=0;i<ports.length;i++){

			int port = Integer.parseInt(ports[i]);
			ConnectFuture future = null;

			for(int j=0;j<3;j++){
				String connNum = this.getConnNums();
				logger.info("创建连接号---->>>>>" + connNum);
				connNumPorts.put(connNum, port);
				future = SessionPool.createConnect(ip, port);
				if(future.isConnected()){
					logger.info("创建连接------->" + future.getSession());
					pool.put(connNum, future.getSession());
				}else{
					logger.error("连接创建错误,请检查IP和端口配置!" + future);
				}
			}
		}
	}

	/**
	 * 获取一个连接
	 * @param num
	 * @return
	 */
	public static IoSession  getSession(String strNum){

		logger.info("IP端口号:" + ip + "连接序列号:" + strNum + "端口号:" + connNumPorts.get(strNum));

		IoSession session = pool.get(strNum);

		if(null == session || !session.isClosing()){
			ConnectFuture newConn = createConnect(ip, connNumPorts.get(strNum));

			if(!newConn.isConnected()){
				newConn =  createConnect(ip,connNumPorts.get(strNum));
			}
			session = newConn.getSession();
			pool.replace(strNum, session);
		}

		return session;
	}

	/**
	 * 创建连接
	 * @param ip
	 * @param port
	 * @return
	 */
	private static ConnectFuture createConnect(String strIp,int intPort){

		IoConnector connector = new NioSocketConnector();

		connector.getFilterChain().addLast("codec"
				,new ProtocolCodecFilter(new CharsetCodecFactory()));

		connector.setHandler(new SendHandler());

		ConnectFuture future = connector.connect(new InetSocketAddress(strIp,intPort));
		connector.getSessionConfig().setReadBufferSize(128);
		future.awaitUninterruptibly();

		return future;
	}

	/**
	 * 生成连接序列号
	 * @return
	 */
	private synchronized String getConnNums(){

		if(18 == connNum){
			connNum = 0;
		}

		connNum++;

		return String.format("%02x", connNum);
	}
}

因此,在项目启动的时候就会有18个连接自动创建,并放在pool中等待我们的使用。下面是业务处理类,需要继承IoHandlerAdapter类,并且实现以下几个方法:

import nds.framework.security.NDSMD5;

import org.apache.commons.codec.binary.Hex;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;

import cm.custom.service.reception.RecResponse;
import cm.custom.service.reception.ReceptionResponseServiceStub;

/**
 * 业务处理
 * @author yuanfubiao
 *
 */
public class SendHandler extends IoHandlerAdapter {

	private static Log logger = LogFactory.getLog(SendHandler.class);

	@Override
	public void exceptionCaught(IoSession session, Throwable cause)
			throws Exception {
		logger.error("连接出错", cause);
	}

	@Override
	/**
	 * 设置空闲时间
	 */
	public void sessionCreated(IoSession session) throws Exception {
		session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, 60);
	}

	/**
	 * 接受到消息后,通过WS发送给用户管理系统
	 */
	@Override
	public void messageReceived(IoSession session, Object message)
			throws Exception {
		String result = message.toString().trim();
		String temp = result.substring(0, result.length()-16).trim();
		logger.info("接受到的数据:" + result);
		//验证签名
		String signature = null;
		String securityKey = "12345678";
		try {
			byte binSignature[] = NDSMD5.signPacket(temp.getBytes(), securityKey);
			signature = new String(Hex.encodeHex(binSignature));
		} catch (Exception e) {
			e.printStackTrace();
		}

		String packet = temp + signature.toUpperCase().trim();

		if(!result.equalsIgnoreCase(packet)){
			logger.error("数字签名不正确!错误指令:" + result);
			return;
		}
		logger.info("接受到的数据:" + packet);
		RecResponse res = new RecResponse();
		res.setResponse(temp);
		ReceptionResponseServiceStub stub = new ReceptionResponseServiceStub();
		stub.recResponse(res);
	}

	/**
	 * 连接空闲时,发送心跳包
	 */
	@Override
	public void sessionIdle(IoSession session, IdleStatus status)
			throws Exception {
		if(status == IdleStatus.BOTH_IDLE){
			session.write("heartbeat");
		}
	}
}

一般我们在写socket程序时,用阻塞的方式读取消息,一般是根据消息换行符或者特殊字符,或者对方关闭流来证明一条信息读取完成,在mina中,有默认的编解码方式,但也可以自定义,比如以长度来判断一条消息是否读取完毕:

编码

import java.nio.charset.Charset;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;

/**
 * 编码
 * @author yuanfubiao
 *
 */
public class CharsetEncoder extends ProtocolEncoderAdapter{

	private final static Charset charset = Charset.forName("utf-8");

	@Override
	public void encode(IoSession session, Object message, ProtocolEncoderOutput out)
			throws Exception {

		IoBuffer buff = IoBuffer.allocate(100).setAutoExpand(true);
		buff.putString(message.toString(), charset.newEncoder());

		buff.flip();
		out.write(buff);
	}
}

解码

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;

/**
 * 解码
 * @author yuanfubiao
 *
 */
public class CharsetDecoder extends CumulativeProtocolDecoder{
	private static Log logger = LogFactory.getLog(CharsetDecoder.class);
	@Override
	protected boolean doDecode(IoSession session, IoBuffer in,
			ProtocolDecoderOutput out) throws Exception {

		if(in.remaining() >= 9){ //心跳为最小传输长度

			byte[] headBytes = new byte[in.limit()];
			logger.info("接收到消息" + headBytes.toString());
			in.get(headBytes, 0, 9);
			String head = new String(headBytes).trim();
			if("heartbeat".equalsIgnoreCase(head)){
				return true;
			}

			int lenPack = Integer.parseInt(head.substring(5, 9), 16)-9;

			if(in.remaining() == lenPack){ //验证消息长度
				byte[] bodyBytes = new byte[in.limit()];
				in.get(bodyBytes,0,lenPack);
				String body = new String(bodyBytes);
				out.write(head.trim()+body.trim());
				return true;
			}
			in.flip();
			return false;
		}
		return false;
	}
}

项目积累——Mina,布布扣,bubuko.com

时间: 2024-08-10 00:07:41

项目积累——Mina的相关文章

项目积累——Blockingqueue,ConcurrentLinkedQueue,Executors

背景 通过做下面一个小的接口系统gate,了解一下mina和java并发包里的东西.A系统为javaweb项目,B为C语言项目,gate是本篇需要完成的系统. 需求 1. A为集群系统,并发较高,会批量发送给gate消息,并且接受gate返回的消息: 2. gate独立部署,将从A接受到的消息压入队列,与B建立连接后,将每条消息验证签名等工作后,发送给B,需要保证性能: 3. B负责处理消息,并返回处理结果,B为gate提供提供六个端口,一个端口可有三个长连接(须由gate发送心跳保持长连接,否

项目积累(二)细节问题,提高用户体验

前两天和同事讨论公司系统一个身份证录入文本框,发现还有一个要录入年龄的文本框,都感觉挺麻烦的.其实当录入身份证号后,年龄就是固定的,想到这就动手写jQuery来改进系统,提高用户体验. 首先,身份证从第七位开始时年龄段,如:410223199910210000,这样,就获取身份证的从第七位开始和接下来的八位.在获取当前时间,相减获取年龄.这样 做还是不是准确的,还要考虑月份和具体每天.代码如下: 1 var nowDate = new Date(); 2 var month = nowDate.

【项目积累】对JSON数据的处理

[项目简述]     接触.NET项目非常长一段时间了,前台用的都是MVC框架.不知道大家是否想过一个问题.我们是怎样将数据显示到前台的,换句话说,MVC能够识别怎么样的数据形式?答案非常easy,就是JSON数据.不太记得的,最好还是找段代码看看.我们须要将数据显示到前台.一定会返回JSON类型的数据. [博客概要] 眼下为止,JSON的应用我们都不陌生了. 但对于JSON的一些理论知识,你真正知道吗?或者说,你在项目调试的时候.以前有在前台alert过一个JSON串吗?有看过JSON串的内容

项目积累(三)CSS

公司不是专门做网站的,偶尔会接到客户让修改前端,有时候和让头疼,自己浏览器兼容问题处理不好. 慢慢积累吧. 先贴出来一些前端代码吧,如下: 1 <div class="test"> 2 <form id="Form1" runat="server"> 3 <span>请输入内容:</span> 4 <asp:TextBox ID="txtCode" runat="

项目积累html标签

今天遇到一个不太常用都标签,网上以后慢慢记下项目中用到都东西. 1.<em> 标签 告诉浏览器把其中的文本表示为强调的内容.对于所有浏览器来说,这意味着要把这段文字用斜体来显示. 在文本中加入强调也需要有技巧.如果强调太多,有些重要的短语就会被漏掉:如果强调太少,就无法真正突出重要的部分.这与调味品一样,最好还是不要滥用强调. 尽管现在 <em> 标签修饰的内容都是用斜体字来显示,但这些内容也具有更广泛的含义,将来的某一天,浏览器也可能会使用其他的特殊效果来显示强调的文本.如果你只

项目积累——数据连接配置文件

<!-- Jdbc方式连接Infor数据库,连接的数据库为hr --> <entry key="jdbc.infor"> <bean class="com.cvicse.commons.datasource.jdbc.JDBCDataSourceConfig"> <!--描述数据资源的共享模式,目前提供 Global,ThreadLocal,InheritableThreadLocal 三种模式 Global 所有的请求都使

项目积累——JAVA知识积累

调用天气: <iframe src="http://www.thinkpage.cn/weather/weather.aspx?uid=&c=CHXX0008&l=zh-CHS&p=CMA&a=1&u=C&s=1&m=1&x=1&d=2&fc=&bgc=&bc=&ti=1&in=1&li=2&ct=iframe" frameborder="0&

项目积累——综合

sql += "  and HT_CGFKSQ.sqsj >=to_date('"+kssj+"','yyyy-MM-dd')"; java.sql.Date.valueOf(qjdpo.getKssjStr()) String nowRq = new SimpleDateFormat("yyyy-MM-dd") .format(new java.util.Date());// 当前的日期 public void addFksqxx(Cgh

项目积累——js应用

//解决由前台向后台传值中文乱码的问题 encodeURI($("#xmjhbgFile").val())//前台JS中数据加码 String fjmc = java.net.URLDecoder.decode(request .getParameter("fjmc"), "utf-8");//后台解码获取 int i = fjmc.lastIndexOf("\\"); fjmc = fjmc.substring(i + 1)