Apache Mina2.x网络通信框架使用入门

关于Apache Mina的文章,资料已经非常多了,我想再多一篇也不过为。另外Main现在3.x版本正在开发中,且已经有M2(里程碑)发布了。

本文中主要针对Mina2.0.9(这个版本也是最后一个2.x版本了)来记录学习和使用的过程和体会。

Mina2.x的文档还算比较全面:http://mina.apache.org/mina-project/userguide/user-guide-toc.html (有些部分可能没有更新,未来讲更多关注和使用3.x)。

  1. 开发服务器和客户端网络应用

    使用Mina开发,客户端连接器和服务端接收器有更多的相似之处,Mina在API设计的时候使用了更高的抽象如:IoService,对于创建服务器端接收器我们将关注IoAccepor,而客户端连接器则是IoConnector.

下面图1是关于Mina的基本使用的描述:

图1

这里对图1做简单的说明:

图1由大的三部分组成,通过颜色就很容易区分和理解器表示的意思。对于服务器(Server)和客户端(Client)而言它们都需要中间最大一块的组成部分,其中包含了配置(Configure),会话数据工厂(SessionDataFactory),过滤器链(FilterChina,由多个过滤器组成),监听器组(有多个Listener组成),处理器(IoHandler);第三部分则可以看出服务器对应的是绑定(bind),客户端对应的是连接(connect)由此区分了服务器和客户端。

说了这么多,就中间部分而言,Mina框架最大程度的解放了开发过程要进行的会话管理,会话数据管理,服务监听处理,过滤器,服务配置等的实现,其都提供了默认实现,当然可以根据使用情况实现对应的接口来自行处理。

2.过滤器

