Java IO模型&NIO

Java IO模型&NIO

    • Java IO模型NIO

      • 楔子
      • 概述
      • 网络服务
      • 经典的服务设计
        • 经典的SocketServer循环阻塞
      • 可伸缩目标
      • 分而治之
      • 事件驱动设计
        • 背景知识AWT 事件
      • Reactor 模式
        • Reactor基础模式
        • Java NIO 支持
        • Channels
        • Buffers
        • Selectors
        • SelectionKeys
      • Reactor 模式实践
        • 第一步初始化
        • 第二步循环分发
        • 第三步接收者
        • 第四步 Handler设置
        • 第五步请求处理
        • 还有一种状态Handler
      • 多线程版本Reactor模型设计
        • 线程池Handler
        • 任务协作
          • 直接传递Handsoff
          • 回调Callback
          • 队列Queue
          • 异步结果Future
        • 使用线程池PooledExecutor
        • 多Reactor线程池模型
        • 使用NIO特性
      • NIO API 参考

楔子

本文大部分内容翻译自Doug Lea 先生的幻灯片[http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf]. 在此基础上,融入了一些自己的文字。欢迎浏览Doug Lea的个人主页[http://gee.cs.oswego.edu/dl/], 相关内容请尊重原作者。

概述

本文从主要讨论一下几个方面的内容:

- 可扩展的网络服务

- 事件驱动处理

- Reactor(反应堆)模式

基础版

多线程版

其他变体版

- Java NIO API 参考

网络服务

网络服务一般都涉及到Web服务,分布式对象等,但大多数服务端都有相同的基础步骤,如解析读请求(Read Request),请求解码,请求处理,响应编码,发送响应数据。但实际上,在具体的每个步骤上面所带来的性能消耗却每每不同,例如XML解析,文件传输,Web页面生成及渲染,或者其他计数服务这些不同的处理往往会有不同的性能消耗。

经典的服务设计

每个Handler都可以在各自的线程里执行所有的操作。

经典的SocketServer循环阻塞

public class Server implements Runnable {

    public void run() {
        try {
            ServerSocket ss = new ServerSocket(PORT);
            while (!Thread.interrupted())
                new Thread(new Handler(ss.accept())).start();
                // 或单线程,或线程池
        } catch (IOException ex) { /* ... */ }
    }
    static class Handler implements Runnable {
        final Socket socket;
        Handler(Socket s) { socket = s; }
        public void run() {
            try {
                byte[] input = new byte[MAX_INPUT];
                socket.getInputStream().read(input);
                byte[] output = process(input);
                socket.getOutputStream().write(output);
            } catch (IOException ex) { /* ... */ }
        }
        private byte[] process(byte[] cmd) { /* ... */ }
    }

}
注:示例代码中,大多数异常处理都已省略

如你可见,这样的模式虽简单清晰,但却缺乏弹性伸缩的能力,当客户端并发量增加后,服务端的线程数量会不断增加,最终会耗尽系统资源。即使是中间采用线程池,仍然不能承受大量的并发请求。所以,我们需要新的方案,需要新的模式。但,当务之急是先明确一个目标。

可伸缩目标

  • 在面临持续增长的负载(客户请求)压力时,可优雅的降级
  • 可通过增加物理资源(CPU,内存,磁盘,带宽)来持续改进
  • 仍需满足可用性和性能目标:潜在的短期内目标,高峰期需求指标,服务质量可调

面对上述的目标,分而治之通常是实现可伸缩,可扩展目标的最好的方式。

分而治之

分而治之就是把处理过程折分成明确,职责单一的任务,每个任务采用非阻塞的方式来执行一个操作。 当任务状态是启用时,才开始执行。例如,一个IO事件通常是作为一种触发条件,感知到该事件才开始作相应的read -> decode -> compute -> encode -> send.

这种思想是Java NIO 提供的一种基础的机制。NIO 支持一种非阻塞的读写方式,还有通过感知到的IO事件来分发给相关的任务。

事件驱动设计

这种模式通常比其他的方式更有效,因为它用到的资源会偏少,不用为每个Client分配单独的线程。除此之外,所需开销会减少,因为减少了线程上下文切换,也减少了锁竞争的场景。但调度相应的请求会偏慢,因为需要手动的为事件绑定具体的执行操作。

好事多磨砺。这种模式虽然好处多多,但对于编写程序却不是一件容易的事。因为需要处处考虑把代码写成非阻塞的行为,还需记录并跟踪服务的状态

背景知识:AWT 事件

IO事件驱动模式采用了类似的思想,但使用了不同的设计来实现。

Reactor 模式

Reactor对IO事件的相应是转发到适配的处理器(Handler)上,这类似于AWT的线程。由Handler来执行一些非阻塞的行为。Reactor模式需要绑定Handler到具体的事件上,关于这种模式的详解可以去看这本书:《面向模式的软件架构:卷2》- Pattern-Oriented Software Architecture, Volume 2 (POSA2)。

Reactor基础模式

此外单线程版本,由acceptor接收请求并转发到线程中,由该线程来完成请求的处理。

Java NIO 支持

Channels

Channel是一种通道,可以连接到文件,套接字(Socket),用于支持非阻塞的读功能。

Buffers

Buffer是一种类似于数组的对象,可以被Channel直接读写。

Selectors

Selecttor是一种选择器,或叫多路复用选择器。用于在多个Channel直接感知IO事件,并作相应的隔离区分,然后负责把事件封装成对应的SelectionKey。

SelectionKeys

SelectionKey会维护事件的状态,并与事件绑定,通过SelectionKey来完成IO的读写。

Reactor 模式实践

第一步:初始化

public class Reactor implements Runnable {
    final Selector selector;
    final ServerSocketChannel serverSocket;

    public Reactor(int port) throws IOException {
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(
                new InetSocketAddress(port));
        serverSocket.configureBlocking(false);
        SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        sk.attach(new Acceptor());

        /*
            可选方式:使用明确的SPI提供者方式
            SelectorProvider provider = SelectorProvider.provider();
            Selector s  = provider.openSelector();
            ServerSocketChannel channel = provider.openServerSocketChannel();
        */
    }
   }

第二步:循环分发

    public void run() { // 一般在新线程中作React
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                while (it.hasNext())
                    dispatch((SelectionKey) (it.next());
                selected.clear();
            }
        } catch (Exception e) {

        }
    }
    void dispatch(SelectionKey k) {
        Runnable r = (Runnable)(k.attachment());
        if (r != null)
            r.run();
    }

