java nio 网络框架实现(转)

maven项目
https://github.com/solq360/common

  • 链式编/解码
  • 链路层链式处理
  • 管道管理socket
  • 多协议处理非常方便
  • 仿netty NioEventLoop 单线程串行处理

========
侍加功能 :

  • 自动化编/解码
  • rpc 接口增强使用

简单聊天例子

server

TestNioServer

//创建session管理工厂
ISessionFactory sessionFactory = new SessionFactory();
//创建编/解码管理
ICoderParserManager coderParserManager = new CoderParserManager();
//注册包编/解码,处理业务
coderParserManager.register(CoderParser.valueOf("server chat", PackageDefaultCoder.valueOf(), new ChatTestServerHandle()));
//创建ServerSocket 实例
ServerSocket serverSocket=ServerSocket.valueOf(SocketChannelConfig.valueOf(6969), 10,20,coderParserManager, sessionFactory);

//启动服务
serverSocket.start();
//阻塞当前线程
serverSocket.sync();
//关闭处理
serverSocket.stop();
	

client

TestNioClient
传统方式连接

	//创建编/解码管理
 	ICoderParserManager coderParserManager = new CoderParserManager();
 	//注册包编/解码,处理业务
	coderParserManager.register(CoderParser.valueOf("chat", PackageDefaultCoder.valueOf(), new ChatHandle()));
	//创建ClientSocket 实例
	final ClientSocket clientSocket = ClientSocket.valueOf(SocketChannelConfig.valueOf(6969), new SocketPool("client", null), coderParserManager, new EmptyHandle());

	//模拟连接之后发送消息
	Timer timer = new Timer();
	timer.schedule(new TimerTask() {

	    @Override
	    public void run() {
		clientSocket.send("连接服务器成功");
		System.out.println("send ");
		this.cancel();
	    }
	}, 1000);

	//启动服务
	clientSocket.start();
	//阻塞当前线程
	clientSocket.sync();
	//关闭处理
	clientSocket.stop();

服务器方式连接

	//创建session管理工厂
	ISessionFactory sessionFactory = new SessionFactory();
	//创建编/解码管理
 	ICoderParserManager coderParserManager = new CoderParserManager();
 	//注册包编/解码,处理业务
	coderParserManager.register(CoderParser.valueOf("chat", PackageDefaultCoder.valueOf(), new ChatHandle()));
	//创建ClientSocket 实例
	final ServerSocket serverSocket = ServerSocket.valueOf(SocketChannelConfig.valueOf(8888), 10, 20, coderParserManager, sessionFactory);

	//模拟连接之后发送消息
	Timer timer = new Timer();
	timer.schedule(new TimerTask() {

	    @Override
	    public void run() {
		System.out.println("registerClientSocket");
		//主动连接服务器
		ClientSocket clientSocket = serverSocket.registerClient(SocketChannelConfig.valueOf(6969));
		clientSocket.send("连接服务器成功");
		this.cancel();
	    }
	}, 1000);

	//启动服务
	serverSocket.start();
	//阻塞当前线程
	serverSocket.sync();
	//关闭处理
	serverSocket.stop();

源码实现过程

链式编/解码

  • 由 多个 ICoder 输入/输出转换处理
  • CoderParser 类组装多个 ICoder
  • 编/码处理器 注意优先级
  • nio read -> packageCoder -> link coders -> handle
  • handle write -> link coders -> packageCoder -> nio write
  • 由 ICoderParserManager 管理调用处理
public interface ICoderParserManager {

    /**
     * 解码处理
     *
     * @return CoderResult
     * */
    CoderResult decode(ByteBuffer buffer, ICoderCtx ctx);

    /**
     * 编码处理
     * */
    ByteBuffer encode(Object message, ICoderCtx ctx);

    void error(ByteBuffer buffer, ICoderCtx ctx);

    /** 注册 编/码处理器 */
    void register(CoderParser coderParser);
}