Mina中的过滤器的顶层接口是IoFilter,其自身提供了多种过滤器,比如:LoggingFilter,ExecutorFilter,BlacklistFilter(请求地址黑名单过滤),SSLFilter,编码和解密过滤器等等(更多参考API http://mina.apache.org/mina-project/apidocs/index.html)。

过滤器是Mina框架中极其重要和有价值的部分,其提供了日志,安全,权限控制,统计等过滤器,并且其是可插拔的,可以通过不同的过滤器以不同的顺序组成过滤器链将实现不同的功能。另外可以通过实现IoFilter接口或者继承IoFilterAdapter来创建更多具体业务中需要的IoFilter,当然这么做之前,可以仔细看看Mina的org.apache.mina.filter包下是否已经提供了实现。

3.编码和解码

编码和解码作为网络应用开发必须面对的问题,而Mina作为全功能的网络通讯框架,实现对数据报文的编码和解码自然是其分内之事,具体使用者可更多关注IoHandler,即具体处理接收和发送报文的相关业务。

在org.apache.mina.filter.codec包下有更多的关于编码和解码的实现。

关于编码其方式有很多种,比如Protobuf,serialization(对象序列化),JSON,BASE64等,而解码则涉及到字节流分割的问题,下图2是三种常用的字节流分割的方式:

图2

上面三种方式中2和3在Mina中都有对应的实现,比如3特殊字符结尾标记对应的实现有TextLineEncoder和TextLineDecoder,两者组成了TextLineCodecFactory; 2固定字节的head表示数据字节数有PrefixedStringEncoder和PrefixedStringDecoder,两者组成了PrefixedStringCodecFactory。

第一种固定长度字节数这种主要应用在传输命令的场景中,其传输的字节数是固定,应用中可以自己根据具体情况来实现对应的编码和解码类。

4.一个具体案例来贯穿全文

本案例通过客户端发送短信信息到服务器,然后服务器将其短信信息的发送者和接受者对调,短信内容设置"OK",发回给客户端。

4.1定义短信格式(protobuf):

package secondriver.mina.bo.protobuf;
 
option java_package = "secondriver.mina.bo.protobuf";
option java_outer_classname = "SmsDataProtocal";
 
message Sms {
  required string protocol 		= 1;
  required string sender 		= 2;
  required string receiver 		= 3;
  required string content 		= 4;
}

使用protoc命令将定义个消息生成Java类(使用方式可以参考:)。

4.2编写Sms对象的编码和解密类,这里我们直接编写编码解密工程类,其由编码和解密类组合而成。

package secondriver.mina.bo.protobuf;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.apache.mina.filter.codec.ProtocolEncoder;
import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;

import com.google.protobuf.ByteString;

import secondriver.mina.bo.protobuf.SmsDataProtocal.Sms;

public class SmsDataCodecFactory implements ProtocolCodecFactory {

	private final Charset charset = StandardCharsets.UTF_8;

	private int prefixLength = 4;

	private int maxDataLength = 1024;

	@Override
	public ProtocolEncoder getEncoder(IoSession session) throws Exception {

		return new ProtocolEncoderAdapter() {

			@Override
			public void encode(IoSession session, Object message,
					ProtocolEncoderOutput out) throws Exception {
				Sms sms = (Sms) message;

				String content = sms.toByteString().toStringUtf8();

				IoBuffer buffer = IoBuffer.allocate(content.length())
						.setAutoExpand(true);

				buffer.putPrefixedString(content, prefixLength,
						charset.newEncoder());

				if (buffer.position() > maxDataLength) {
					throw new IllegalArgumentException("Data length: "
							+ buffer.position());
				}

				buffer.flip();
				out.write(buffer);
			}
		};
	}

	@Override
	public ProtocolDecoder getDecoder(IoSession session) throws Exception {
		return new CumulativeProtocolDecoder() {

			@Override
			protected boolean doDecode(IoSession session, IoBuffer in,
					ProtocolDecoderOutput out) throws Exception {

				if (in.prefixedDataAvailable(prefixLength, maxDataLength)) {

					String msg = in.getPrefixedString(prefixLength,
							charset.newDecoder());

					Sms sms = Sms.parseFrom(ByteString.copyFrom(msg,
							charset.name()));

					out.write(sms);
					return true;
				}
				return false;
			}
		};
	}
}

4.3参见文中1端来写服务端

创建IoAccptor对象->设置过滤器->设置IoHandler->配置->绑定到指定IP和端口

package secondriver.mina.server;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.filter.logging.LogLevel;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

import secondriver.mina.bo.protobuf.SmsDataCodecFactory;
import secondriver.mina.bo.protobuf.SmsDataProtocal.Sms;

public class SmsServer {

	public static final int PORT = 9001;

	public static void main(String[] args) throws IOException {

		// 接收器
		IoAcceptor acceptor = new NioSocketAcceptor();

		// 过滤器链
		DefaultIoFilterChainBuilder builder = new DefaultIoFilterChainBuilder();

		LoggingFilter loggingFilter = new LoggingFilter();
		loggingFilter.setExceptionCaughtLogLevel(LogLevel.DEBUG);

		builder.addLast("logging", loggingFilter);
		builder.addLast("codec", new ProtocolCodecFilter(
				new SmsDataCodecFactory()));
		builder.addLast("threadPool",
				new ExecutorFilter(Executors.newCachedThreadPool()));
		acceptor.setFilterChainBuilder(builder);

		// 设置处理器IoHandler
		acceptor.setHandler(new IoHandlerAdapter() {

			@Override
			public void messageReceived(IoSession session, Object message)
					throws Exception {
				Sms sms = (Sms) message;
				System.out.println("客户端发来:");
				System.out.println(sms.toString());

				// 服务器发送
				Sms serverSms = Sms.newBuilder().setProtocol(sms.getProtocol())
						.setContent("OK").setReceiver(sms.getSender())
						.setSender(sms.getSender()).build();
				session.write(serverSms);
			}
		});

		// 配置服务器(IoAccptor)
		acceptor.getSessionConfig().setReadBufferSize(2048);
		acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
		// 绑定到指定IP和端口
		acceptor.bind(new InetSocketAddress(PORT));
	}
}

4.4 参见文中1端编写客户端

package secondriver.mina.client;

import java.net.InetSocketAddress;
import java.util.Scanner;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.mina.core.RuntimeIoException;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;

import secondriver.mina.bo.protobuf.SmsDataCodecFactory;
import secondriver.mina.bo.protobuf.SmsDataProtocal.Sms;

public class SmsClient {
	private static InetSocketAddress server = new InetSocketAddress(
			"127.0.0.1", 9001);

	public static void main(String[] args) throws InterruptedException {

		// 客户端连接器
		IoConnector connector = new NioSocketConnector();

		// 过滤器
		connector.getFilterChain().addLast("codec",
				new ProtocolCodecFilter(new SmsDataCodecFactory()));
		connector.getFilterChain().addLast("threadPool",
				new ExecutorFilter(Executors.newCachedThreadPool()));

		// 处理器
		connector.setHandler(new IoHandlerAdapter() {

			@Override
			public void sessionCreated(IoSession session) throws Exception {
			}

			@Override
			public void messageReceived(IoSession session, Object message)
					throws Exception {
				System.out.println("服务器响应:");
				System.out.println(((Sms) message).toString());
			}

		});

		// 建立会话Session
		IoSession session = null;
		while (true) {
			try {
				ConnectFuture future = connector.connect(server);
				future.awaitUninterruptibly(100, TimeUnit.SECONDS);
				session = future.getSession();
				if (null != session) {
					break;
				}
			} catch (RuntimeIoException e) {
				System.err.println("Failed to connect with "
						+ server.toString());
				e.printStackTrace();
				try {
					Thread.sleep(5000);
				} catch (InterruptedException e1) {
					e1.printStackTrace();
				}
			}
		}

		// 客户端输入
		try (Scanner scanner = new Scanner(System.in);) {
			while (true) {
				String sender = "1814453211";
				System.out.println("请输入收信息手机号:");
				String receiver = scanner.nextLine();
				System.out.println("请输入信息内容:");
				String content = scanner.nextLine();

				Sms sms = Sms.newBuilder()
						.setProtocol("ip.weixin.com TC-C/2.0")
						.setSender(sender).setReceiver(receiver)
						.setContent(content).build();

				session.write(sms);

				Thread.sleep(10000);
				System.out.println("是否继续,回车继续 , q or quit 退出:");
				String line = scanner.nextLine();
				if (line.trim().equalsIgnoreCase("q")
						|| line.trim().equalsIgnoreCase("quit")) {
					break;
				}
			}
		}
		session.close(false);
		connector.dispose();
	}
}

4.5 启动服务,启动客户端

图3是运行的结果:

客户端信息:

服务器信息:

图3

说明:上面客户端和服务器端的关于IoHandler直接使用了匿名类的方式对数据的接收做了相应的简单处理。Sms对象转换成UTF-8编码的字符串,采用了3端中编码和解密的第2中方式,并且传输的数据最大长度为1024byte(1k)。另外,Potobuf-java和Mina集成,mina3.x提供了对protobuf定义的消息的编码和解码提供了实现支持。

为了需要更多关注Mina3.x,另外Netty的发展势头正旺,netty有种子承父业的感觉,也值得拥有!

另外关于使用Mina2.x的一个多客户端会话的示例见:https://code.csdn.net/snippets/546078.js

时间: 2024-08-05 01:50:53

Apache Mina2.x网络通信框架使用入门的相关文章

NetworkComms网络通信框架序言

03年大学毕业,主要做Web开发,大家可以看看networkcomms中文站: www.networkcomms.cn  自己基于网上开源程序二次开发的:) 从06年开始,便把主要的学习精力放到网络通信上, 主要使用C#语言,WinForm框架,sql Server数据库. 工作于大企业的IT部门,平时有较多的时间用于技术研究,即便这样,在学习的初期,几年的时间内,都无法开发出比较稳定的CS系统,网络通信中需要调试和考虑的地方太多,能开发稳定的可复用的通信系统,我想只有传说中的高手才能做到,而我

