Java NIO系列4:通道和选择器

前言

今天加班回来,终于有时间继续更新NIO的文章了。在前一篇文章我们讲解了缓冲区的知识,并通过代码演示了如何使用缓冲区的API完成一些操作。这里要讲的通道于缓冲区关系密切,简单来说,缓冲区是填充数据的载体,而通道则可以理解为传输数据的载体。回忆在TCP/IP中建立握手的过程,发送端有一个发送缓冲区而接受端有一个接收缓冲区,进程从缓冲区中取数据,之后缓冲区又可以被填满,而传输数据的网络则可以理解为通道。

通道基础

相比JDK1.4之前的BIO,NIO一个重大的改变就是由原来的每次的连接都会创建一个新的线程区执行,变为只为每个请求创建线程,因为如果一个请求需要建立多次连接的话,BIO的效率就非常低下了。NIO处理事件的模型被称为反应器模型。就是在每个请求到达的时候不会立即进行处理,而是会有一个分发线程将处理请求分发给具体的处理线程进行处理。这样设计的好处在于能够提高吞吐量,提高性能。

那么,这里要提到的通道有什么关系呢?在服务端和客户端有一个专门管理通道的对象,这个对象能够监控每个通道的事件(后台的实现逻辑就是不断轮询每个通道所注册的事件,并判断是否满足要求),如果这个对象发现某个通道所注册的事件发生了,那么注册该事件的通道就可以执行一些自己的处理。

在通道API中,顶层接口是Channel,代码如下:

package java.nio.channels;
public interface Channel
{
public boolean isOpen( );
public void close( ) throws IOException;
}

在该接口中知有打开和关闭通道的方法,那么这两个方法够用吗?当然不够,实际上更具体的方法都在不同的实现类中。而在通道的实现类中又可以分为两类:FileChannel和SocketChannel。这两种通道的打开方式如下:

// 打开Socket通道
SocketChannel sc = SocketChannel.open( );
sc.connect (new InetSocketAddress ("somehost", someport));
ServerSocketChannel ssc = ServerSocketChannel.open( );
ssc.socket( ).bind (new InetSocketAddress (somelocalport));
// 打开UDP通道
DatagramChannel dc = DatagramChannel.open( );
// 打开文件通道
RandomAccessFile raf = new RandomAccessFile ("somefile", "r");
FileChannel fc = raf.getChannel( );

通道实战

通过以上的介绍,我们对通道有了一个基本的认识,下面主要演示如何通过代码的方式使用NIO中通道(由于在NIO应用最广的是Socket通道,所以下面的例子都是基于Socket通道)。

通道的简单使用

下面通过从通道拷贝数据带缓冲区为例,对通道的基本使用做一个简单演示:

package com.rhwayfun.patchwork.nio.channel;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;

/**
 * @author: rhwayfun
 * @since: 2016-05-28
 */
public class ChannelCopy {
    public static void main(String[] args) throws IOException {
        // 创建一个源通道
        ReadableByteChannel source = Channels.newChannel(System.in);
        // 创建一个目标通道
        WritableByteChannel dest = Channels.newChannel(System.out);
        channelCopy(source,dest);
        // 关闭通道
        source.close();
        dest.close();
    }

    /**
     * 通道拷贝
     * @param source
     * @param dest
     * @throws IOException
     */
    private static void channelCopy(ReadableByteChannel source, WritableByteChannel dest) throws IOException {
        // 申请16 * 1024字节大小的通道
        ByteBuffer buffer = ByteBuffer.allocate(16 * 1024);
        // 调用read方法读取缓冲区的数据
        while (source.read(buffer) != -1){
            // 翻转缓冲区,执行的操作:
            // 1、将limit的位置设为position之后的一个位置
            // 2、将position的位置重置未0
            buffer.flip();
            // 当缓冲区还有数据的话就写到目标通道中
            while (buffer.hasRemaining()){
                dest.write(buffer);
            }
            // 清空缓冲区
            buffer.clear();
        }
    }

    /**
     * 通道拷贝的另一种方式
     * @param source
     * @param dest
     * @throws IOException
     */
    private static void channelCopy2(ReadableByteChannel source, WritableByteChannel dest) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(16 * 1024);
        while (source.read(buffer) != -1){
            // 翻转缓冲区
            buffer.flip();
            // 将缓冲区的数据写道目标通道
            dest.write(buffer);
            // 如果只写了一部分数据,将空间进行压缩,可以重复利用空间
            buffer.compact();
        }
        // 翻转缓冲区
        buffer.flip();
        // 将剩余的数据写入目标缓冲区
        while (buffer.hasRemaining()){
            dest.write(buffer);
        }
    }
}