其中核心
decode
encode

  @Override
    public CoderResult decode(ByteBuffer buffer, ICoderCtx ctx) {
	final SocketChannelCtx socketChannelCtx = (SocketChannelCtx) ctx;
	final ClientSocket clientSocket = socketChannelCtx.getClientSocket();

	for (CoderParser coderParser : coderParsers.values()) {
	    final IPackageCoder packageCoder = coderParser.getPackageCoder();
	    final ICoder<?, ?>[] linkCoders = coderParser.getCoders();
	    final IHandle handle = coderParser.getHandle();
	    Object value = null;
	    synchronized (buffer) {
		// 已解析完
		if (socketChannelCtx.getCurrPackageIndex() >= buffer.limit()) {
		    return CoderResult.valueOf(ResultValue.UNFINISHED);
		}
		// 包协议处理
		if (!packageCoder.verify(buffer, ctx)) {
		    continue;
		}
		// 包解析
		value = packageCoder.decode(buffer, ctx);
		if (value == null) {
		    // 包未读完整
		    return CoderResult.valueOf(ResultValue.UNFINISHED);
		}
	    }
	    // 链式处理
	    if (linkCoders != null) {
		for (ICoder coder : linkCoders) {
		    value = coder.decode(value, ctx);
		    if (value == null) {
			throw new CoderException("解码出错 : " + coder.getClass());
		    }
		}
	    }
	    // 业务解码处理
	    value = handle.decode(value, ctx);
	    clientSocket.readBefore(socketChannelCtx, value);
	    handle.handle(value, ctx);
	    clientSocket.readAfter(socketChannelCtx, value);

	    return CoderResult.valueOf(ResultValue.SUCCEED);
	}
	return CoderResult.valueOf(ResultValue.NOT_FIND_CODER);
    }

    @Override
    public ByteBuffer encode(Object message, ICoderCtx ctx) {

	for (CoderParser coderParser : coderParsers.values()) {
	    final IPackageCoder packageCoder = coderParser.getPackageCoder();
	    final ICoder<?, ?>[] linkCoders = coderParser.getCoders();
	    final IHandle handle = coderParser.getHandle();
	    // 业务检查
	    if (!handle.verify(message, ctx)) {
		continue;
	    }
	    // 业务编码处理
	    Object value = handle.encode(message, ctx);
	    // 链式处理
	    if (linkCoders != null) {
		for (int i = linkCoders.length - 1; i >= 0; i--) {
		    ICoder coder = linkCoders[i];
		    value = coder.encode(value, ctx);
		    if (value == null) {
			throw new CoderException("编码出错 : " + coder.getClass());
		    }
		}
	    }
	    // 打包消息处理
	    value = packageCoder.encode(value, ctx);
	    if (value != null) {
		return (ByteBuffer) value;
	    }
	    throw new CoderException("编码出错  :" + packageCoder.getClass());
	}

	throw new CoderException("未找到编/解码处理器 ");
   }
  • 半包/帖包处理 : AbstractISocketChannel doRead方法摘要,根据解码返回的状态做处理。
  • 半包:当不是完成状态时,继续解码,从最后一次包索引开始处理
  • 帖包:当完成包解码移动包索引,等侍下轮解码处理
       boolean run = true;
    	    // 粘包处理
    	    while (run) {
    		ByteBuffer cpbuffer = socketChannelCtx.coderBegin();
    		cpbuffer.mark();
    		CoderResult coderResult = coderParserManager.decode(cpbuffer, socketChannelCtx);
    		switch (coderResult.getValue()) {
    		case SUCCEED:
    		    break;
    		case NOT_FIND_CODER:
    		    final int readySize = socketChannelCtx.getWriteIndex() - socketChannelCtx.getCurrPackageIndex();
    		    final int headLimit = 255;
    		    if (readySize >= headLimit) {
    			throw new CoderException("未找到编/解码处理器 ");
    		    }
    		    run = false;
    		    break;
    		case UNFINISHED:
    		case UNKNOWN:
    		case ERROR:
    		default:
    		    run = false;
    		    // TODO throw
    		    break;
    		}
    	  }

http://www.cnblogs.com/solq/p/4585496.html

时间: 2024-11-05 15:49:10

java nio 网络框架实现(转)的相关文章

java nio 网络框架

https://github.com/solq360/common 主要运行在android 平台 解决自动化编/解码,等等.. 模块 解决问题/实现处理 备注 负责人 进度 录音播放 AudioRecord 和 AudioTrack 熟悉录音处理接口 solq 进行中 声音编码 第三方库 处理燥音,回音,压缩等     通信 jdk nio 实现 移动网socket编程是否稳定   进行中 ioc java 反射技术   solq 进行中 event     solq 进行中 资源管理 程序配

