JAVA NIO ServerSocketChannel(线程池版)

服务器端:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

public class ServerSocketThreadPool{
	private static final int MAX_THREAD = Runtime.getRuntime().availableProcessors();
	private ThreadPool pool = new ThreadPool(MAX_THREAD);

	private static int PORT_NUMBER = 1234;

	public static void main(String[] args) throws Exception {
		new ServerSocketThreadPool().go();

	}

	public void go() throws Exception {
		int port = PORT_NUMBER;
		System.out.println("Listenning on port:" + port);
		// 创建通道 ServerSocketChannel
		ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
		// 绑定监听端口
		serverSocketChannel.socket().bind(new InetSocketAddress(port));
		// 设置为非阻塞方式
		serverSocketChannel.configureBlocking(false);
		// 创建选择器
		Selector selector = Selector.open();

		// 通道注册到选择器
		serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

		while (true) {
			// 一直阻塞,直到有数据请求
			int n = selector.select();
			if (n == 0) {
				continue;
			}
			Iterator<SelectionKey> it = selector.selectedKeys().iterator();
			while (it.hasNext()) {
				SelectionKey key = it.next();
				if (key.isAcceptable()) {
					ServerSocketChannel server = (ServerSocketChannel) key.channel();
					SocketChannel socket = server.accept();
					registerChannel(selector,socket, SelectionKey.OP_READ);
					sayHello(socket);
				}
				if (key.isReadable()) {
					readDataFromSocket(key);
				}
				it.remove();
			}

		}

	}

	public void registerChannel(Selector selector,SelectableChannel channel,int ops)throws Exception{
		if(channel==null){
			return;
		}
		channel.configureBlocking(false);
		channel.register(selector, ops);

	}

	public void sayHello(SocketChannel socket) throws Exception{
		ByteBuffer buffer=ByteBuffer.allocate(1024);
		buffer.clear();
		buffer.put("hello client".getBytes());
		buffer.flip();
			socket.write(buffer);
	}

	public void readDataFromSocket(SelectionKey key) throws Exception {
		WorkThread thread=pool.getWork();
		if(thread==null){
			return;
		}
		thread.serviceChannel(key);
	}

	private class ThreadPool {
		List idle=new LinkedList();

		public ThreadPool(int poolSize) {
			for(int i=0;i<poolSize;i++){
				WorkThread thread=new WorkThread(this);
				thread.setName("worker"+(i+1));
				thread.start();
				idle.add(thread);
			}

		}
		public WorkThread getWork(){
			WorkThread thread=null;
			synchronized (idle) {
				if(idle.size()>0){
					thread=(WorkThread) idle.remove(0);

				}
			}
			return thread;
		}

		public void returnWorker(WorkThread workThread) {
			synchronized (idle) {
				idle.add(workThread);
			}
		}

	}

	private class WorkThread extends Thread {
		private ByteBuffer buffer = ByteBuffer.allocate(1024);
		private ThreadPool pool;
		private SelectionKey key;

		public WorkThread(ThreadPool pool) {
			this.pool = pool;
		}

		public synchronized void run() {
			System.out.println(this.getName() + " is ready");
			while (true) {
				try {
					this.wait();
				} catch (InterruptedException e) {
					e.printStackTrace();
					this.interrupt();
				}
				if (key == null) {
					continue;
				}
				System.out.println(this.getName() + " has been awaken");
				try{
					drainChannel(key);
				}catch(Exception e){
					System.out.println("caught ‘"+e+"‘ closing channel");
					try{
						key.channel().close();
					}catch(IOException ioe){
						ioe.printStackTrace();
					}
					key.selector().wakeup();
				}
				key=null;
				this.pool.returnWorker(this);

			}

		}
		synchronized void serviceChannel(SelectionKey key){
			this.key=key;
			key.interestOps(key.interestOps()&(~SelectionKey.OP_READ));
			this.notify();
		}

		void drainChannel(SelectionKey key)throws Exception{
			SocketChannel channel=(SocketChannel) key.channel();
			buffer.clear();
			int count;
			while((count=channel.read(buffer))>0){
				buffer.flip();
				/*while(buffer.hasRemaining()){
					channel.write(buffer);
				}*/
				byte[] bytes;
				bytes=new byte[count];
				buffer.get(bytes);
				System.out.println(new String(bytes));
				buffer.clear();
			}
			if(count<0){
				channel.close();
				return;
			}
			key.interestOps(key.interestOps()|SelectionKey.OP_READ);
			key.selector().wakeup();
		}

	}

}

客户端:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class ClientTest {

	private static SocketChannel socketChannel;
	private static Selector selector;
	public ClientTest()throws Exception {
		socketChannel=SocketChannel.open();
		socketChannel.connect(new InetSocketAddress("127.0.0.1", 1234));
		socketChannel.configureBlocking(false);
		selector=Selector.open();
		socketChannel.register(selector, SelectionKey.OP_READ);
	}
	public static void main(String[] args) throws Exception{
		ClientTest test=new ClientTest();

		ByteBuffer buffer=ByteBuffer.allocate(1024);
		buffer.put("hello server".getBytes());
		buffer.flip();
		while(buffer.hasRemaining()){
			test.socketChannel.write(buffer);
		}

		buffer.clear();
		socketChannel.socket().shutdownOutput();

		String response=receiveData(test.socketChannel);
		System.out.println(response);
	}

	public static String receiveData(SocketChannel socketChannel2) throws IOException {
		ByteArrayOutputStream baos = new ByteArrayOutputStream();
		String response = "";
		try {
			ByteBuffer buffer = ByteBuffer.allocate(1024);
			byte[] bytes;
			int count = 0;
			while ((count = socketChannel2.read(buffer)) >= 0) {
				buffer.flip();
				bytes = new byte[count];
				buffer.get(bytes);
				baos.write(bytes);
				buffer.clear();
			}

			bytes = baos.toByteArray();
			response = new String(bytes);
		} finally {
			try {
				baos.close();
			} catch (Exception ex) {
			}
		}
		return response;
	}

}