TCP服务器

在前面提到有一个对象对服务端和客户端的通道进行管理,这个对象就是Selector,可以理解为选择器,这个选择器就是为通道服务的。服务器和客户端可以注册自己感兴趣的事件,这样Selector就可以不同的多个通道服务器的状态。通常Selector上可以注册的事件类型如下:

事件描述 事件定义
服务端接收客户端连接事件 SelectionKey.OP_ACCEPT(16)
客户端连接服务端事件 SelectionKey.OP_CONNECT(8)
读事件 SelectionKey.OP_READ(1)
写事件 SelectionKey.OP_WRITE(4)

比如服务器在Selector对象上注册了OP_ACCEPT事件,那么当有客户端连接上的时候,该事件就可以被响应。

下面实现了一个简单的TCP服务器和客户端:

客户端

package com.rhwayfun.patchwork.nio.selector;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

/**
 * @author: rhwayfun
 * @since: 2016-05-28
 */
public class SelectorClient {

    // 连接的主机
    private String host;
    // 主机的端口
    private int port;
    // 选择器
    private Selector selector;
    // 通道
    private SocketChannel socketChannel;

    public SelectorClient(String host,int port){
        this.host = host;
        this.port = port;
        try {
            init();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void init() throws IOException {
        // 打开一个选择器
        selector = Selector.open();
        // 打开一个通道
        socketChannel = SocketChannel.open(new InetSocketAddress(host,port));
        // 要绑定的地址
        //SocketAddress remoteAddress = new InetSocketAddress(host,port);
        // 绑定到指定的地址
        //socketChannel.bind(remoteAddress);
        // 配置为非阻塞模式
        socketChannel.configureBlocking(false);
        // 注册到选择器上
        socketChannel.register(selector, SelectionKey.OP_READ);

        // 监听来自服务端的响应
        new SelectorThread(selector).start();
    }

    public void writeDataToServer(String message) throws IOException {
        ByteBuffer writeBuffer = ByteBuffer.wrap(message.getBytes("UTF-8"));
        socketChannel.write(writeBuffer);
    }

    public static void main(String[] args) throws IOException {
        SelectorClient client = new SelectorClient("localhost",6666);
        client.writeDataToServer("我是一个客户端");
    }
}

服务器

package com.rhwayfun.patchwork.nio.selector;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;

/**
 * @author: rhwayfun
 * @since: 2016-05-28
 */
public class SelectorServer {

    // 服务器监听的端口
    private static final int PORT = 6666;
    // 处理数据的缓冲区
    private ByteBuffer buffer = ByteBuffer.allocate(1024);
    // 欢迎消息
    private static final String GREETING = "Welcome to here.";

    public static void main(String[] args) {
        new SelectorServer().start(args);
    }

    private void start(String[] args) {
        int port = PORT;
        if (args.length == 1){
            port = Integer.valueOf(args[0]);
        }
        System.out.println("listening on port " + port);
        Iterator<SelectionKey> iterator = null;
        try {
            //创建一个ServerChannel
            ServerSocketChannel serverChannel = ServerSocketChannel.open();
            //获取通道关联的Socket对象
            ServerSocket serverSocket = serverChannel.socket();
            //要绑定的地址
            SocketAddress address = new InetSocketAddress(port);
            //创建需要注册的选择器
            Selector selector = Selector.open();

            //把socket对象绑定到指定的地址
            serverSocket.bind(address);

            //配置为非阻塞模式
            serverChannel.configureBlocking(false);

            //注册通道到选择器
            //第二个参数表名serverChannel感兴趣的事件是OP_ACCEPT类型的事件
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);

            // 选择器不断循环从选择器中选取已经准备好的通道进行操作
            // 选取之后,会对其感兴趣的事件进行处理。将感兴趣的事件
            // 处理完毕后将key从集合中删除,表示该通道的事件已经处
            // 理完毕

            while (true){
                // 这个操作可能会被阻塞,因为不知道注册在这个选择器上的通道是否准备好了
                int n = selector.select();
                if (n == 0){
                    continue;
                }

                // 获取SelectionKey的迭代器对象
                iterator  = selector.selectedKeys().iterator();

                while (iterator.hasNext()){
                    // 获取这个key关联的通道
                    SelectionKey key = iterator.next();
                    // 判断感兴趣的事件类型
                    if (key.isAcceptable()){
                        // 这里可以强制转换为ServerSocketChannel
                        // 因为在这个选择器上目前只注册了一个该类型的通道
                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
                        // 调用accept方法可以得到连接到此地址的客户端连接
                        SocketChannel channel = server.accept();

                        // 注册客户端连接到选择器上,并把感兴趣的事件类型设为可读类型
                        registerChannel(selector,channel,SelectionKey.OP_READ);

                        // 给客户端发送响应消息
                        sayHello(channel);
                    }

                    // 如果是可读类型的事件,则获取传输过来的数据
                    if (key.isReadable()){
                        readDataFromClient(key);
                    }

                    // 将已经处理的key从集合中删除
                    iterator.remove();
                }
            }
        } catch (IOException e) {
            iterator.remove();
        }

    }

    /**
     *
     * @param key
     */
    private void readDataFromClient(SelectionKey key) throws IOException {
        // 获取key管理的Channel对象
        SocketChannel channel = (SocketChannel) key.channel();
        // 读取之前需要清空缓冲区
        buffer.clear();
        if (channel.read(buffer) < 0){
            channel.close();
        }else {
            buffer.flip();
            String receiveMsg = Charset.forName("UTF-8").newDecoder().decode(buffer).toString();
            System.out.println("receive client message: " + receiveMsg + " from " + channel.getRemoteAddress());
        }
    }

    /**
     * 向客户端发送响应消息
     * @param channel
     * @throws IOException
     */
    private void sayHello(SocketChannel channel) throws IOException {
        buffer.clear();
        buffer.put(GREETING.getBytes());
        buffer.flip();
        channel.write(buffer);
    }

    /**
     * 注册客户端连接到选择器上
     * @param selector
     * @param channel
     * @param opRead
     * @throws IOException
     */
    private void registerChannel(Selector selector, SocketChannel channel, int opRead) throws IOException {
        if (channel == null){
            return;
        }
        // 设为非阻塞模式
        channel.configureBlocking(false);
        // 注册该channel到选择器上
        channel.register(selector,opRead);
    }

}

通道线程

package com.rhwayfun.patchwork.nio.selector;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;

/**
 * @author: rhwayfun
 * @since: 2016-05-28
 */
public class SelectorThread extends  Thread{