Java NIO通信框架在电信领域的实践

Java NIO通信框架在电信领域的实践 此文配图有错,华为电信软件V1版逻辑架构图与华为电信软件V2 MVC版逻辑架构图两张配图是同一张啊 另:我觉得作者在本文中遇到由于同步io引起的历史遗留问题更多的是架构的问题,在作架构时就需要考虑到同步io引起的阻塞问题,我觉得比较好的解决方案是使用排队的方式来下发请求,而不是每次下发请求都启一个线程,这样如果对方还是响应慢的话即使是用nio也是解决不了问题的.

Java NIO 网络编程基础

Java NIO提供了一套网络api,可以用来处理连接数很多的情况.他的基本思想就是用一个线程来处理多个channel. 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899

java nio网络编程服务篇入门

服务端写完了,现在写一个客户端,对于客户端,我考虑使用nio或阻塞socket都可以. 使用nio的客户端: 1 /** 2 * 初始化网络连接 3 */ 4 public void run() { 5 6 // 开启网络连接 7 try { 8 channel = SocketUtils.connect("127.0.0.1",8080); 9 selector = Selector.open(); 10 channel.register(selector, SelectionKey

基于NIO的Netty网络框架

Netty是一个高性能.异步事件驱动的NIO框架,它提供了对TCP.UDP和文件传输的支持,Netty的所有IO操作都是异步非阻塞的,通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果. Netty的优点有: a.功能丰富,内置了多种数据编解码功能.支持多种网络协议. b.高性能,通过与其它主流NIO网络框架对比,它的综合性能最佳. c.可扩展性好,可通过它提供的ChannelHandler组件对网络通信方面进行灵活扩展. d.易用性,API使用简单.

BIO &amp; NIO &amp; NIO常见框架

BIO & NIO BIO - Blocking IO - 同步式阻塞式IO --- UDP/TCP NIO - New  IO - 同步式非阻塞式IO AIO  - AsynchronousIO - 异步式非阻塞式IO - jdk1.8 BIO的缺点: 1.会产生阻塞行为 --- receive/accept/connect/read/write 2.一对一的连接:每连接一个客户端,在服务器端就需要开启一个线程去处理请求.在客户端较多的情况下,服务器端就会产生大量的线程 - 耗费内存 3.连接

基于Java NIO的多人在线聊天工具源码实现(登录,单聊,群聊)

近来在学习Java NIO网络开发知识,写了一个基于Java NIO的多人在线聊天工具练练手.源码公开在Coding上: https://coding.net/u/hust_wsh/p/MyChat/git ,开发环境是Ubuntu14.04+Eclipse Mars+JDK1.8. 要想编写一个基于Java NIO的多人在线聊天工具,我总结需要以下几方面的地址:客户端服务器模型,Java NIO中的Selector,SocketChannel,ByteBuffer,Collections以及序

Java NIO框架Mina、Netty、Grizzly介绍与对比(zz)

Mina: Mina(Multipurpose Infrastructure for Network Applications) 是 Apache 组织一个较新的项目,它为开发高性能和高可用性的网络应用程序提供了非常便利的框架.当前发行的 Mina 版本2.04支持基于 Java NIO 技术的 TCP/UDP 应用程序开发.串口通讯程序,Mina 所支持的功能也在进一步的扩展中.目前,正在使用 Mina的应用包括:Apache Directory Project.AsyncWeb.AMQP(A

Java NIO示例:多人网络聊天室

一个多客户端聊天室,支持多客户端聊天,有如下功能: 功能1: 客户端通过Java NIO连接到服务端,支持多客户端的连接 功能2:客户端初次连接时,服务端提示输入昵称,如果昵称已经有人使用,提示重新输入,如果昵称唯一,则登录成功,之后发送消息都需要按照规定格式带着昵称发送消息 功能3:客户端登录后,发送已经设置好的欢迎信息和在线人数给客户端,并且通知其他客户端该客户端上线 功能4:服务器收到已登录客户端输入内容,转发至其他登录客户端. 功能5 TODO:客户端下线检测  方案是:客户端在线的时候