第三步:接收者

class Acceptor implements Runnable { // inner
        public void run() {
            try {
                SocketChannel c = serverSocket.accept();
                if (c != null)
                    new Handler(selector, c);
            }catch(IOException ex) { /* ... */ }
        }
 }

如图示:

第四步: Handler设置

final class Handler implements Runnable {
        private static final int MAXIN = 1024;
        private static final int MAXOUT = 1024;
        final SocketChannel socket;
        final SelectionKey sk;
        ByteBuffer input = ByteBuffer.allocate(MAXIN);
        ByteBuffer output = ByteBuffer.allocate(MAXOUT);
        static final int READING = 0, SENDING = 1;
        int state = READING;

        Handler(Selector sel, SocketChannel c) throws IOException {
            socket = c;
            c.configureBlocking(false);
            // Optionally try first read now
            sk = socket.register(sel, 0);
            sk.attach(this);
            sk.interestOps(SelectionKey.OP_READ);
            sel.wakeup();
        }

        boolean inputIsComplete() { /* ... */ }

        boolean outputIsComplete() { /* ... */ }

        void process() { /* ... */ }
  }

第五步:请求处理

 public void run() {
            try {
                if (state == READING) read();
                else if (state == SENDING) send();
            } catch (IOException ex) { /* ... */ }
        }

        void read() throws IOException {
            socket.read(input);
            if (inputIsComplete()) {
                process();
                state = SENDING;
                // Normally also do first write now
                sk.interestOps(SelectionKey.OP_WRITE);
            }
        }

        void send() throws IOException {
            socket.write(output);
            if (outputIsComplete()) sk.cancel();
        }

还有一种:状态Handler

就是借用GoF的状态对象模式,为每种状态都附带一个Handler,甚至根据状态重新绑定新的Handler。

    class Handler {
        public void run() { // initial state is reader
            socket.read(input);
            if (inputIsComplete()) {
                process();
                sk.attach(new Sender());
                sk.interest(SelectionKey.OP_WRITE);
                sk.selector().wakeup();
            }
        }
        class Sender implements Runnable {
            public void run(){ // ...
                socket.write(output);
                if (outputIsComplete()) sk.cancel();
            }
        }
    }

多线程版本Reactor模型设计

首先,为了可扩展,可策略性的新增线程来适应多处理器的计算机。为了复用线程,需要工作者线程池。这种模式Reactor必须响应快速,在第一时间触发相应的Handler。还允许划分非IO的处理给其他线程执行。

除此之外,还可再扩展。当单一的Reactor线程池模式达到饱和时,还能扩展成多个Reactor,用以均衡负载CPU、IO速率。

使用线程池可以有助于调整和控制线程数量,一般数量会少于客户连接数。下图是基于线程池的模型:

