NIO入门系列之第8章:连网和异步 I/O

8.1  概述

连网是学习异步 I/O 的很好基础,而异步 I/O 对于在 Java 语言中执行任何输入/输出过程的人来说,无疑都是必须具备的知识。NIO 中的连网与 NIO 中的其他任何操作没有什么不同——它依赖通道和缓冲区,而您通常使用InputStream 和 OutputStream来获得通道。

本节首先介绍异步 I/O 的基础—它是什么以及它不是什么,然后转向更实用的、程序性的例子。

8.2  异步 I/O

异步 I/O 是一种没有阻塞地读写数据的方法。通常,在代码进行 read() 调用时,代码会阻塞直至有可供读取的数据。同样, write() 调用将会阻塞直至数据能够写入。

另一方面,异步I/O调用不会阻塞。相反,您将注册对特定 I/O 事件的兴趣——可读的数据的到达、新的套接字连接,等等,而在发生这样的事件时,系统将会告诉您。

异步I/O的一个优势在于,它允许您同时根据大量的输入和输出执行 I/O。同步程序常常要求助于轮询,或者创建许许多多的线程以处理大量的连接。使用异步 I/O,您可以监听任何数量的通道上的事件,不用轮询,也不用额外的线程。

我们将通过研究一个名为MultiPortEcho.java 的例子程序来查看异步 I/O 的实际应用。这个程序就像传统的 echo server,它接受网络连接并向它们回响它们可能发送的数据。不过它有一个附加的特性,就是它能同时监听多个端口,并处理来自所有这些端口的连接。并且它只在单个线程中完成所有这些工作。

// MultiPortEcho
import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.util.*;
public class MultiPortEcho
{
  private int ports[];
  private ByteBuffer echoBuffer = ByteBuffer.allocate( 1024 );
  public MultiPortEcho( int ports[] ) throws IOException {
    this.ports = ports;
    go();
  }
  private void go() throws IOException {
    // Create a new selector
    Selector selector = Selector.open();
    // Open a listener on each port, and register each one
    // with the selector
    for (int i=0; i<ports.length; ++i) {
      ServerSocketChannel ssc = ServerSocketChannel.open();
      ssc.configureBlocking( false );
      ServerSocket ss = ssc.socket();
      InetSocketAddress address = new InetSocketAddress( ports[i] );
      ss.bind( address );
      SelectionKey key = ssc.register( selector, SelectionKey.OP_ACCEPT );
      System.out.println( "Going to listen on "+ports[i] );
    }
    while (true) {
      int num = selector.select();
      Set selectedKeys = selector.selectedKeys();
      Iterator it = selectedKeys.iterator();
      while (it.hasNext()) {
        SelectionKey key = (SelectionKey)it.next();
        if ((key.readyOps() & SelectionKey.OP_ACCEPT)
          == SelectionKey.OP_ACCEPT) {
          // Accept the new connection
          ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
          SocketChannel sc = ssc.accept();
          sc.configureBlocking( false );
          // Add the new connection to the selector
          SelectionKey newKey = sc.register( selector, SelectionKey.OP_READ );
          it.remove();
          System.out.println( "Got connection from "+sc );
        } else if ((key.readyOps() & SelectionKey.OP_READ)
          == SelectionKey.OP_READ) {
          // Read the data
          SocketChannel sc = (SocketChannel)key.channel();
          // Echo data
          int bytesEchoed = 0;
          while (true) {
            echoBuffer.clear();
            int r = sc.read( echoBuffer );
            if (r<=0) {
              break;
            }
            echoBuffer.flip();
            sc.write( echoBuffer );
            bytesEchoed += r;
          }
          System.out.println( "Echoed "+bytesEchoed+" from "+sc );
          it.remove();
        }
      }
//System.out.println( "going to clear" );
//      selectedKeys.clear();
//System.out.println( "cleared" );
    }
  }
  static public void main( String args[] ) throws Exception {
    if (args.length<=0) {
      System.err.println( "Usage: java MultiPortEcho port [port port ...]" );
      System.exit( 1 );
    }
    int ports[] = new int[args.length];
    for (int i=0; i<args.length; ++i) {
      ports[i] = Integer.parseInt( args[i] );
    }
    new MultiPortEcho( ports );
  }
}