使用 Apache MINA2 实现 Web 系统的消息中间件

本文将介绍如何使用 Apache MINA2(以下简称 MINA2)解决复杂 Web 系统内各子系统之间同步消息中间件的问题.MINA2 为开发高性能和高可用性的网络应用程序提供了非常便利的框架.从本文中可以了解 MINA2 的基本原理和主要功能,此外在本文中您还可以看到 MINA2 实现消息中间件的服务端和客户端程序的详细内容. 4 评论: 苏 梦, 软件工程师, IBM 尹 文清, Java 开发工程师, 百度在线 2011 年 8 月 25 日 内容 项目背景介绍 系统发展遇到的瓶颈问题

Mycat开发实践---Mycat的网络通信框架

1从一个测试说起 网上有人对Cobar和MyCAT做了一个简单的比较测试,过程如下: 1 测试环境 利用A.B.C三大类服务器,在A台上面安装配置MyCAT及Cobar,这样保证了硬件方面的一致性.B类服务器上安装Apache这一web服务,使用PHP语言.C类安装MySQL数据库,其中B类与C类均不止一台,主要目的是为了作压力的均分.C类服务器安装了4台,存放了相同的数据库,对其中一个表进行分片存储. 测试软件使用的是loadRunner.在对两个中间件分别进行测试的过程中,采用的web服务器