    private Selector selector;

    public SelectorThread(Selector selector) {
        this.selector = selector;
    }

    @Override
    public void run() {
        try {
            // 获取Selector注册的通道数
            int n = selector.select();
            while (n > 0){
                // selector.selectedKeys()可以获取每个注册通道的key
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()){
                    SelectionKey key = iterator.next();
                    if (key.isReadable()){
                        SocketChannel channel = (SocketChannel) key.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        channel.read(buffer);
                        buffer.flip();
                        String receiveMsg = Charset.forName("UTF-8").newDecoder().decode(buffer).toString();
                        System.out.println("receive server message: " + receiveMsg + " from " + channel.getRemoteAddress());
                        key.interestOps(SelectionKey.OP_READ);
                    }
                    // 处理下一个事件
                    iterator.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

以上的代码演示如何使用NIO的相关API创建TCP服务器,把上面的代码理解到位,对NIO的API就掌握的差不多了。其实,在NIO编程中,一个重要的思想是非阻塞模式,选择器就是这种思想的体现,体会Selector在通道监听有助于理解非阻塞模式的应用场景。

小结

这样,我们通过实际的代码演示了如何使用NIO的相关API实现一个简单的TCP服务器,关键在于理解NIO模型的原理,非阻塞模式是NIO的思想内核。

时间: 2024-08-29 02:01:09

Java NIO系列4:通道和选择器的相关文章

Java NIO系列教程(五) 通道之间的数据传输

原文地址:http://tutorials.jenkov.com/java-nio/scatter-gather.html 作者:Jakob Jenkov   译者:郭蕾     校对:周泰 在Java NIO中,如果两个通道中有一个是FileChannel,那你可以直接将数据从一个channel(译者注:channel中文常译作通道)传输到另外一个channel. transferFrom() FileChannel的transferFrom()方法可以将数据从源通道传输到FileChanne

Java NIO 系列教程(转)

原文中说了最重要的3个概念,Channel 通道Buffer 缓冲区Selector 选择器其中Channel对应以前的流,Buffer不是什么新东西,Selector是因为nio可以使用异步的非堵塞模式才加入的东西.以前的流总是堵塞的,一个线程只要对它进行操作,其它操作就会被堵塞,也就相当于水管没有阀门,你伸手接水的时候,不管水到了没有,你就都只能耗在接水(流)上.nio的Channel的加入,相当于增加了水龙头(有阀门),虽然一个时刻也只能接一个水管的水,但依赖轮换策略,在水量不大的时候,各

Java NIO 系列教程

转载于http://www.iteye.com/magazines/132-Java-NIO Java NIO(New IO)是从Java 1.4版本开始引入的一个新的IO API,可以替代标准的Java IO API.本系列教程将有助于你学习和理解Java NIO.感谢并发编程网的翻译和投递. (关注ITeye官微,随时随地查看最新开发资讯.技术文章.) Java NIO提供了与标准IO不同的IO工作方式: Channels and Buffers(通道和缓冲区):标准的IO基于字节流和字符流

java NIO系列教程1

ava NIO(New IO)是一个可以替代标准Java IO API的IO API(从Java 1.4开始),Java NIO提供了与标准IO不同的IO工作方式. Java NIO: Channels and Buffers(通道和缓冲区) 标准的IO基于字节流和字符流进行操作的,而NIO是基于通道(Channel)和缓冲区(Buffer)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中. Java NIO: Asynchronous IO(异步IO) Java NIO可以让你

java NIO系列教程2

7.FileChannel Java NIO中的FileChannel是一个连接到文件的通道.可以通过文件通道读写文件. FileChannel无法设置为非阻塞模式,它总是运行在阻塞模式下. 打开FileChannel 在使用FileChannel之前,必须先打开它.但是,我们无法直接打开一个FileChannel,需要通过使用一个InputStream.OutputStream或RandomAccessFile来获取一个FileChannel实例.下面是通过RandomAccessFile打开

Java NIO系列教程(三) Buffer

原文链接:http://ifeve.com/buffers/ 声明:Java NIO系列教材并非本人原创,只因阅读原文之后有感于文章之精妙,意欲与诸位共享,故而出此下策,忘原作者见谅.另附上原文地址. Java NIO的通道类似流,但又有些不同: Java NIO中的Buffer用于和NIO通道进行交互.如你所知,数据是从通道读入缓冲区,从缓冲区写入到通道中的. 缓冲区本质上是一块可以写入数据,然后可以从中读取数据的内存.这块内存被包装成NIO Buffer对象,并提供了一组方法,用来方便的访问

Java NIO系列教程(二) Channel

原文地址:http://ifeve.com/channels/ 声明:Java NIO系列教材并非本人原创,只因阅读原文之后有感于文章之精妙,意欲与诸位共享,故而出此下策,忘原作者见谅.另附上原文地址. Java NIO的通道类似流,但又有些不同: 既可以从通道中读取数据,又可以写数据到通道.但流的读写通常是单向的. 通道可以异步地读写. 通道中的数据总是要先读到一个Buffer,或者总是要从一个Buffer中写入. 正如上面所说,从通道读取数据到缓冲区,从缓冲区写入数据到通道.如下图所示: C

Java NIO系列教程(十二) Java NIO与IO

当学习了Java NIO和IO的API后,一个问题马上涌入脑海: 我应该何时使用IO,何时使用NIO呢?在本文中,我会尽量清晰地解析Java NIO和IO的差异.它们的使用场景,以及它们如何影响您的代码设计. Java NIO和IO的主要区别 下表总结了Java NIO和IO之间的主要差别,我会更详细地描述表中每部分的差异. IO                NIO 面向流            面向缓冲 阻塞IO           非阻塞IO 无 选择器 面向流与面向缓冲 Java NIO

Java NIO 系列教程 &lt;转&gt;

Java NIO提供了与标准IO不同的IO工作方式: Channels and Buffers(通道和缓冲区):标准的IO基于字节流和字符流进行操作的,而NIO是基于通道(Channel)和缓冲区(Buffer)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中. Asynchronous IO(异步IO):Java NIO可以让你异步的使用IO,例如:当线程从通道读取数据到缓冲区时,线程还是可以进行其他事情.当数据被写入到缓冲区时,线程可以继续处理它.从缓冲区写入通道也类似. S

[转]Java NIO 系列教程

Java NIO提供了与标准IO不同的IO工作方式: Channels and Buffers(通道和缓冲区):标准的IO基于字节流和字符流进行操作的,而NIO是基于通道(Channel)和缓冲区(Buffer)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中. Asynchronous IO(异步IO):Java NIO可以让你异步的使用IO,例如:当线程从通道读取数据到缓冲区时,线程还是可以进行其他事情.当数据被写入到缓冲区时,线程可以继续处理它.从缓冲区写入通道也类似. S