8.3  Selectors

本节的阐述对应于MultiPortEcho 的源代码中的 go() 方法的实现,因此应该看一下源代码,以便对所发生的事情有个更全面的了解。

异步 I/O 中的核心对象名为 Selector。Selector 就是您注册对各种 I/O 事件的兴趣的地方,而且当那些事件发生时,就是这个对象告诉您所发生的事件。

所以,我们需要做的第一件事就是创建一个 Selector:

Selector selector = Selector.open();

然后,我们将对不同的通道对象调用 register() 方法,以便注册我们对这些对象中发生的 I/O 事件的兴趣。register()的第一个参数总是这个Selector。

SelectionKey newKey = sc.register( selector, SelectionKey.OP_READ );

8.4  打开一个 ServerSocketChannel

为了接收连接,我们需要一个ServerSocketChannel。事实上,我们要监听的每一个端口都需要有一个 ServerSocketChannel 。对于每一个端口,我们打开一个 ServerSocketChannel,如下所示:

for (int i=0; i<ports.length; ++i) {
      ServerSocketChannel ssc = ServerSocketChannel.open();
      ssc.configureBlocking( false );
      ServerSocket ss = ssc.socket();
      InetSocketAddress address = new InetSocketAddress( ports[i] );
      ss.bind( address );
      SelectionKey key = ssc.register( selector, SelectionKey.OP_ACCEPT );
      System.out.println( "Going to listen on "+ports[i] );
}

第一行创建一个新的ServerSocketChannel ,最后三行将它绑定到给定的端口。第二行将 ServerSocketChannel 设置为非阻塞的。我们必须对每一个要使用的套接字通道调用这个方法,否则异步 I/O 就不能工作。

8.5  选择键

下一步是将新打开的ServerSocketChannels 注册到 Selector上。为此我们使用 ServerSocketChannel.register() 方法,如下所示:

SelectionKey key = ssc.register( selector, SelectionKey.OP_ACCEPT );

register() 的第一个参数总是这个 Selector。第二个参数是 OP_ACCEPT,这里它指定我们想要监听 accept 事件,也就是在新的连接建立时所发生的事件。这是适用于 ServerSocketChannel 的唯一事件类型。

请注意对register() 的调用的返回值。SelectionKey 代表这个通道在此Selector 上的这个注册。当某个Selector 通知您某个传入事件时,它是通过提供对应于该事件的 SelectionKey 来进行的。SelectionKey 还可以用于取消通道的注册。

8.6  内部循环

现在已经注册了我们对一些 I/O 事件的兴趣,下面将进入主循环。使用Selectors 的几乎每个程序都像下面这样使用内部循环:

int num = selector.select();
Set selectedKeys = selector.selectedKeys();
Iterator it = selectedKeys.iterator();
while (it.hasNext()) {
     SelectionKey key = (SelectionKey)it.next();
     // ... deal with I/O event ...
}

首先,我们调用Selector 的 select() 方法。这个方法会阻塞,直到至少有一个已注册的事件发生。当一个或者更多的事件发生时, select() 方法将返回所发生的事件的数量。

接下来,我们调用Selector 的selectedKeys() 方法,它返回发生了事件的SelectionKey 对象的一个集合。

我们通过迭代SelectionKeys 并依次处理每个SelectionKey 来处理事件。对于每一个SelectionKey,您必须确定发生的是什么I/O 事件,以及这个事件影响哪些I/O 对象。

8.7  监听新连接

程序执行到这里,我们仅注册了ServerSocketChannel,并且仅注册它们“接收”事件。为确认这一点,我们对 SelectionKey 调用 readyOps() 方法,并检查发生了什么类型的事件:

if ((key.readyOps() & SelectionKey.OP_ACCEPT)== SelectionKey.OP_ACCEPT) {
     // Accept the new connection
     // ...
}

可以肯定地说,readOps() 方法告诉我们该事件是新的连接。

8.8  接受新的连接

因为我们知道这个服务器套接字上有一个传入连接在等待,所以可以安全地接受它;也就是说,不用担心 accept() 操作会阻塞:

ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
SocketChannel sc = ssc.accept();

下一步是将新连接的SocketChannel 配置为非阻塞的。而且由于接受这个连接的目的是为了读取来自套接字的数据,所以我们还必须将 SocketChannel 注册到 Selector上,如下所示:

