第11章非阻塞I/O

第11章非阻塞I/O

一个实例客户端

在实现新I/O的客户端时,调用静态工厂方法SocketChannel.open()来创建一个新的java.nio.channels.SocketChannel对象。这个方法的参数是一个java.net.SocketAddress对象,指示要连接的主机和端口。

例如:下面的代码段连接指向rama.poly.edu端口19的通道:

SocketAddress rama  = new InetSocketAddress("rama.poly.edu“,19);

SocketChannel client = SocketChannel.open(rama);

利用通道,可以直接写入通道本身,而不是写入ByteBuffer对象。

ByteBuffer buffer = ByteBuffer.allocate(74);

将这个ByteBuffer对象传递给通道的read()方法。通道会用从Socket读取的数据填充这个缓冲区。它返回成功读取并存储在缓存区的字节数:

int bytesRead = client.read(buffer);

这会至少读取一个字节,或者返回-1指示数据结束。

示例11-1:一个基于通道的chargen客户端

<span style="font-size:18px;">import java.nio.*;
import java.nio.channels.*;
import java.net.*;
import java.io.IOException;

public class ChargenClient {

  public static int DEFAULT_PORT = 19;

  public static void main(String[] args) {

    if (args.length == 0) {
      System.out.println("Usage: java ChargenClient host [port]");
      return;
    }  

    int port;
    try {
      port = Integer.parseInt(args[1]);
    } catch (RuntimeException ex) {
      port = DEFAULT_PORT;
    }

    try {
      SocketAddress address = new InetSocketAddress(args[0], port);
      SocketChannel client = SocketChannel.open(address);

      ByteBuffer buffer = ByteBuffer.allocate(74);
      WritableByteChannel out = Channels.newChannel(System.out);

      while (client.read(buffer) != -1) {
        buffer.flip();
        out.write(buffer);
        buffer.clear();
      }
    } catch (IOException ex) {
      ex.printStackTrace();
    }
  }
}</span>

可以在阻塞或非阻塞模式下允许这个连接,在非阻塞模式下,即使没有任何可用的数据,read()也会立即返回。这就允许程序在试图读取前做其他操作。它不必等待慢速的网络连接。要改变阻塞模式,可以向configureBlocking()方法传入true(阻塞)或false(不阻塞)。

client.configureBlocking(false);

在非阻塞模式下,read()可能因为读不到任何数据而返回0。因此循环需要有些差别:

while(true){

//把每次循环都要允许的代码都放在这里,无论有没有读到数据

int n = client.read(buffer);

if(n > 0) {

buffer.flip();

out.write(buffer);

buffer.clear();

}else if( n == -1) {

//这不应当发生,除非服务器发送故障

break;

}

}

一个实例服务器

示例11-2:一个非阻塞的chargen服务器

<span style="font-size:18px;">import java.nio.*;
import java.nio.channels.*;
import java.net.*;
import java.util.*;
import java.io.IOException;

public class ChargenServer {

  public static int DEFAULT_PORT = 19;

  public static void main(String[] args) {

    int port;
    try {
      port = Integer.parseInt(args[0]);
    } catch (RuntimeException ex) {
      port = DEFAULT_PORT;
    }
    System.out.println("Listening for connections on port " + port);

    byte[] rotation = new byte[95*2];
    for (byte i = ' '; i <= '~'; i++) {
      rotation[i -' '] = i;
      rotation[i + 95 - ' '] = i;
    }

    ServerSocketChannel serverChannel;
    Selector selector;
    try {
      serverChannel = ServerSocketChannel.open();
      ServerSocket ss = serverChannel.socket();
      InetSocketAddress address = new InetSocketAddress(port);
      ss.bind(address);
      serverChannel.configureBlocking(false);
      selector = Selector.open();
      serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    } catch (IOException ex) {
      ex.printStackTrace();
      return;
    }

    while (true) {
      try {
        selector.select();
      } catch (IOException ex) {
        ex.printStackTrace();
        break;
      }

      Set<SelectionKey> readyKeys = selector.selectedKeys();
      Iterator<SelectionKey> iterator = readyKeys.iterator();
      while (iterator.hasNext()) {

        SelectionKey key = iterator.next();
        iterator.remove();
        try {
          if (key.isAcceptable()) {
            ServerSocketChannel server = (ServerSocketChannel) key.channel();
            SocketChannel client = server.accept();
            System.out.println("Accepted connection from " + client);
            client.configureBlocking(false);
            SelectionKey key2 = client.register(selector, SelectionKey.
                                                                    OP_WRITE);
            ByteBuffer buffer = ByteBuffer.allocate(74);
            buffer.put(rotation, 0, 72);
            buffer.put((byte) '\r');
            buffer.put((byte) '\n');
            buffer.flip();
            key2.attach(buffer);
          } else if (key.isWritable()) {
            SocketChannel client = (SocketChannel) key.channel();
            ByteBuffer buffer = (ByteBuffer) key.attachment();
            if (!buffer.hasRemaining()) {
              // Refill the buffer with the next line
              buffer.rewind();
              // Get the old first character
              int first = buffer.get();
              // Get ready to change the data in the buffer
              buffer.rewind();
              // Find the new first characters position in rotation
              int position = first - ' ' + 1;
              // copy the data from rotation into the buffer
              buffer.put(rotation, position, 72);
              // Store a line break at the end of the buffer
              buffer.put((byte) '\r');
              buffer.put((byte) '\n');
              // Prepare the buffer for writing
              buffer.flip();
            }
            client.write(buffer);
          }
        } catch (IOException ex) {
          key.cancel();
          try {
            key.channel().close();
          }
          catch (IOException cex) {}
        }
      }
    }
  }
}</span>

缓冲区

流和通道之间的关键区别在于流是基于字节的,而通道是基于块的。

无论缓冲区是何种类型,都有相同的方法来获取和设置缓冲区4个关键部分地信息。

位置

缓存区中将读取或写入的下一个位置。

public final int position()

public final Buffer position(int newPosition)

容量

缓冲区可以保存的元素的最大数目。容量值在创建缓冲区时设置,此后不能改变。

public final int capacity()

限度

缓冲区中可访问数据的末尾位置。只要不改变限度,就无法读/写超过这个位置的数据,即使缓冲区有更大的容量也没有用。

public final int limit()

public final Buffer limit(int newLimit)

标记

缓冲区中客户端指定的索引。通过调用mark()可以将标记设置为当前位置。调用reset()可以将当前位置设置为所标记的位置。

公共的Buffer超类提供了另外几个方法:

public final Buffer clear()

clear()方法将位置设置为0,并将限度设置为容量,从而将缓冲区“清空”。

public final Buffer rewind()

将位置设置为0,但不改变限度,还允许重新读取缓冲区

public final Buffer flip()

将限度设置为当前位置,位置设置为0,希望排空刚刚填充的缓冲区时可以调用这个方法。

public final int remaining()

返回缓冲区中当前位置与限度之间的元素数。

public final boolean hasRemaining()_

如果剩余元素大于0,hasRemaining()方法返回true

创建缓冲区

空的缓冲区一般由分配(allocate)方法创建。预填充数据的缓冲区由包装(wrap)方法创建。分配方法通常用于输入、而包装方法一般用于输出。

分配

基本的allocate()方法只返回一个有指定固定容量的新缓冲区,这是一个空缓冲区。

ByteBuffer buffer1  = ByteBuffer.allocate(100);

IntBuffer buffer2 = IntBuffer.allocate(100);

直接分配

ByteBuffer类有另外一个allocateDirect()方法,这个方法不为缓冲区创建后备数组。

包装

如果已经有了要输出的数据数组,一般要用缓冲区进行包装,而不是分配一个新缓冲区。

byte[] data = "Some data".getBytes("UTF-8");

ByteBuffer buffer1 = ByteBuffer.wrap(data);

填充和排空

批量方法

不同的缓冲区类都有一些批量方法来填充和排空相应元素的数组。

ByteBuffer有put()和get()方法,可以用现有的字节数组或排空相应元素类型的数组。

例如:ByteBuffer有put()和get()方法,可以用现有的字节数组或子数组填充和排空一个ByteBuffer:

public ByteBuffer get(byte[] dst,int offset,int length)

public ByteBuffer get(byte[] dst)

public ByteBuffer put(byte[] array,int offset,int length)

public ByteBuffer put(byte[] array)

数据转换

Java中的所有数据最终都解析为字节。所有基本数据类型——int、double、float等都可以写为字节。

任何适当长度的字节序列都可解释为基本类型数据。

例如:任何4字节的序列都可以对应于一个int或float(实际上两者皆可,取决于你希望如何读取)。

8字节的序列对应于一个long或double。

视图缓冲区

如果从SocketChannel读取的ByteBuffer只包含一种特定基本数据类型的元素,那么就有必要创建一个视图缓冲区。

压缩缓冲区

大多数可写缓冲区都支持compact()方法。

压缩时将缓冲区中所有剩余的数据移到缓冲区的开头,为元素释放更多空间。这些位置上的任何数据都将被覆盖。缓冲区的位置设置为数据末尾,从而可以写入更多数据。

复制缓冲区

建立缓冲区的副本,从而将相同的信息分发到两个或多个通道。6种特定类型缓冲区都提供了duplicate()方法来完成这项工作。

分片缓冲区

分片(slicing)缓冲区是复制的一个变形。分片也会创建一个新缓冲区,与原缓冲区共享数据。不过,分片的起始位置(位置0)是原缓冲区的当前位置,而且其容量最大不超过源缓冲区的限度。也就是说,分片是原缓冲区的一个子序列,值包含从当前位置到限度的所有元素。

标记和重置

通道

通道将缓冲区的数据块移入或移出到各种I/O源。

SocketChannel

SocketChannel类可以读写TCP Socket。数据必须编码到ByteBuffer对象中来完成读/写。每个SocketChannel都与一个对等端Socket对象相关联。

连接

两个静态open()方法来创建新的SocketChannel对象:

public static SocketChannel open(SocketAddress remote) throws IOException

这个方法会建立连接,将阻塞(也就是说,在连接建立或抛出异常之前,这个方法不回返回)。

public static SocketChannel open() throws IOException

无参数版本不立即连接。它创建一个初始未连接的socket,以后必须用connect()方法进行连接。

读取

为了读取SocketChannel,首先要创建一个ByteBuffer,通道可以在其中存储数据。然后将这个ByteBuffer传给read()方法:

public abstract int read(ByteBuffer dst) throws IOException

通道会用尽可能多的数据填充缓冲区,然后返回放入的字节数。

写入

Socket通道提供了读写方法。要想写入,只需填充一个ByteBuffer,将其回绕,然后传给某个写入方法,这个方法再把数据复制到输出时将缓冲区排空,这与读取过程正好相反。

基本的write()方法接收一个缓冲区作为参数:

public abstract int write(ByteBuffer src) throws IOException

关闭

public void close() throws IOException

public boolean isOpen()

ServerSocketChannel

ServerSocketChannel类只有一个目的:接受入站连接。

创建服务器Socket通道

try{

ServerSocketChannel server = ServerSocketChannel.open();

ServerSocket socket = server.socket();

SocketAddress address = new InetSokcetAddress(80);

socket.bind(address);

} catch(IOException ex) {

System.err.println("..");

}

Java 7中处理:

try{

ServerSocketChannel server = ServerSocketChannel.open();

SocketAddress address  = new InetSocketAddress(80);

server.bind(address);

} catch (IOException ex){

System.err.pringln("...");

}

接受连接

public abstract SocketChannel accept() throws IOException

accept()方法就可以监听入站连接了。可以在阻塞或非阻塞模式下操作。

阻塞模式

accept()方法等待入站连接。然后他接受一个连接,并返回连接到远程客户端的一个SocketChannel对象。在建立连接之前,线程无法进行任何操作。这种策略适用于立即响应每一个请求的简单服务器。阻塞模式是默认模式。

非阻塞模式

如果没有入站连接,accept()方法会返回null。非阻塞模式更适合于需要为每个连接完成大量工作的服务器,这样就可以并行处理多个请求。非阻塞模式一般与Selector结合使用,为了是ServerSocketChannel处于非阻塞模式,要向其configureBlocking()方法传入false。

Channels类

可以将传统的基于I/O的流、阅读器和书写器包装在通道中,也可以从通道转化为基于I/O的流、阅读器和书写器。

异步通道(Java7)

Java 7 引入了AsynchronousSocketChannel和AsynchronousServerSocketChannel类。

读/写异步通道会立即返回,甚至在I/O完成之前就会返回。所读/写的数据会由一个Future或CompletionHandler进一步处理。connect()和accept()方法也会异步执行,并返回Future。这里不使用选择器。

例子:

SocketAddress address = new InetSocketAddress(args[0],port);

AsynchronousSocketChannel client = AsynchronousSocketChannel.open();

Future<void> connected = client.connect(address);

ByteBuffer buffer = ByteBuffer.allocate(74);

//等待连接

connected.get();

//从连接读取

Future<Integer> future = client.read(buffer);

//做其他工作。。。

//等待读取完成。。。

future.get();

//回绕并排空缓冲区

buffer.flip();

WritableByteChannel out = Channels.newChannel(System.out);

out.write(buffer);

网络连接在并行运行,与此同时程序可以做其他事情。准备好处理来自网络的数据时,会停下来,通过调用Future.get()等待这些数据,但在此之前不用停下来。

如果不关心获取顺序,可以生成大量AsynchronousSocketChannel请求,并为每个请求提供一个CompletionHandler,由它在后端存储结果。

通过CompletionHandler接口声明了两个方法:completed()和failed(),如果成功读取则调用completed(),另外出现I/O错误时会调用failed()。

socket选项(Java7)

从Java 7 开始,SocketChannel,ServerSocketChannel,AsynchronousServerSocketChannel,AsynchronousSocketChannel和DatagramChannel都实现了新的NetworkChannel接口。这个接口的主要用途是支持各种TCP选项。

通道类分别有3个方法来获取、设置和列出所支持的选项:

<T> T getOption(SocketOption<T> name) throws IOException

<T> NetworkChannel setOption(SocketOption<T> name,T value) throws IOException

Set<SocketOption<?>>  supportedOptions()

StandardSocketOptions类为Java能识别的11个选项提供了相应的常量。

例如:

NetworkChannel channel = SocketChannel.open();

channel.setOption(StandardSocketOptions.SO_LINGER,240);

就绪选择

就绪选择,即能够选择读写时不阻塞的Socket。

为了完成就绪选择,要将不同的通道注册到一个Selector对象。每个通道分配有一个SelectionKey。然后程序可以询问这个Selector对象,那些通道已经准备就绪可以无阻塞地完成你希望完成的操作,可以请求Selector对象返回相应的键集合。

Selector类

创建新的选择器:

public static Selector open() throws IOException

向选择器增加通道。register()方法在SelectableChannel类中声明。通过将选择器传递给通道的一个注册方法,就可以向选择器注册这个通道:

public final SelectionKey register(Selector sel,int ops) throws ClosedChannelException

public final SelectionKey reigster(Selector sel,int ops, Object att) throws ClosedChannelException

第一个参数是通道要向哪个选择器注册。

第二个参数是SelectionKey类中的一个命名常量,标识通道所注册的操作。

第三个参数是可选的,这是键的附件。

有三个方法可以选择就绪的通道。它们的区别在于寻找就绪通道等待的时间。

public  abstract int selectNow() throws IOException

完成非阻塞选择。如果当前没有准备好要处理的连接,它会立即返回。

public abstract int select() throws IOException

阻塞。在返回前等待,直到至少有一个注册的通道准备好可以进行处理。

public abstract int select(long timeout) throws IOException

阻塞,在返回0前只等待不超过timeout毫秒。

当有通道已经准备好处理时,可以使用selectedKeys()方法获取就绪通道:

public abstract Set<SelectionKey> selectedKeys()

迭代处理返回的集合时,要依次处理各个SelectionKey.

当准备关闭服务器或不再需要选择器时,应当将它关闭:

public abstract void close() throws IOException

释放与选择器关联的所有资源。

SelectionKey类

SelectionKey对象相当于通道的指针。

用channel()方法来获取这个通道:

public abstract SelectableChannel channel()

如果结束使用连接,就要撤销其SelectionKey对象的注册,这样选择器就不会浪费资源再去查询它是否准备就绪。调用这个键的cancel()方法来撤销注册:

public abstract void cancel()

时间: 2024-10-16 19:22:11

第11章非阻塞I/O的相关文章

UNP学习笔记(第十六章 非阻塞I/O)

套接字的默认状态时阻塞的 可能阻塞的套接字调用可分为以下4类: 1.输入操作,包括read.readv.recv.recvfrom和recvmsg. 2.输入操作,包括write.writev.send.sendto和sendmsg. 3.接受外来连接,即accept函数. 4.发起外出连接,即用于TCP的connect函数(该函数一直要等到客户收到对于自己的SYN的ACK为止才返回) 非阻塞connect 当在一个非阻塞的TCP套接字上调用connect时,connect将立即返回一个EINP

Android新手入门2016(11)--非阻塞对话框AlertDialog

写了这么久,看了这么多控件,好像都是静态的,一点交互都没有.这次要弄点弹框,活跃活跃. 这次继续用上一章的代码往下面写吧. 先看看图 还是前一章的九宫图,我把对话框绑定在第一个图标. 点击一下,可以看到如下: 再来看看代码吧 package com.fable.helloworld; import android.app.Activity; import android.app.AlertDialog; import android.content.Context; import android

JavaNIO非阻塞模式

package com.java.NIO; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.c

java nio学习三:NIO 的非阻塞式网络通信

一.阻塞和非阻塞 传统的 IO 流都是阻塞式的.也就是说,当一个线程调用 read() 或 write()时,该线程被阻塞,直到有一些数据被读取或写入,该线程在此期间不能执行其他任务.因此,在完成网络通信进行 IO 操作时,由于线程会阻塞,所以服务器端必须为每个客户端都提供一个独立的线程进行处理,当服务器端需要处理大量客户端时,性能急剧下降.Java NIO 是非阻塞模式的.当线程从某通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务.线程通常将非阻塞 IO 的空闲时间用于在其他通道上

《Java并发编程实战》第十五章 原子变量与非阻塞同步机制 读书笔记

一.锁的劣势 锁定后如果未释放,再次请求锁时会造成阻塞,多线程调度通常遇到阻塞会进行上下文切换,造成更多的开销. 在挂起与恢复线程等过程中存在着很大的开销,并且通常存在着较长时间的中断. 锁可能导致优先级反转,即使较高优先级的线程可以抢先执行,但仍然需要等待锁被释放,从而导致它的优先级会降至低优先级线程的级别. 二.硬件对并发的支持 处理器填写了一些特殊指令,例如:比较并交换.关联加载/条件存储. 1 比较并交换 CAS的含义是:"我认为V的值应该为A,如果是,那么将V的值更新为B,否则不需要修

Java并发编程实战 第15章 原子变量和非阻塞同步机制

非阻塞的同步机制 简单的说,那就是又要实现同步,又不使用锁. 与基于锁的方案相比,非阻塞算法的实现要麻烦的多,但是它的可伸缩性和活跃性上拥有巨大的优势. 实现非阻塞算法的常见方法就是使用volatile语义和原子变量. 硬件对并发的支持 原子变量的产生主要是处理器的支持,最重要的是大多数处理器架构都支持的CAS(比较并交换)指令. 模拟实现AtomicInteger的++操作 首先我们模拟处理器的CAS语法,之所以说模拟,是因为CAS在处理器中是原子操作直接支持的.不需要加锁. public s

【Networkk】一篇文章完全搞清楚 scoket read/write 返回码、阻塞与非阻塞、异常处理 等让你头疼已久的问题

浅谈TCP/IP网络编程中socket的行为 我认为,想要熟练掌握Linux下的TCP/IP网络编程,至少有三个层面的知识需要熟悉: 1. TCP/IP协议(如连接的建立和终止.重传和确认.滑动窗口和拥塞控制等等) 2. Socket I/O系统调用(重点如read/write),这是TCP/IP协议在应用层表现出来的行为. 3. 编写Performant, Scalable的服务器程序.包括多线程.IO Multiplexing.非阻塞.异步等各种技术. 关于TCP/IP协议,建议参考Rich

java并发编程11.原子变量与非阻塞同步机制

在非阻塞算法中不存在死锁和其他活跃性问题. 在基于锁的算法中,如果一个线程在休眠或自旋的同时持有一个锁,那么其他线程都无法执行下去,而非阻塞算法不会受到单个线程失败的影响. 锁的劣势 许多JVM都对非竞争锁获取和释放操作进行了极大的优化,但如果有多个线程同时请求锁,那么JVM就需要借助操作系统地功能.如果出现了这种情况,那么一些线程将被挂起并且在稍后恢复运行.当线程恢复执行时,必须等待其他线程执行完它们的时间片以后,才能被调度执行.在挂起和恢复线程等过程中存在着很大的开销,并且通常存在着较大时间

11.python并发入门(part14阻塞I/O与非阻塞I/O,以及引入I/O多路复用)

一.初步了解什么是I/O模型. 1.回顾,用户态与内核态. 操作系统位于应用程序和硬件之间,本质上是一个软件,它由内核以及系统调用组成. 内核:用于运行于内核态,主要作用是管理硬件资源. 系统调用:运行与用户态,为应用程序提供系统调用的接口. 操作系统的核心,就是内核,内核具有访问底层硬件设备的权限,为了保证用户无法直接对内核进行操作,并且保证内核的安全,所以就划分了用户空间和内核空间. 2.回顾进程切换. 如果说要实现进程之间的切换,那么进程需要有能力挂起,有能力回复,这样才叫切换. 进程与进