更详细关于ServerSocketChannel的线程池的理解,请参考《JAVA NIO》这本书的第四章选择器。

时间: 2024-08-07 03:26:48

JAVA NIO ServerSocketChannel(线程池版)的相关文章

Java四种线程池newCachedThreadPool,newFixedThreadPool,newScheduledThreadPool,newSingleThreadExecutor

介绍new Thread的弊端及Java四种线程池的使用,对Android同样适用.本文是基础篇,后面会分享下线程池一些高级功能. 1.new Thread的弊端 执行一个异步任务你还只是如下new Thread吗? Java new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub } }).start(); 1 2 3 4 5 6 7 new Thread(new

Java四种线程池

Java四种线程池newCachedThreadPool,newFixedThreadPool,newScheduledThreadPool,newSingleThreadExecutor 时间:2015-10-20 22:37:40      阅读:8762      评论:0      收藏:0      [点我收藏+] 介绍new Thread的弊端及Java四种线程池的使用,对Android同样适用.本文是基础篇,后面会分享下线程池一些高级功能. 1.new Thread的弊端执行一个异

Java中的线程池

综述 在我们的开发中经常会使用到多线程.例如在Android中,由于主线程的诸多限制,像网络请求等一些耗时的操作我们必须在子线程中运行.我们往往会通过new Thread来开启一个子线程,待子线程操作完成以后通过Handler切换到主线程中运行.这么以来我们无法管理我们所创建的子线程,并且无限制的创建子线程,它们相互之间竞争,很有可能由于占用过多资源而导致死机或者OOM.所以在Java中为我们提供了线程池来管理我们所创建的线程. 线程池的使用 采用线程池的好处 在这里我们首先来说一下采用线程池的

java笔记--使用线程池优化多线程编程

使用线程池优化多线程编程 认识线程池 在Java中,所有的对象都是需要通过new操作符来创建的,如果创建大量短生命周期的对象,将会使得整个程序的性能非常的低下.这种时候就需要用到了池的技术,比如数据库连接池,线程池等. 在java1.5之后,java自带了线程池,在util包下新增了concurrent包,这个包主要作用就是介绍java线程和线程池如何使用的. 在包java.util.concurrent下的 Executors类中定义了Executor.ExecutorService.Sche

java android ExecutorService 线程池解析

ExecutorService: 它也是一个接口,它扩展自Executor接口,Executor接口更像一个抽象的命令模式,仅有一个方法:execute(runnable);Executor接口简单,可是非常重要,重要在这样的设计的模式上..Java5以后,通过Executor来启动线程比用Thread的start()更好.在新特征中,能够非常easy控制线程的启动.运行和关闭过程,还能够非常easy使用线程池的特性. 几种不同的ExecutorService线程池对象 1.newCachedT

Java多线程和线程池(转)

1.为什么要使用线程池 在java中,如果每个请求到达就创建一个新线程,开销是相当大的.在实际使用中,服务器在创建和销毁线程上花费的时间和消耗的系统资源都相当大,甚至可能要比在处理实际的用户请求的时间和资源要多的多.除了创建和销毁线程的开销之外,活动的线程也需要消耗系统资源.如果在一个jvm里创建太多的线程,可能会使系统由于过度消耗内存或“切换过度”而导致系统资源不足.为了防止资源不足,服务器应用程序需要采取一些办法来限制任何给定时刻处理的请求数目,尽可能减少创建和销毁线程的次数,特别是一些资源

Java四种线程池的学习与总结

在Java开发中,有时遇到多线程的开发时,直接使用Thread操作,对程序的性能和维护上都是一个问题,使用Java提供的线程池来操作可以很好的解决问题. 一.new Thread的弊端 执行一个异步任务你还只是如下new Thread吗? new Thread(new Runnable(){ @Override public void run(){ //TODO Auto-generatedmethod stub } } ).start(); 那你就out太多了,new Thread的弊端如下:

Java并发之——线程池

一. 线程池介绍 1.1 简介 线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务.线程池的基本思想还是一种对象池的思想,开辟一块内存空间,里面存放了众多(未死亡)的线程,池中线程执行调度由池管理器来处理.当有线程任务时,从池中取一个,执行完成后线程对象归池,这样可以避免反复创建线程对象所带来的性能开销,节省了系统的资源. 多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力. 假设一个服务器完成一项

使用Java并发包线程池和XML实现定时任务动态配置和管理

文章标题:使用并发包线程池和XML实现定时任务动态配置和管理 文章地址: http://blog.csdn.net/5iasp/article/details/32705601 作者: javaboy2012Email:[email protected]qq:    1046011462 项目结构: 一.采用的知识点: 1. java并发包2. xml配置文件读取3. 反射动态调用类的方法 二. 基本思路 1.  创建线程池: ScheduledExecutorService scheduExe