java网络编程基础(五)—NIO非阻塞编程

本节讲点:

  • NIO引入:分析普通Socket通信中存在的IO问题--阻塞通信,并分析传统的线程池的优缺点,进而引入NIO解决方案
  • NIO核心框架:讲解NIO的4个核心概念所对应的框架包,包括缓存Buffer、字符集Charset、通道Channel及选择器Selector
    • 缓存Buffer:是包含数据且用于读写的线性表结构。
    • 字符集Charset:提供Unicode字符串映射到字节序列及逆映射操作
    • 通道Channel:包含Socket、File和Pipe3种管道,双向的
    • 选择器Selector:将多元异步I/O操作集中到一个或多个线程中
  • NIO通道编程
    • 文件通道FileChannel:用于实现对文件的读写、锁定和映射
    • Socket通道SocketChannel:实现基于Socket的通道
    • ServerSocket通道ServerSocketChannel:基于SeverSocket通道
    • 数据报通道DatagramChannel:实现基于DatagramSocket的通道

NIO引入

  1. 基于Socket通信存在的问题 -- I/O阻塞通信

    1. 阻塞监听客户端输入。BufferedReader类的readLine()方法阻塞。
    2. 阻塞监听客户端连接。accept()方法的调用造成阻塞
    3. 读取阻塞。产生大量的String垃圾BufferedReader创建了缓存区从客户端套接字读入数据,但是同样创建了一些字符串存储这些数据。
  2. 传统解决方法 --- 使用线程池
    1. 面对大量用户请求时,需要使用大量的线程,这一般是实现一个线程池来处理请求
    2. 线程池是服务器可以处理多个连接,但是它们同样引发了许多问题。每个线程都拥有自己的栈空间并且占用一些CPU时间,耗费很大。
  3. 最新的解决方案 --- NIO非阻塞通信

    NIO的非阻塞I/O机制是围绕选择器和通道构建的。Channel类表示服务器和客户端之间的一种通信机制。与反应器模式一致,Selector是Channel的多路复用器。

NIO核心框架

在NIO解决方案中的缓存区、Selector、Channel等概念,

  • 缓存区:表示数据存放的容器,提供可读写的数据缓存区
  • 字符集:用来对缓存区数据进行解码和编码,在字节和Unicode字符之间
  • 通道:用来接收或发送数据,提供文件、套接字等的连接
  • 选择器:它们与可选择通道一起定义了多路、无阻塞的I/O设施

NIO框架位于Java.nio包中,它为每个概念都提供了核心的支撑类

缓存区Buffer的4个基本属性

capacity:容量,一般在Buffer被创建时指定。int capacity();//返回此缓存区容量

limit:限制。在Buffer上进行的操作不能越过这个下标。当写数据到Buffer中时,limit一般和capacity相等,当读数据时,limit代表Buffer中有效数据的长度,取得和修改限制的方法如下:

int limit();//返回此缓存区的限制

Buffer limit(int newLimit);//设置此缓存区的限制

position:读写操作当前下标。当使用Buffer的相对位置进行读写操作时,读写会从这个下标进行,并完成操作后,Buffer会更显下标值,

int position();//返回此缓存区的位置

Buffer position(int newPosition);//设置此缓存区的位置

mark:一个临时存放的位置下标

字符集Charset -- 编码与解码

通过构造java.nio.charset.CharsetEncoder和java.nio.charset.CharsetDecoder将字符序列CharBuffer转换成ByteBuffer和逆转换

从CharBuffer到ByteBuffer的编码转换

Charset charset = Charset.forName("UTF-8");
CharsetEncoder encoder = charset.newEncoder();
ByteBuffer byteBuffer = encoder.encode(charBuffer);

从ByteBuffer到CharBuffer的解码转换

Charset charset = Charset.forName("UTF-8");
CharsetDecoder decoder = charset.newDecoder();
CharBuffer charBuffer = decoder.decode(byteBuffer);

通道Channel

    NIO中提供了Channel通道类来读写Buffer。Channel是一个连接,可用于接收和发送数据。因为Channel连接的是底层的物理设备,它可以直接支持设备的读写,或者文件锁。