sc.configureBlocking( false );
SelectionKey newKey = sc.register( selector, SelectionKey.OP_READ );

注意我们使用register() 的 OP_READ 参数,将 SocketChannel 注册用于读取而不是接受新连接。

8.9  删除处理过的 SelectionKey

在处理SelectionKey 之后,我们几乎可以返回主循环了。但是我们必须首先将处理过的 SelectionKey 从选定的键集合中删除。如果我们没有删除处理过的键,那么它仍然会在主集合中以一个激活的键出现,这会导致我们尝试再次处理它。我们调用迭代器的 remove() 方法来删除处理过的 SelectionKey:

it.remove();

现在我们可以返回主循环并接受从一个套接字中传入的数据(或者一个传入的 I/O 事件)了。

8.10  传入的 I/O

当来自一个套接字的数据到达时,它会触发一个 I/O 事件。这会导致在主循环中调用Selector.select(),并返回一个或者多个I/O 事件。这一次, SelectionKey 将被标记为 OP_READ 事件,如下所示:

} else if ((key.readyOps() & SelectionKey.OP_READ)
     == SelectionKey.OP_READ) {
     // Read the data
     SocketChannel sc = (SocketChannel)key.channel();
     // ...
}

与以前一样,我们取得发生I/O 事件的通道并处理它。在本例中,由于这是一个 echo server,我们只希望从套接字中读取数据并马上将它发送回去。关于这个过程的细节,请参见参考资料中的源代码 (MultiPortEcho.java)。

8.11  回到主循环

每次返回主循环,我们都要调用select 的 Selector()方法,并取得一组 SelectionKey。每个键代表一个 I/O 事件。我们处理事件,从选定的键集中删除 SelectionKey,然后返回主循环的顶部。

这个程序有点过于简单,因为它的目的只是展示异步 I/O 所涉及的技术。在现实的应用程序中,您需要通过将通道从 Selector 中删除来处理关闭的通道。而且您可能要使用多个线程。这个程序可以仅使用一个线程,因为它只是一个演示,但是在现实场景中,创建一个线程池来负责 I/O 事件处理中的耗时部分会更有意义。

NIO入门系列之第8章:连网和异步 I/O

时间: 2024-08-01 18:13:16

NIO入门系列之第8章:连网和异步 I/O的相关文章

NIO入门系列之第5章:关于缓冲区的更多内容

第5章 关于缓冲区的更多内容 5.1  概述 到目前为止,您已经学习了使用缓冲区进行日常工作所需要掌握的大部分内容.我们的例子没怎么超出标准的读/写过程种类,在原来的 I/O中可以像在 NIO 中一样容易地实现这样的标准读写过程. 本节将讨论使用缓冲区的一些更复杂的方面,比如缓冲区分配.包装和分片.我们还会讨论 NIO 带给 Java 平台的一些新功能.您将学到如何创建不同类型的缓冲区以达到不同的目的,如可保护数据不被修改的只读缓冲区,和直接映射到底层操作系统缓冲区的直接缓冲区.我们将在本节的最

NIO入门系列之第6章:分散和聚集

第6章 分散和聚集 6.1  概述 分散/聚集 I/O 是使用多个而不是单个缓冲区来保存数据的读写方法. 一个分散的读取就像一个常规通道读取,只不过它是将数据读到一个缓冲区数组中而不是读到单个缓冲区中.同样地,一个聚集写入是向缓冲区数组而不是向单个缓冲区写入数据. 分散/聚集 I/O 对于将数据流划分为单独的部分很有用,这有助于实现复杂的数据格式. 6.2  分散/聚集 I/O 通道可以有选择地实现两个新的接口: ScatteringByteChannel 和 GatheringByteChan

NIO入门系列之第7章:文件锁定

第7章 文件锁定 7.1  概述 文件锁定初看起来可能让人迷惑.它似乎指的是防止程序或者用户访问特定文件.事实上,文件锁就像常规的 Java 对象锁-它们是劝告式的(advisory)锁.它们不阻止任何形式的数据访问,相反,它们通过锁的共享和获取赖允许系统的不同部分相互协调. 您可以锁定整个文件或者文件的一部分.如果您获取一个排它锁,那么其他人就不能获得同一个文件或者文件的一部分上的锁.如果您获得一个共享锁,那么其他人可以获得同一个文件或者文件一部分上的共享锁,但是不能获得排它锁.文件锁定并不总