线程池Handler

    class Handler implements Runnable {

        // uses util.concurrent thread pool
        static Executor pool = new ThreadPoolExecutor()
        static final int PROCESSING = 3;

        synchronized void read() {
            socket.read(input);
            if (inputIsComplete()) {
                state = PROCESSING;
                pool.execute(new Processer());
            }
        }
        synchronized void processAndHandOff() {
            process();
            state = SENDING; // or rebind attachment
            sk.interest(SelectionKey.OP_WRITE);
        }
        class Processer implements Runnable {
            public void run() { processAndHandOff(); }
        }

        public void run() {}
    }

任务协作

直接传递(Handsoff)

投递任务到线程池里后,然后执行该任务,完成后,又调用下一个任务。这样虽然快速,但却很生硬,不自然。

回调(Callback)

回调Handler的分发器(Dispatcher),然后执行状态变更,重新绑定Handler。这种类似GoF中的中介者模式。

队列(Queue)

任务直接投递到队列中

异步结果(Future)

异步执行任务,利用Wait/Notify机制来获取执行结果。

使用线程池PooledExecutor

首先这必须是一个可调整的工作者线程池,由方法签名为execute(Runnable r), 需要控制的参数有:

任务队列的类型、最大线程数量、最小线程数量、预热与按需分配线程数量,线程Keep-Alive时间,饱和策略(阻塞,丢弃,生产者执行等)。

多Reactor线程池模型

这种模型主要用于静态或动态的协调控制CPU、IO速率,由一个主接收者分发Selector到其他的Reactor。

Selector[] selectors; // also create threads
    int next = 0;
    class Acceptor { // ...
        public synchronized void run() { ...
            Socket connection = serverSocket.accept();
            if (connection != null)
                new Handler(selectors[next], connection);
            if (++next == selectors.length) next = 0;
        }
}

模型图如示:

使用NIO特性

NIO支持每个Reactor使用多个Selector,用以绑定不同的Handler到不同的事件中,但需要做好同步协作。还支持自动的文件到网络或网络到文件的复制传输,内存文件映射,直接内存分配。

NIO API 参考

Buffer

ByteBuffer (CharBuffer, LongBuffer, etc not shown.)

Channel

SelectableChannel

SocketChannel

ServerSocketChannel

FileChannel

Selector

SelectionKey

下面对部分API作简单的介绍

Buffer

abstract class Buffer {
    int capacity();
    int position();
    Buffer position(int newPosition);
    int limit();
    Buffer limit(int newLimit);
    Buffer mark();
    Buffer reset();
    Buffer clear();
    Buffer flip();
    Buffer rewind();
    int remaining();
    boolean hasRemaining();
    boolean isReadOnly();
}

很多方法都是见名之意,Buffer内部维护有四个状态变量:mark、position、limit、capacity. 其中mark表示标记的位置,而其余三者之间的关系是:position < limit < capacity. 如图:

这三个变量一起可以跟踪缓冲区的状态和它所包含的数据。

Channel

interface Channel {
    boolean isOpen();
    void close() throws IOException;
}
interface ReadableByteChannel extends Channel {
    int read(ByteBuffer dst) throws IOException;
}
interface WritableByteChannel extends Channel {
    int write(ByteBuffer src) throws IOException;
}
interface ScatteringByteChannel extends ReadableByteChannel {
    int read(ByteBuffer[] dsts, int offset, int length) throws IOException;
    int read(ByteBuffer[] dsts) throws IOException;
}
interface GatheringByteChannel extends WritableByteChannel {
    int write(ByteBuffer[] srcs, int offset, int length) throws IOException;
    int write(ByteBuffer[] srcs) throws IOException;
}

Channel可看作IO操作的连接器。其子类来实现不同的行为,例如用于读,用于写的Channel, 还有聚合、分散功能的Channel。

Selector

abstract class Selector {
    static Selector open() throws IOException;
    Set keys();
    Set selectedKeys();
    int selectNow() throws IOException;
    int select(long timeout) throws IOException;
    int select() throws IOException;
    void wakeup();
    void close() throws IOException;
}

Selector是一种多路复用选择器,Selector注册好的Channel是由SelectionKey来表示。

时间: 2024-08-05 00:35:12

Java IO模型&NIO的相关文章

Java IO模型:BIO、NIO、AIO

Java IO模型:BIO.NIO.AIO 本来是打算直接学习网络框架Netty的,但是先补充了一下自己对Java 几种IO模型的学习和理解.分别是 BIO.NIO.AIO三种IO模型. IO模型的基本说明 BIO模型图 缺点: 如果有很多个Client,则会产生很多个线程.压力主要是在服务器端.客户端的压力并不大. 另外建立连接之后,并不是在时时刻刻的使用.会有空间时间. 会阻塞. NIO模型图 特点: 事件驱动 多路复用 Netty底层使用的NIO模型 AIO模型 目前还未得到广泛运用.异步