最下面的4个类为具体实现类。共包括7个接口类、3个抽象类和4个实现类

  • Channel是最顶层的接口。
  • ReadableByteChannel和WritableByteChannel分别提供对通道读取和写入Byte数据的功能
  • ByteChannel用来将读取和写入的功能合并
  • ScatteringByteChannel和GatheringByteChannel分别提供了批量读取和写入ByteBuffer数组的能力
  • InterruptibleChannel提供了多线程异步关闭的能力

7个接口类的详细方法如下

  • Channel代表一个可以进行IO操作的通道。定义了一下方法
boolean isOpen();//判断此通道是否处于打开状态
void close();//关闭这个Channel,相关资源被释放
  • ReadableByteChannel和WritableByteChannel分别定义了一个读取和写入byte数据的Channel接口。分别定义了read()和write()方法。
int read(ByteBuffer dst);//将字节序列从此通道中读入到给定的缓存区
int write(ByteBuffer src);//将字节序列从给定的缓存区写入此通道
  • ByteChannel没有定义新的方法,它的作用只是把ReadableByteChannel和WritableByteChannel合并在一起
  • ScatteringByteChannel可以一次将数据从通道读入多个ByteBuffer中。而GatheringByteBufferChannel可以一次将多个ByteBuffer中的数据写入通道。提供的方法如下
long read(ByteBuffer[] dsts);//将字节序列从此通道中读入给定的缓存区
long read(ByteBuffer[],int offset,int length);//讲字节序列从此通道中读入给定的缓存区,位置从offset开始到length
long write(ByteBuffer[] src);//将字节序列从给定的缓存区写入此通道
long write(ByteBuffer[] src,int offset,int length);//将字节序列从给定的缓存区写入此通道,位置从offset开始到length
  • InterruptibleChannel用来提供一个可以被异步关闭的Channel,它覆盖了Channel接口的关闭方法close()。

选择器Selector

    Selector是非阻塞I/O的核心,可以同时监控多个SelectableChannel的I/O状况,对每一个监听到的事件都产生一个SelectionKey对象。使用Selector为SelectableChannel所用,需要经历如下3个步骤

(1)创建选择器Selector。

Selector类提供了静态方法,用于创建新的一个Selector实例。

static Selector open();
/**实例如下*/
try{
    //创建一个Selector
    Selector selector = Selector.open();
}catch(IOException e){}

(2)注册到SelectableChannel

SelectableChannel在打开后,可以使用register()将它注册到特定的选择器,实现通道与选择器的事件绑定。

/**注册到SelectableChannel示例
 * 必须使用configureBlocking(false)将通道设置为非阻塞
 * 才可以向Selector注册SelectableChannel对象
 */
try{
    //创建一个Selector
    Selector selector = Selector.open();
    
    //注册到Selector
    SelectableChannel socket = SocketChannle.open();
    socket.configureBlocking(false);//设置为非阻塞模式
    socket.register(selector,SelectionKey.OP_CONNECT);
}catch(IOException e){
}

(3)处理监听事件对象SelectionKey

static int OP_ACCEPT;//用于套接字接受操作的操作集位
static int OP_CONNECT;//用于套接字连接操作的操作集位
static int OP_READ;//用于读取操作的操作集位
static int OP_WRITE;//用于写入操作的操作集位

注册了事件类型以后,就可以使用Selector的select()监听该事件

int select();//监控所有注册的channel
int select(long timeout);//可以设置超时的select()操作
int selectNow();//进行一个立即返回的select()操作
Selector wakeup();//使一个还未返回的selecor()操作立即返回

一旦有该事件触发,就可以使用Selecor的selectedKeys()方法返回所有该事件的列表。可以循环处理该事件列表,在处理前删除当前事件,防止重复处理。

/**监听事件处理过程*/
while(true){
    //监听事件
    selector.select();
    //事件来源列表
    Iterator<SelectorKey> it = selecotr.selectedKeys().iterator();
    while(it.hasNext()){
        SelectorKey key = it.next();
        //删除当前事件
        it.remove();
        
        //判断事件类型
        if(key.isConnectable()){
            //连接事件
            SocketChannel channel = (SocketChannel)key.channel();
            //注册新的事件
            channel.register(selector,SelectionKey.OP_READ);
        }else if(key.isReadable){
            //读取数据事件
            SocketChannel channel = (SocketChannel)key.channle();
            //注册新的事件
            channel.register(selector,SelectionKey.OP_WRITE);
        }else if(key.isWritable){
            //写入数据事件
            SocketChannel channel = (SocketChannel)key.channle();
            //注册新的事件
            channel.register(selector,SelectionKey.OP_READ);
        }
    }
}