NIO入门系列之第4章:缓冲区内部细节

4.1  概述 本节将介绍 NIO 中两个重要的缓冲区组件:状态变量和访问方法 (accessor). 状态变量是前一节中提到的"内部统计机制"的关键.每一个读/写操作都会改变缓冲区的状态.通过记录和跟踪这些变化,缓冲区就可能够内部地管理自己的资源. 在从通道读取数据时,数据被放入到缓冲区.在有些情况下,可以将这个缓冲区直接写入另一个通道,但是在一般情况下,您还需要查看数据.这时使用访问方法 get() 来完成的.同样,如果要将原始数据放入缓冲区中,就要使用访问方法 put(). 在本

NIO入门系列之第3章:从理论到实践:NIO 中的读和写

3.1  概述 读和写是 I/O 的基本过程.从一个通道中读取很简单:只需创建一个缓冲区,然后让通道将数据读到这个缓冲区中.写入也相当简单:创建一个缓冲区,用数据填充它,然后让通道用这些数据来执行写入操作. 在本节中,我们将学习有关在Java 程序中读取和写入数据的一些知识.我们将回顾 NIO 的主要组件(缓冲区.通道和一些相关的方法),看看它们是如何交互以进行读写的.在接下来的几节中,我们将更详细地分析这其中的每个组件以及其交互. 3.2  从文件中读取 在我们第一个练习中,我们将从一个文件中

NIO入门系列之第9章:字符集

9.1  概述 根据 Sun 的文档,一个 Charset 是"十六位 Unicode 字符序列与字节序列之间的一个命名的映射".实际上,一个 Charset 允许您以尽可能最具可移植性的方式读写字符序列. Java 语言被定义为基于 Unicode.然而在实际上,许多人编写代码时都假设一个字符在磁盘上或者在网络流中用一个字节表示.这种假设在许多情况下成立,但是并不是在所有情况下都成立,而且随着计算机变得对 Unicode 越来越友好,这个假设就日益变得不能成立了. 在本节中,我们将看

NIO入门系列之第一章:输入/输出:概念性描述

第1章 输入/输出:概念性描述 1.1  I/O 简介 I/O 或者输入/输出指的是计算机与外部世界或者一个程序与计算机的其余部分的之间的接口.它对于任何计算机系统都非常关键,因而所有 I/O 的主体实际上是内置在操作系统中的.单独的程序一般是让系统为它们完成大部分的工作. 在 Java 编程中,直到最近一直使用流的方式完成 I/O.所有 I/O 都被视为单个的字节的移动,通过一个称为 Stream 的对象一次移动一个字节.流 I/O 用于与外部世界接触.它也在内部使用,用于将对象转换为字节,然

NIO入门系列之第二章:通道和缓冲区

第2章 通道和缓冲区 2.1  概述 通道和缓冲区是 NIO 中的核心对象,几乎在每一个I/O 操作中都要使用它们. 通道是对原 I/O 包中的流的模拟.到任何目的地(或来自任何地方)的所有数据都必须通过一个 Channel 对象.一个 Buffer 实质上是一个容器对象.发送给一个通道的所有对象都必须首先放到缓冲区中:同样地,从通道中读取的任何数据都要读到缓冲区中. 2.2  什么是缓冲区? Buffer 是一个对象,它包含一些要写入或者刚读出的数据.在 NIO 中加入 Buffer 对象,体

Jenkins入门系列之——02第二章 Jenkins安装与配置

2014-12-08:已不再担任SCM和CI的职位,Jenkins的文章如无必要不会再维护. 写的我想吐血,累死了. 网页看着不爽的,自己去下载PDF.有问题请留言! Jenkins入门系列之--03PDF文档下载 第二章 Jenkins安装与配置 2 Jenkins安装 在最简单的情况下,Jenkins 只需要两个步骤: 1.下载最新的版本(一个 WAR 文件).Jenkins官方网址: http://Jenkins-ci.org/ 2.运行 java -jar jenkins.war 注意: