JAVA NIO 服务器与客户端实现示例

以下代码只兼容Java 7及以上版本,对于一些关键地方请看注释说明。

公共类:

package com.stevex.app.nio;

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;

public class CharsetHelper {
	private static final String UTF_8 = "UTF-8";
	private static CharsetEncoder encoder = Charset.forName(UTF_8).newEncoder();
	private static CharsetDecoder decoder = Charset.forName(UTF_8).newDecoder();

	public static ByteBuffer encode(CharBuffer in) throws CharacterCodingException{
		return encoder.encode(in);
	}

	public static CharBuffer decode(ByteBuffer in) throws CharacterCodingException{
		return decoder.decode(in);
	}
}

服务器代码:

package com.stevex.app.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class XiaoNa {
	private ByteBuffer readBuffer;
	private Selector selector;

	public static void main(String[] args){
		XiaoNa xiaona = new XiaoNa();
		xiaona.init();
		xiaona.listen();
	}

	private void init(){
		readBuffer = ByteBuffer.allocate(1024);
		ServerSocketChannel servSocketChannel;

		try {
			servSocketChannel = ServerSocketChannel.open();
			servSocketChannel.configureBlocking(false);
			//绑定端口
			servSocketChannel.socket().bind(new InetSocketAddress(8383));

			selector = Selector.open();
			servSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	private void listen() {
		while(true){
			try{
				selector.select();
				Iterator ite = selector.selectedKeys().iterator();

				while(ite.hasNext()){
					SelectionKey key = (SelectionKey) ite.next();
					ite.remove();//确保不重复处理

					handleKey(key);
				}
			}
			catch(Throwable t){
				t.printStackTrace();
			}
		}
	}

	private void handleKey(SelectionKey key)
			throws IOException, ClosedChannelException {
		SocketChannel channel = null;

		try{
			if(key.isAcceptable()){
				ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
				channel = serverChannel.accept();//接受连接请求
				channel.configureBlocking(false);
				channel.register(selector, SelectionKey.OP_READ);
			}
			else if(key.isReadable()){
				channel = (SocketChannel) key.channel();
				readBuffer.clear();
				/*当客户端channel关闭后,会不断收到read事件,但没有消息,即read方法返回-1
				 * 所以这时服务器端也需要关闭channel,避免无限无效的处理*/
				int count = channel.read(readBuffer);

				if(count > 0){
					//一定需要调用flip函数,否则读取错误数据
					readBuffer.flip();
					/*使用CharBuffer配合取出正确的数据
					String question = new String(readBuffer.array());  
					可能会出错,因为前面readBuffer.clear();并未真正清理数据
					只是重置缓冲区的position, limit, mark,
					而readBuffer.array()会返回整个缓冲区的内容。
					decode方法只取readBuffer的position到limit数据。
					例如,上一次读取到缓冲区的是"where", clear后position为0,limit为	1024,
					再次读取“bye"到缓冲区后,position为3,limit不变,
					flip后position为0,limit为3,前三个字符被覆盖了,但"re"还存在缓冲区中,
					所以 new String(readBuffer.array()) 返回 "byere",
					而decode(readBuffer)返回"bye"。
					*/
					CharBuffer charBuffer = CharsetHelper.decode(readBuffer); 
					String question = charBuffer.toString(); 
					String answer = getAnswer(question);
					channel.write(CharsetHelper.encode(CharBuffer.wrap(answer)));
				}
				else{
					//这里关闭channel,因为客户端已经关闭channel或者异常了
					channel.close();
				}
			}
		}
		catch(Throwable t){
			t.printStackTrace();
			if(channel != null){
				channel.close();
			}
		}
	}

	private String getAnswer(String question){
		String answer = null;

		switch(question){
		case "who":
			answer = "我是小娜\n";
			break;
		case "what":
			answer = "我是来帮你解闷的\n";
			break;
		case "where":
			answer = "我来自外太空\n";
			break;
		case "hi":
			answer = "hello\n";
			break;
		case "bye":
			answer = "88\n";
			break;
		default:
				answer = "请输入 who, 或者what, 或者where";
		}

		return answer;
	}
}

客户端代码:

package com.stevex.app.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class Client implements Runnable{
	private BlockingQueue<String> words;
	private Random random;

	public static void main(String[] args) {
		//种多个线程发起Socket客户端连接请求
		for(int i=0; i<10; i++){
			Client c = new Client();
			c.init();
			new Thread(c).start();
		}
	}

	@Override
	public void run() {
		SocketChannel channel = null;
		Selector selector = null;
		try {
			channel = SocketChannel.open();
			channel.configureBlocking(false);
			//请求连接
			channel.connect(new InetSocketAddress("localhost", 8383));
			selector = Selector.open();
			channel.register(selector, SelectionKey.OP_CONNECT);
			boolean isOver = false;

			while(! isOver){
				selector.select();
				Iterator ite = selector.selectedKeys().iterator();
				while(ite.hasNext()){
					SelectionKey key = (SelectionKey) ite.next();
					ite.remove();

					if(key.isConnectable()){
						if(channel.isConnectionPending()){
							if(channel.finishConnect()){
								//只有当连接成功后才能注册OP_READ事件
								key.interestOps(SelectionKey.OP_READ);

								channel.write(CharsetHelper.encode(CharBuffer.wrap(getWord())));
								sleep();
							}
							else{
								key.cancel();
							}
						}
					}
					else if(key.isReadable()){
						ByteBuffer byteBuffer = ByteBuffer.allocate(128);
						channel.read(byteBuffer);
						byteBuffer.flip();
						CharBuffer charBuffer = CharsetHelper.decode(byteBuffer);
						String answer = charBuffer.toString(); 
						System.out.println(Thread.currentThread().getId() + "---" + answer);

						String word = getWord();
						if(word != null){
							channel.write(CharsetHelper.encode(CharBuffer.wrap(word)));
						}
						else{
							isOver = true;
						}
						sleep();
					}
				}
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
		finally{
			if(channel != null){
				try {
					channel.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}

			if(selector != null){
				try {
					selector.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
	}

	private void init() {
		words = new ArrayBlockingQueue<String>(5);
		try {
			words.put("hi");
			words.put("who");
			words.put("what");
			words.put("where");
			words.put("bye");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}	

		random = new Random();
	}

	private String getWord(){
		return words.poll();
	}

	private void sleep() {
		try {
			TimeUnit.SECONDS.sleep(random.nextInt(3));
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}	

	private void sleep(long l) {
		try {
			TimeUnit.SECONDS.sleep(l);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}
时间: 2024-10-11 12:22:58

JAVA NIO 服务器与客户端实现示例的相关文章

JAVA AIO 服务器与客户端实现示例

AIO用于文件处理还是比较快乐的,但用AIO来写网络消息处理服务器端与客户端是比较麻烦的事情,当然这只是我个人意见,主要是有几点原因: 一是AIO需要操作系统支持,还好Windows与Linux(模拟)都支持: 二是AIO同时使用递归调用和异步调用容易把程序员搞晕,代码容易出错: 三是CompletionHandler会使用单独的线程跑,容易出现多线程问题,频繁线程上下文切换比较消耗资源: 四是异步写要创建队列来缓存需要写入的数据,否则肯定会遇到WritePendingException. 相对

JAVA BIO 服务器与客户端实现示例

代码只兼容JAVA 7及以上版本. 服务器端代码: package com.stevex.app.bio; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.ServerSocket; import java.net.Socket; import javax.net.Serv

JAVA NIO工作原理及代码示例

简介:本文主要介绍了JAVA NIO中的Buffer, Channel, Selector的工作原理以及使用它们的若干注意事项,最后是利用它们实现服务器和客户端通信的代码实例. 欢迎探讨,如有错误敬请指正 如需转载,请注明出处 http://www.cnblogs.com/nullzx/ 1. ByteBuffer 1.1直接缓冲区和非直接缓冲区 下面是创建ByteBuffer对象的几种方式 static ByteBuffer allocate(int capacity) static Byte

JAVA NIO服务器间连续发送文件(本地测试版)

说在前面:给我的需求是实现从服务器A将大量文件(大概几十TB)发送到服务器B,在A服务器生成文件的MD5码,并在服务器B进行md5验证,验证通过保存. 我的实现思路: 将待上传的所有文件目录生成一个txt文件,格式如下.前缀中,当后面的数字等于9999的时候,前面的数字会自行相加.(此处加前缀的目的是为了整齐,而且失败了便于查询.) AAA0000:D:\upload\addChannel.html AAA0001:D:\upload\addChannel2.html AAA0002:D:\up

异步Socket服务器与客户端

本文灵感来自Andre Azevedo 在CodeProject上面的一片文章,An Asynchronous Socket Server and Client,讲的是异步的Socket通信. Socket连接(Socket Connection) Socket服务(Socket Service) 连接主机(Connection Host) 加密与压缩(Encrypt与Compress) 请求入队(Enqueuing Requests) 确保发送和接收(Ensure send and recie

Java NIO 非阻塞Socket服务器构建

部分内容引用自xpbug的Blog. 说到socket服务器,第一反应是java.net.Socket这个类.事实上在并发和响应时间要求不高的场合,是可以用java.net.Socket来实现的,比如写一个局域网聊天工具.发送文件等.但它的缺点也很明显,需要自行对接受的线程进行维护,管理缓冲区的分配等,我尝试过用java.net.Socket完成一个瞬时负载在千人左右的服务器,却因后期改动和维护异常麻烦而放弃. Java自1.4以后,加入了新IO特性,这便是本文要介绍的NIO.下面是一段服务器的

Java NIO示例:多人网络聊天室

一个多客户端聊天室,支持多客户端聊天,有如下功能: 功能1: 客户端通过Java NIO连接到服务端,支持多客户端的连接 功能2:客户端初次连接时,服务端提示输入昵称,如果昵称已经有人使用,提示重新输入,如果昵称唯一,则登录成功,之后发送消息都需要按照规定格式带着昵称发送消息 功能3:客户端登录后,发送已经设置好的欢迎信息和在线人数给客户端,并且通知其他客户端该客户端上线 功能4:服务器收到已登录客户端输入内容,转发至其他登录客户端. 功能5 TODO:客户端下线检测  方案是:客户端在线的时候

Java Se : Java NIO(服务端)与BIO(客户端)通信

Java目前有三种IO相关的API了,下面简单的说一下: BIO,阻塞IO,最常用的Java IO API,提供一般的流的读写功能.相信学习Java的人,都用过. NIO,非阻塞IO,在JDK1.4中开始出现,大量应用与服务器端编程,用于提高并发访问的性能,常用的NIO框架有Netty,Mina. AIO,异步IO,在JDK1.7开始出现.还没有了解过,等以后了解了再说. 阻塞.非阻塞,同步.异步 在写这篇文章前,在网上了解了一下,其中争议最的问题要数阻塞.非阻塞怎么理解,异步.同步怎么理解.

Kafka JAVA客户端代码示例--高级应用

什么时间使用高级应用? 针对一个消息读取多次 在一个process中,仅仅处理一个topic中的一组partitions 使用事务,确保每个消息只被处理一次 使用高级应用(调用较底层函数)的缺点? SimpleConsumer需要做很多额外的工作(在以groups方式进行消息处理时不需要) 在应用程序中跟踪上次消息处理的offset 确定一个topic partition的lead broker 手工处理broker leander的改变 使用底层函数(SimpleConsumer)开发的步骤