NIO通道编程详解

  • 文件通道FileChannel:用于对文件的读取、写入、锁定和映射
  • Socket通道SocketChannel:实现基于Socket的通道
  • ServerSocket通道ServerSocketChannel:实现基于ServerSocket的通道
  • 数据报通道DatagramChannel:实现基于DatagramSocket的通道

1、文件通道FileChannel

在Channel使用中,文件通道FileChannel是最具有代表性的。FileChannel实现了类似输入输出流的功能,用于读取、写入、锁定和映射文件。

(1)创建FileChannel对象

FileChannel并没有向我们提供打开文件的方法,我们可以通过调用RandomAccessFile、fileInputStream和FileOutputStream类实例的getChannel(0方法获取其实例,如图示

给出示例演示3中获取FileChannel对象的方法,注意在打开文件后要关闭FileChannel和来源文件对象

package org.test.nio;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;

public class TestFileChannel {

	public static void main(String[] args) {
		// TODO 获取FileChannel实例
		//1、根据RandomAccessFile获得
		try {
			//RandomAccessFile打开文件rw适用于读写FileChannel
			RandomAccessFile randomFile = new RandomAccessFile("D:\\DownloadFile\\test.txt", "rw");
			FileChannel channel = randomFile.getChannel();
			channel.close();
			randomFile.close();
		} catch (FileNotFoundException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		//2、根据FileInputStream获取
		try {
			FileInputStream fis = new FileInputStream("D:\\DownloadFile\\test.txt");
			FileChannel channel = fis.getChannel();
			channel.close();
			fis.close();
		} catch (FileNotFoundException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		//3、根据FileOutputStream获取
		try {
			FileOutputStream fos = new FileOutputStream(":\\DownloadFile\\test.txt");
			FileChannel channel = fos.getChannel();
			channel.close();
			fos.close();
		} catch (FileNotFoundException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}

(2)从FileChannel中读取数据

FileChannel提供了4个方法读取数据

a、读入缓存区对象 abstract int read(ByteBuffer dst);

b、读入缓存区数组 long read(ByteBuffer[] dsts);

c、读入缓存区数据 long read(ByteBuffer[] dsts,int offset,int length);

d、从给定位置缓存区数组 abstract int read(ByteBuffer dst,long postion);//返回当前文件的位置,额可以通过size()返回文件的大小。

//读取数据
			ByteBuffer byteBuffer = ByteBuffer.allocate(10);
			channel.read(byteBuffer);
			System.out.println(new String(byteBuffer.array()));

(3)向FileChannel写入数据

FileChannel提供了4个方法写入数据

a、写入缓存区对象 abstract int write(ByteBuffer dst);

b、写入缓存区数组 long write(ByteBuffer[] dsts);

c、写入缓存区数据 long write(ByteBuffer[] dsts,int offset,int length);

d、将缓存区数据写入通道中的指定位置 abstract int write(ByteBuffer dst,long postion);//返回当前文件的位置,额可以通过size()返回文件的大小。执行写入操作后,使用force(true)函数强制将所有对此通道的文件更新写入包含该文件中,防止缓存。

//写入数据
			ByteBuffer byteBuffer = ByteBuffer.wrap("我是写入的内容".getBytes());
			channel.write(byteBuffer);

(4)使用文件锁

FileChannel提供了两种加锁机制,分别对应lock()和tryLock(),两者区别在于,lock是同步的,直至成功才返回,tryLock是异步的,无论成不成功都立即返回。

(5)使用内存映射

MappedByteBuffer是通过FileChannel创建的文件到内存的映射。MappedByteBuffer是一个直接缓存区。

try {
			File file = new File("D:\\DownloadFile\\test1.txt");
			if(!file.exists()){
				if(file.createNewFile()){
					System.out.println("文件创建成功");
				}
			}
			//使用内存映射复制文件
			FileChannel channel = new FileInputStream("D:\\DownloadFile\\test.txt").getChannel();
			FileChannel channel2 = new FileOutputStream(file).getChannel();
			//映射第一个文件到内存
			MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());
			//写入到第二个文件
			channel2.write(buffer);
			channel.close();
			channel2.close();
		} catch (FileNotFoundException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}

2、Socket通道SocketChannel

为了实现基础的非阻塞套接字读写操作,我们需要处理两个类,InetSocketAddress类,它指定连接到哪里,以及SocketChannel类,它执行实际的读写操作。

InetSocketAddress类与InetAddress类相似,用于创建指向某一个主机和端口的地址对象。

InetSocketAddress(InetAddress addr,int port);//根据IP地址和端口号创建
InetSocketAddress(int port);//IP地址为通配符地址,端口号指定
InetSocketAddress(String hostname,int port);//根据主机名和端口号指定

创建的InetSocketAddress对象可以通过以下方法取得其中的属性,并可转化为InetAddress

String getHostName();//
int getPort();//
InetAddress getAddress();//

套接字类SocketChannel类似于Socket,可以用于创建一个套接字对象,不同的是,它具有非阻塞的功能。与Socket的使用过程很相似。Socket具体使用方法,请见http://aku28907.blog.51cto.com/5668513/1775429

Socket使用工作过程:

(1)创建一个SocketChannel对象:SocketChannel socket = SocketChannel.open();

(2)设置为非阻塞模式:socket.configureBlocking(false);

(3)注册到Selector:socket.register(selector,SelectionKey.OP_CONNECT);

(4)开始连接到远程地址:InetSocketAddress ip = new InetSocketAddress("localhost",12345);socket.connect(ip);

(5)开始处理读写事件

在使用selector的select()开始监听后,第一个监听到的事件是连接事件。

连接事件:在新客户端连接后,必须使用finishConnect()显示地完成连接,然后将该客户端注册为读或写事件,以继续监听该客户端的监听

if(key.isConnectable()){
    //连接事件
     SocketChannel channel = (SocketChannel)key.channel();
     if(channel.isConnectionPeading()){
         channel.finishConnect();//显示地完成连接
     }
     channel.register(selector,SelectionKey.OP_READ);
}

读取数据事件:在读取数据时,可以使用SocketChannel的read()方法读取到ByteBuffer对象中,并使用解码器CharsetDecoder进行解码输出。

if(key.isReadable()){
            //读取数据事件
            SocketChannel channel = (SocketChannel)key.channle();
            //注册新的事件
            channel.register(selector,SelectionKey.OP_WRITE);
            
            //读取数据
            CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int count = channel.read(buffer);
            System.out.println(count + ":" + decoder.decode(buffer));
}

写入数据事件:在写入数据时,可以使用SocketChannel的write()方法写入编码后的ByteBuffer数据

if(key.isWritable()){
    //写入数据事件
    SocketChannel channel = (SocketChannel)key.channle();
    channel.register(selecotr,SelectionKey.OP_READ);
    
    //写入数据
    CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();
    channel.write(encoder.encode(CharBuffer.wrap("Hello")));
}

关闭连接:selector.close();socket.close();

完成代码示例如下:

(后续客户端将基于此示例进行完善)

package org.test.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.util.Iterator;

public class TestSocketChannel {

	public static void main(String[] args) {
		Selector selector = null;
		SocketChannel socket = null;

		try {
			// TODO 创建一个Selector
			selector = Selector.open();

			// TODO 创建并注册Socket
			socket = SocketChannel.open();
			socket.configureBlocking(false);
			socket.register(selector, SelectionKey.OP_CONNECT);

			// TODO 连接到远程地址
			InetSocketAddress ip = new InetSocketAddress("localhost", 12345);
			socket.connect(ip);

			//TODO 监听事件
			while(true){
				selector.select();
				//事件来源列表
				Iterator<SelectionKey> it = selector.selectedKeys().iterator();
				while(it.hasNext()){
					SelectionKey key = it.next();
					//删除当前事件
					it.remove();

					//判断当前事件类型
					if(key.isConnectable()){
						//连接事件
						SocketChannel channel = (SocketChannel)key.channel();
						channel.register(selector, SelectionKey.OP_READ);
					}else if(key.isReadable()){
						//读取数据事件
						SocketChannel channel = (SocketChannel)key.channel();
						channel.register(selector, SelectionKey.OP_WRITE);

						//读取数据
						CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
						ByteBuffer buffer = ByteBuffer.allocate(1024);
						int count = channel.read(buffer);
						System.out.println(count + ":" + decoder.decode(buffer));
					}else if(key.isWritable()){
						//写入数据事件
						SocketChannel channel = (SocketChannel)key.channel();
						channel.register(selector, SelectionKey.OP_READ);

						//写入数据
						CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();
						channel.write(encoder.encode(CharBuffer.wrap("Hello")));
					}
				}
			}
		} catch (ClosedChannelException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (CharacterCodingException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} finally{
			try {
				selector.close();
				socket.close();
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}
}

3、ServerSocket通道ServerSocketChannel

    ServerSocketChannel类似于ServerSocket,可以用于创建一个服务端套接字对象,它具有非阻塞的功能。与ServerSocket的使用过程相似,ServerSocketChannel的工作过程如下:

(1)创建一个ServerSocketChannel对象:ServerSocketChannel server = ServerSocketChannel.open();

(2)设置为非阻塞模式:server.configureBlocking(false);

(3)注册到Selector:server.register(selector,SelectionKey.OP_ACCEPT);

(4)开始启动端口监听

创建一个InetSocketAddress地址对象,使用SocketChannel的socket()函数取得ServerSocket对象,然后再使用ServerSocket的bind()函数绑定到指定的地址端口

InetSocketAddress ip = new InetSocketAddress("localhost",12345);
server.socket().bind(ip);

(5)开始处理客户端连接事件和处理时间

客户端连接事件、读取数据事件和关闭连接

完整实例如下:

(后续实例中,我们将完善此服务端代码)

package org.test.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.util.Iterator;

public class TestServerSocketChannel {

	public static void main(String[] args) {
		Selector selector = null;
		ServerSocketChannel server = null;

		try {
			// TODO 创建一个Selector
			selector = Selector.open();

			// TODO 创建Socket并注册
			server = ServerSocketChannel.open();
			server.configureBlocking(false);
			server.register(selector, SelectionKey.OP_ACCEPT);

			// TODO 启动端口监听
			InetSocketAddress ip = new InetSocketAddress("localhost", 12345);
			server.socket().bind(ip);

			// TODO 监听事件
			while(true){
				//监听事件
				selector.select();
				//事件来源列表
				Iterator<SelectionKey> it = selector.selectedKeys().iterator();
				while(it.hasNext()){
					SelectionKey key = it.next();
					//删除该事件
					it.remove();

					//判断事件类型
					if(key.isConnectable()){
						//连接事件
						SocketChannel channel = (SocketChannel)key.channel();
						if(channel.isConnectionPending()){
							channel.finishConnect();
						}
						channel.register(selector, SelectionKey.OP_READ);
					}else if(key.isReadable()){
						//读取数据事件
						//读取数据事件
						SocketChannel channel = (SocketChannel)key.channel();
						channel.register(selector, SelectionKey.OP_WRITE);

						//读取数据
						CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
						ByteBuffer buffer = ByteBuffer.allocate(1024);
						int count = channel.read(buffer);
						System.out.println(count + ":" + decoder.decode(buffer));
					} else if(key.isWritable()){
						SocketChannel channel = (SocketChannel)key.channel();
						channel.register(selector, SelectionKey.OP_READ);

						//写入数据
						CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();
						channel.write(encoder.encode(CharBuffer.wrap("Hello")));
					}
				}
			}
		} catch (ClosedChannelException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (CharacterCodingException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} finally{
			//关闭
			try {
				selector.close();
				server.close();
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}

}

实例一:支持多客户端的Client/Server任务响应程序

演示一个可以接受多个客户端请求的服务器程序,服务端使用非阻塞模式监听多个客户端的连接和发送来的消息,在收到消息后根据消息命令来处理不同的业务逻辑,然后回复给客户端,客户端通过控制台输入的字符串发送给服务器端。

服务器端代码

package org.test.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.util.Iterator;

public class NIOServer {

	public static void main(String[] args) {
		Selector selector = null;
		ServerSocketChannel server = null;

		try {
			// TODO 创建一个Selector
			selector = Selector.open();

			// TODO 创建Socket并注册
			server = ServerSocketChannel.open();
			server.configureBlocking(false);
			server.register(selector, SelectionKey.OP_ACCEPT);

			// TODO 启动端口监听
			InetSocketAddress ip = new InetSocketAddress(12345);
			server.socket().bind(ip);

			// TODO 监听事件
			while(true){
				//监听事件
				selector.select();
				//事件来源列表
				Iterator<SelectionKey> it = selector.selectedKeys().iterator();
				while(it.hasNext()){
					SelectionKey key = it.next();
					//删除该事件
					it.remove();

					//判断事件类型
					if(key.isAcceptable()){
						//连接事件
						ServerSocketChannel server2 = (ServerSocketChannel) key.channel();
						SocketChannel channel = server2.accept();
						channel.configureBlocking(false);
						if(channel.isConnectionPending()){
							channel.finishConnect();
						}
						channel.register(selector, SelectionKey.OP_READ);
						System.out.println("accept客户端连接:"
								+ channel.socket().getInetAddress().getHostName()
								+ channel.socket().getPort());
					}else if(key.isReadable()){
						//读取数据事件
						SocketChannel channel = (SocketChannel)key.channel();

						//读取数据
						CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
						ByteBuffer buffer = ByteBuffer.allocate(64);
						int count = channel.read(buffer);
						buffer.flip();
						String msg = decoder.decode(buffer).toString();
						System.out.println(count + "收到:" + msg);

						//写入数据
						CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();
						channel.write(encoder.encode(CharBuffer.wrap("server"+msg)));
					} else if(key.isWritable()){
						SocketChannel channel = (SocketChannel)key.channel();
						channel.register(selector, SelectionKey.OP_READ);

						//写入数据
						CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();
						channel.write(encoder.encode(CharBuffer.wrap("Hello")));
					}
				}
			}
		} catch (ClosedChannelException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (CharacterCodingException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} finally{
			//关闭
			try {
				selector.close();
				server.close();
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}
}

客户端代码

package org.test.nio;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class NIOClient {

	public static void main(String[] args) {
		NIOClientThread clientThread = new NIOClientThread();
		clientThread.start();

		//输入输出流
		BufferedReader sin = new BufferedReader(new InputStreamReader(System.in));

		try {
			//循环读取键盘输入
			String readLine;
			while((readLine = sin.readLine()) != null){
				if(readLine.equals("bye")){
					clientThread.close();
					System.exit(0);
				}
				clientThread.send(readLine);
			}
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}

客户端处理线程

package org.test.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.util.Iterator;

public class NIOClientThread extends Thread {
	private CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
	private CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();
	private Selector selector = null;
	private SocketChannel socket = null;
	private SelectionKey clientKey = null;

	// TODO 启动客户端
	public NIOClientThread() {
		try {
			// 创建一个Selector
			selector = Selector.open();

			// 创建并注册Socket
			socket = SocketChannel.open();
			socket.configureBlocking(false);
			clientKey = socket.register(selector, SelectionKey.OP_CONNECT);

			// 连接到远程地址
			InetSocketAddress ip = new InetSocketAddress("localhost", 12345);
			socket.connect(ip);
		} catch (ClosedChannelException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	// TODO 读取事件
	@Override
	public void run() {
		try {
			// 监听事件
			while (true) {
				selector.select(1);
				// 事件来源列表
				Iterator<SelectionKey> it = selector.selectedKeys().iterator();
				while (it.hasNext()) {
					SelectionKey key = it.next();
					// 删除当前事件
					it.remove();

					// 判断当前事件类型
					if (key.isConnectable()) {
						// 连接事件
						SocketChannel channel = (SocketChannel) key.channel();
						if(channel.isConnectionPending()){
							channel.finishConnect();
						}
						channel.register(selector, SelectionKey.OP_READ);
						System.out.println("连接服务器端成功!");
					} else if (key.isReadable()) {
						// 读取数据事件
						SocketChannel channel = (SocketChannel) key.channel();
						channel.register(selector, SelectionKey.OP_WRITE);

						// 读取数据
						ByteBuffer buffer = ByteBuffer.allocate(64);
						int count = channel.read(buffer);
						buffer.flip();
						String msg = decoder.decode(buffer).toString();
						System.out.println(count + "收到:" + msg);
					}
				}
			}
		} catch (ClosedChannelException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (CharacterCodingException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	// TODO 发送消息
	public void send(String msg) {
		// 写入数据事件
		try {
			SocketChannel channel = (SocketChannel) clientKey.channel();
			channel.write(encoder.encode(CharBuffer.wrap(msg)));
		} catch (CharacterCodingException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	// TODO 关闭客户端
	public void close(){
		try {
			selector.close();
			socket.close();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}

运行结果

服务器端:

accept客户端连接:1.0.0.127.in-addr.arpa52591
15收到:你好服务器
accept客户端连接:1.0.0.127.in-addr.arpa52596
9收到:啊啊啊

客户端:
连接服务器端成功!
你好服务器
21收到:server你好服务器

连接服务器端成功!
啊啊啊
15收到:server啊啊啊

4、数据报通道DatagramChannel

DatagramChannel与DatagramSocket类似,用于实现非阻塞的数据报通信。与DatagramSocket的使用过程相似,Datagram的使用过程见:http://aku28907.blog.51cto.com/5668513/1782137。DatagramChannel的工作过程如下:

(1)创建一个DatagramChannel对象:socket = DatagramChannel.open();

(2)开始连接到远程地址

创建一个InetSocketAddress地址对象,使用SocketChannel的connect()函数连接该地址

InetSocketAddress ip = new InetSocketAddress("localhost",12345);

socket.connect(ip);

(3)发送或者接收数据

可以使用socket的write()来发送Buffer对象的数据,用receive()来接收Buffer数据。

socket.write(buffer);

socket.receive(buffer);

实例二:利用数据报通信的Client/Server程序

(示例比较简单,有兴趣的博友可以参照SocketChannel进行改造下)

服务器端

package org.test.nio.udp;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;

public class UDPServer {

	public static void main(String[] args) {
		DatagramChannel socket = null;
		try {
			//创建socket
			socket = DatagramChannel.open();
			InetSocketAddress ip = new InetSocketAddress("localhost", 12345);
			socket.socket().bind(ip);

			//循环监听
			while(true){
				CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
				ByteBuffer buffer = ByteBuffer.allocate(64);
				socket.receive(buffer);
				buffer.flip();
				System.out.println(decoder.decode(buffer).toString());
			}
		} catch (SocketException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (CharacterCodingException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} finally{
			try {
				socket.close();
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}

	}

}

客户端

package org.test.nio.udp;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.CharBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;

public class UDPClient {

	public static void main(String[] args) {
		// TODO Auto-generated method stub
		DatagramChannel socket = null;
		try {
			//创建一个Socket
			socket = DatagramChannel.open();
			InetSocketAddress ip = new InetSocketAddress("localhost", 12345);
			socket.connect(ip);

			//发送数据
			CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();
			socket.write(encoder.encode(CharBuffer.wrap("Hello")));
		} catch (CharacterCodingException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} finally{
			try {
				socket.close();
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}

}

运行结果:

时间: 2024-10-17 06:03:55

java网络编程基础(五)—NIO非阻塞编程的相关文章

java网络爬虫基础学习(一)

刚开始接触java爬虫,在这里是搜索网上做一些理论知识的总结 主要参考文章:gitchat 的java 网络爬虫基础入门,好像要付费,也不贵,感觉内容对新手很友好. 一.爬虫介绍 网络爬虫是一个自动提取网页的程序,它为搜索引擎从万维网下载网页,是搜索引擎的重要组成部分. 传统爬虫: 获得URL ->放入队列 ->抓取网页,分析信息 ->新的URL ->放入队列 ->抓取网页,分析信息... ->满足一定条件,停止. 聚焦爬虫: 根据一定的网页分析算法过滤与主题无关的链接

java网络爬虫基础学习(三)

尝试直接请求URL获取资源 豆瓣电影 https://movie.douban.com/explore#!type=movie&tag=%E7%83%AD%E9%97%A8&sort=time&page_limit=20&page_start=0 浏览器打开该地址: 发现是这样的 在这里我们需要用java抓取电影的信息,首先要找到资源链接,浏览器右键->检查打开谷歌调试工具 我们可以看到下图 有很多的资源请求,在这里我是一个个搜索,看那个是电影信息的Headers 发

网络编程之IO模型——非阻塞IO

网络编程之IO模型--非阻塞IO 非阻塞IO(non-blocking IO) Linux下,可以通过设置socket使其变为non-blocking.当对一个non-blocking socket执行读操作时,流程是这个样子: 从图中可以看出,当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error.从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果.用户进程判断结果是一个error时,它就

Java NIO非阻塞理论学习

Java NIO和阻塞IO的区别: 阻塞I/O在调用InputStream.read()方法时是阻塞的,它会一直等到数据到来时(或超时)才会返回:同样,在调用ServerSocket.accept()方法时,也会一直阻塞到有客户端连接才会返回,每个客户端连接过来后,服务端都会启动一个线程去处理该客户端的请求. 阻塞I/O的缺点: 1. 当客户端多时,会创建大量的处理线程.且每个线程都要占用栈空间和一些CPU时间 2. 阻塞可能带来平凡的上下文切换,且大部分上下文切换可能是无意义的. Java N

Java NIO 非阻塞Socket服务器构建

部分内容引用自xpbug的Blog. 说到socket服务器,第一反应是java.net.Socket这个类.事实上在并发和响应时间要求不高的场合,是可以用java.net.Socket来实现的,比如写一个局域网聊天工具.发送文件等.但它的缺点也很明显,需要自行对接受的线程进行维护,管理缓冲区的分配等,我尝试过用java.net.Socket完成一个瞬时负载在千人左右的服务器,却因后期改动和维护异常麻烦而放弃. Java自1.4以后,加入了新IO特性,这便是本文要介绍的NIO.下面是一段服务器的

Linux网络编程--非阻塞编程

非阻塞方式程序设计介绍 非阻塞方式的操作与阻塞方式的操作最大的不同点是函数的调用立刻返回,不管数据是否成功读取或者成功写入.使用fcntl()将套接字文件描述符按照如下的代码进行设置后,可以进行非阻塞的编程: fcntl(s, F_SETFL, O_NONBLOCK); 其中的s是套接字文件描述符,使用F_SETFL命令将套接字s设置为非阻塞方式后,再进行读写操作就可以马上返回了. 非阻塞程序设计的例子 函数accept()可以使用非阻塞的方式轮询等待客户端的到来,在之前要设置NON_BLOCK

网络I/O模型--04非阻塞模式(解除accept()、 read()方法阻塞)的基础上加入多线程技术

由于应用程序级别并没有使用多线程技术,这就导致了应用程序只能一个一个地对Socket 套接字进行处理.这个 Socket 套接宇没有处理完,就没法处理下一个 Socket 套接字 .针对这个 问题还是可以进行改进的:让应用程序层面上各个 Socket 套接字的处理相互不影响 . 服务端代码 package testBlockSocket; import java.io.InputStream; import java.io.OutputStream; import java.net.Server

《Java并发编程实战》笔记-非阻塞算法

如果在某种算法中,一个线程的失败或挂起不会导致其他线程也失败和挂起,那么这种算法就被称为非阻塞算法.如果在算法的每个步骤中都存在某个线程能够执行下去,那么这种算法也被称为无锁(Lock-Free)算法.如果在算法中仅将CAS用于协调线程之间的操作,并且能正确地实现,那么它既是一种无阻塞算法,又是一种无锁算法. 创建非阻塞算法的关键在于,找出如何将原子修改的范围缩小到单个变量上,同时还要维护数据的一致性. 非阻塞算法的所有特性:某项工作的完成具有不确定性,必须重新执行.

Tomcat性能优化之开启Nio非阻塞模式

了解一下,Tomcat支持三种接收请求的处理方式:BIO.NIO.APR . 1:Bio方式,阻塞式I/O操作即使用的是传统Java I/O操作,Tomcat7以下版本默认情况下是以bio模式运行的,由于每个请求都要创建一个线程来处理,线程开销较大,不能处理高并发的场景,在三种模式中性能也最低 配置如下(tomcat安装目录下的/conf/server.xml): tomcat启动如下,看到http-bio-8080便是bio模式: 2:Nio方式,是Java SE 1.4及后续版本提供的一种新