java IO、NIO、AIO详解

概述 在我们学习Java的IO流之前,我们都要了解几个关键词 同步与异步(synchronous/asynchronous):同步是一种可靠的有序运行机制,当我们进行同步操作时,后续的任务是等待当前调用返回,才会进行下一步:而异步则相反,其他任务不需要等待当前调用返回,通常依靠事件.回调等机制来实现任务间次序关系 阻塞与非阻塞:在进行阻塞操作时,当前线程会处于阻塞状态,无法从事其他任务,只有当条件就绪才能继续,比如ServerSocket新连接建立完毕,或者数据读取.写入操作完成:而非阻塞则是不

java IO 模型--快速分清 同步|阻塞

IO 介绍 IO 模型 IO请求 分为两个阶段:等待资源 和 使用资源: IO请求:一般需要请求特殊资源(如 磁盘.RAM 或文件),当资源被上一个使用者使用没有释放的时候, IO请求会被阻塞,直到资源可用. 等待资源 阶段有两种策略: 阻塞:当IO请求资源没有准备好的时候,请求阻塞,直到得到资源的响应(有资源 或者 超时异常): 非阻塞:资源不可用时,IO请求直接返回,返回数据不可用标志. 使用资源 阶段,IO 分为 同步IO 和 异步IO 同步IO:应用阻塞在发送数据和接受数据阶段,直到数据

分别使用Java IO、NIO、Netty实现的一个Echo Server示例

分别使用Java IO.Java NIO.Netty来实现一个简单的EchoServer(即原样返回客户端的输入信息). Java IO int port = 9000; ServerSocket ss = new ServerSocket(port); while (true) { final Socket socket = ss.accept(); new Thread(new Runnable() { public void run() { while (true) { try { Buf

java IO和NIO 的区别

Java NIO和IO的主要区别 下表总结了Java NIO和IO之间的主要差别. IO                NIO 面向流            面向缓冲 阻塞IO           非阻塞IO 无 选择器 面向流与面向缓冲 Java NIO和IO之间第一个最大的区别是,IO是面向流的,NIO是面向缓冲区的. Java IO面向流意味着每次从流中读一个或多个字节,直至读取所有字节,它们没有被缓存在任何地方.此外,它不能前后移动流中的数据.如果需要前后移动从流中读取的数据,需要先将它

漫谈Java IO之 NIO那些事儿

前面一篇中已经介绍了基本IO的使用以及最简单的阻塞服务器的例子,本篇就来介绍下NIO的相关内容,前面的分享可以参考目录: 网络IO的基本知识与概念 普通IO以及BIO服务器 NIO的使用与服务器Hello world Netty入门与服务器Hello world Netty深入浅出 NIO,也叫做new-IO或者non-blocking-IO,就暂且理解为非阻塞IO吧. 为什么选择NIO 那么NIO相对于IO来说,有什么优势呢?总结来说: IO是面向流的,数据只能从一端读取到另一端,不能随意读写

Java IO与NIO的总结、比较

一.IO流总结 1.Java I/O主要包括如下3层次: 流式部分--最主要的部分.如:OutputStream.InputStream.Writer.Reader等 非流式部分--如:File类.RandomAccessFile类和FileDescriptor等类 其他--文件读取部分的与安全相关的类,如:SerializablePermission类,以及与本地操作系统相关的文件系统的类,如:FileSystem类和Win32FileSystem类和WinNTFileSystem类. 2.流

JAVA - IO模型

本章内容 1 同步,异步,阻塞,非阻塞的概念 2 五种IO模型:阻塞式IO,非阻塞式IO,多路复用IO,信号驱动IO,异步IO 各自的特点 同步,异步,阻塞,非阻塞(注意:这里是从用户线程和内核层面来介绍概念的) 同步和异步(描述的是用户线程与内核的交互方式 - 消息的通信机制) 1 同步 用户线程发起IO后需要等待 2 异步 用户线程发起IO请求后仍然继续执行,当内核操作完成后会通知用户线程,或者调用用户线程注册的回调函数 阻塞和非阻塞(描述的是程序等待调用结果是的状态 - 调用者的状态) 1

Java IO和NIO文章目录

1.java IO详尽解析 2.深入分析 Java I/O 的工作机制 3.InputStream类详解 4.OutputStream类详解 5.JAVA的节点流和处理流 6.FileInputStream和FileOutputStream详解 7.JAVA IO中的设计模式