【转】Android 网络通信框架Volley简介(Google IO 2013)

Volley主页 https://android.googlesource.com/platform/frameworks/volley http://www.youtube.com/watch?v=yhv8l9F44qo&feature=player_embedded 1. 什么是Volley 在这之前,我们在程序中需要和网络通信的时候,大体使用的东西莫过于AsyncTaskLoader,HttpURLConnection,AsyncTask,HTTPClient(Apache)等,今年的Go

Springmvc整合tiles框架简单入门示例(maven)

Springmvc整合tiles框架简单入门示例(maven) 本教程基于Springmvc,spring mvc和maven怎么弄就不具体说了,这边就只简单说tiles框架的整合. 先贴上源码(免积分下载): http://download.csdn.net/detail/zhangbing2434/9435460(这里用的是Idea,eclipse,导入的时候可能会有些差异) 1.tiles依赖的jar包:     maven代码: <dependency> <groupId>

Apache Hadoop2.x 边安装边入门

完整PDF版本:<Apache Hadoop2.x边安装边入门> 目录 第一部分:Linux环境安装 第一步.配置Vmware NAT网络 一. Vmware网络模式介绍 二. NAT模式配置 第二步.安装Linux操作系统 三. Vmware上安装Linux系统 四.设置网络 五.修改Hostname 六.配置Host 七.关闭防火墙 八.关闭selinux 第三步.安装JDK 九.安装Java JDK 第二部分:Hadoop本地模式安装 第四步. Hadoop部署模式 第五步.本地模式部署

http网络通信框架解读(HttpClient、HttpURLConnection、OKHttp、Unirest、Volley、Xutils、Retrofit )

一.HttpClient与HttpURLConnection,URLConnection HttpURLConnection继承URLConnection,底层socket,最原始通信,使用 HttpURLConnection 发起 HTTP 请求最大的优点是不需要引入额外的依赖.但无法提供额外的功能,如cookie,登录,同时,也缺乏连接池管理.域名机械控制等特性支持. HttpClient是apache的一个项目,封装了很多底层细节.Apache HttpComponents HttpCli

手动搭建I/O网络通信框架2:BIO编程模型实现群聊

第一章:手动搭建I/O网络通信框架1:Socket和ServerSocket入门实战,实现单聊 在第一章中运用Socket和ServerSocket简单的实现了网络通信.这一章,利用BIO编程模型进行升级改造,实现群聊聊天室. 如图:当一个客户端请求进来时,接收器会为这个客户端分配一个工作线程,这个工作线程专职处理客户端的操作.在上一章中,服务器接收到客户端请求后就跑去专门服务这个客户端了,所以当其他请求进来时,是处理不到的. 看到这个图,很容易就会想到线程池,BIO是一个相对简单的模型,实现它

c#网络通信框架networkcomms内核解析 序言

networkcomms是我遇到的写的最优美的代码,很喜欢,推荐给大家:) 基于networkcomms2.3.1开源版本( gplv3)协议,写了一些文章,希望大家喜欢,个人水平有限,不足之处难免. networkcommsc#通信框架来自于美丽的英国剑桥,由大洋彼岸的两位工程师 Marc Fletcher, Matthew Dean开发. c#网络通信框架networkcomms内核解析之一 消息传送 c#网络通信框架networkcomms内核解析之二 消息处理流程 c#网络通信框架net