Netty Reator(二)Scalable IO in Java

Netty Reator(二)Scalable IO in Java

Netty 系列目录 (https://www.cnblogs.com/binarylei/p/10117436.html)

Doug Lea 大神的《Scalable IO in Java》http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf:可伸缩的 IO 模型

大部分 IO 都是下面这个步骤,

  • Read request
  • Decode request
  • Process service
  • Encode reply
  • Send reply

一、经典的网络 IO 模型

传统的 IO 模型是一个 socket 一个线程,代码如下:

class Server implements Runnable {
    public void run() {
        try {
            ServerSocket ss = new ServerSocket(PORT);
            while (!Thread.interrupted())
            new Thread(new Handler(ss.accept())).start(); //创建新线程来handle
            // or, single-threaded, or a thread pool
        } catch (IOException ex) { /* ... */ }
    }

    static class Handler implements Runnable {
        final Socket socket;
        Handler(Socket s) { socket = s; }
        public void run() {
            try {
                byte[] input = new byte[MAX_INPUT];
                socket.getInputStream().read(input);
                byte[] output = process(input);
                socket.getOutputStream().write(output);
            } catch (IOException ex) { /* ... */ }
        }
        private byte[] process(byte[] cmd) { /* ... */ }
    }
}

显然简单的多线程会带来扩展性问题,当 client 数量变的很多的时候,还其他的可用性、性能的问题。解决方法就是 Divide-and-conquer,分开后,就需要 Event-driven Designs 来串联起来...

二。单线程( BasicReactor Design)

所有事情 read、process、write 都由单个线程完成,完成一步重新设置下一步的 event。问题当然就是,其中任何步骤阻塞其它任务就阻塞了,因为只有一个线程。

class Reactor implements Runnable {
    final Selector selector;
    final ServerSocketChannel serverSocket;

    Reactor(int port) throws IOException { // Reactor 初始化
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress(port));
        serverSocket.configureBlocking(false); // 非阻塞
        SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT); // 分步处理,第一步,接收accept事件
        sk.attach(new Acceptor()); //attach callback object, Acceptor
    }

    public void run() {
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                while (it.hasNext())
                    dispatch((SelectionKey)(it.next()); //Reactor负责dispatch收到的事件
                selected.clear();
            }
        } catch (IOException ex) { /* ... */ }
    }

    void dispatch(SelectionKey k) {
        Runnable r = (Runnable)(k.attachment()); //调用之前注册的callback对象
        if (r != null)
            r.run();
    }

    class Acceptor implements Runnable { // inner
        public void run() {
            try {
                SocketChannel c = serverSocket.accept();
                if (c != null)
                new Handler(selector, c);
            }
            catch(IOException ex) { /* ... */ }
        }
    }
}

final class Handler implements Runnable {
    final SocketChannel socket;
    final SelectionKey sk;
    ByteBuffer input = ByteBuffer.allocate(MAXIN);
    ByteBuffer output = ByteBuffer.allocate(MAXOUT);
    static final int READING = 0, SENDING = 1;
    int state = READING;

    Handler(Selector sel, SocketChannel c) throws IOException {
        socket = c; c.configureBlocking(false);
        // Optionally try first read now
        sk = socket.register(sel, 0);
        sk.attach(this); //将Handler作为callback对象
        sk.interestOps(SelectionKey.OP_READ); //第二步,接收Read事件
        sel.wakeup();
    }
    boolean inputIsComplete() { /* ... */ }
    boolean outputIsComplete() { /* ... */ }
    void process() { /* ... */ }

    public void run() {
        try {
            if (state == READING) read();
            else if (state == SENDING) send();
        } catch (IOException ex) { /* ... */ }
    }

    void read() throws IOException {
        socket.read(input);
        if (inputIsComplete()) {
            process();
            state = SENDING;
            // Normally also do first write now
            sk.interestOps(SelectionKey.OP_WRITE); //第三步,接收write事件
        }
    }
    void send() throws IOException {
        socket.write(output);
        if (outputIsComplete()) sk.cancel(); //write完就结束了, 关闭select key
    }
}

//上面 的实现用Handler来同时处理Read和Write事件, 所以里面出现状态判断
//我们可以用State-Object pattern来更优雅的实现
class Handler { // ...
    public void run() { // initial state is reader
        socket.read(input);
        if (inputIsComplete()) {
            process();
            sk.attach(new Sender());  //状态迁移, Read后变成write, 用Sender作为新的callback对象
              sk.interest(SelectionKey.OP_WRITE);
            sk.selector().wakeup();
        }
    }
    class Sender implements Runnable {
        public void run(){ // ...
            socket.write(output);
            if (outputIsComplete()) sk.cancel();
        }
    }
}

单线程模式的局限还是比较明显的。所以改进是将比较耗时的部分,从 reactor 线程中分离出去,让 reactor 专门负责 IO,而另外创建 Thread Pool 和 queue 来缓存和处理任务。所以其实已经进化成 Proactor 模式,异步模式。

三、多线程(Worker Threads)

class Handler implements Runnable {
    // uses util.concurrent thread pool
    static PooledExecutor pool = new PooledExecutor(...);
    static final int PROCESSING = 3;
    // ...
    synchronized void read() { // ...
        socket.read(input);
        if (inputIsComplete()) {
            state = PROCESSING;
            pool.execute(new Processer()); //使用线程pool异步执行
        }
    }

    synchronized void processAndHandOff() {
        process();
        state = SENDING; // or rebind attachment
        sk.interest(SelectionKey.OP_WRITE); //process完,开始等待write事件
    }

    class Processer implements Runnable {
        public void run() { processAndHandOff(); }
    }
}

使用多个 reactor 进程,主 reactor 只负责 accept,然后将接收到的 socketchannel 交给 Thread Pool 去处理。

四、主从(Multiple Reactor Threads)

Selector[] selectors; // 一个 selector 代表一个 subReactor
int next = 0;
class Acceptor { // ...
    public synchronized void run() { ...
        Socket connection = serverSocket.accept(); // 主 selector 负责 accept
        if (connection != null)
            new Handler(selectors[next], connection); //选个 subReactor 去负责接收到的 connection
        if (++next == selectors.length) next = 0;
    }
}


每天用心记录一点点。内容也许不重要,但习惯很重要!

原文地址:https://www.cnblogs.com/binarylei/p/10117443.html

时间: 2024-10-11 13:22:28

Netty Reator(二)Scalable IO in Java的相关文章

《Scalable IO in Java》译文

<Scalable IO in Java> 是java.util.concurrent包的作者,大师Doug Lea关于分析与构建可伸缩的高性能IO服务的一篇经典文章,在文章中Doug Lea通过各个角度,循序渐进的梳理了服务开发中的相关问题,以及在解决问题的过程中服务模型的演变与进化,文章中基于Reactor反应器模式的几种服务模型架构,也被Netty.Mina等大多数高性能IO服务框架所采用,因此阅读这篇文章有助于你更深入了解Netty.Mina等服务框架的编程思想与设计模式. 下面是我对

《Scalable IO in Java》笔记

Scalable IO in Java http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf 基本上所有的网络处理程序都有以下基本的处理过程:Read requestDecode requestProcess serviceEncode replySend reply Classic Service Designs 简单的代码实现: class Server implements Runnable { public void run() { try { Se

Java基础学习笔记二十 IO流

转换流 在学习字符流(FileReader.FileWriter)的时候,其中说如果需要指定编码和缓冲区大小时,可以在字节流的基础上,构造一个InputStreamReader或者OutputStreamWriter,这又是什么意思呢? OutputStreamWriter类 查阅OutputStreamWriter的API介绍,OutputStreamWriter 是字符流通向字节流的桥梁:可使用指定的字符编码表,将要写入流中的字符编码成字节.它的作用的就是,将字符串按照指定的编码表转成字节,

Java基础(十二)IO输入输出

一.IO 概述 1.IO 概念 IO:I 代表 Input 输入:O 代表 Output 输出. Java 中 IO 是以流为基础进行输入输出,所有的数据被串行化(保存)写入输出流,或者从输入流读入. 注:数据串行化指把对象的状态以特定的形式(比如 byte[])保存到流,通过流的方式写入. 2.IO 的作用 1.文本文件,通过特定方法能够把数据写到文件,也能够读取出文件中的内容. 2.把信息保存到磁盘文件中. 3.Java 操作文件 1.创建 File 对象方式 测试创建文件的三种方式: 1

Netty入门二:开发第一个Netty应用程序

    既然是入门,那我们就在这里写一个简单的Demo,客户端发送一个字符串到服务器端,服务器端接收字符串后再发送回客户端. 2.1.配置开发环境 1.安装JDK 2.去官网下载jar包 (或者通过pom构建) 2.2.认识下Netty的Client和Server 一个Netty应用模型,如下图所示,但需要明白一点的是,我们写的Server会自动处理多客户端请求,理论上讲,处理并发的能力决定于我们的系统配置及JDK的极限. Client连接到Server端 建立链接发送/接收数据 Server端

Netty Reator(三)Reactor 模型

Netty Reator(三)Reactor 模型 Netty 系列目录 (https://www.cnblogs.com/binarylei/p/10117436.html) 本文介绍 DC Schmidt 大神的一篇文章<Reactor: an object behavioral pattern for concurrent event demultiplexing and event handler dispatching> 一.What:Reactor 模式是什么? 反应器设计模式(R

Java压缩技术(二) ZIP压缩——Java原生实现

原文:http://snowolf.iteye.com/blog/642298 去年整理了一篇ZLib算法Java实现(Java压缩技术(一) ZLib),一直惦记却没时间补充.今天得空,整理一下ZIP的java原生实现. 看了几篇zip压缩算法的帖子,讲的算是比较细致了,但就是没有对应的解压缩实现,太惜败了! 我就喜欢没事做总结,稍作整理,将其收纳! 相关链接: Java压缩技术(一) ZLib Java压缩技术(二) ZIP压缩——Java原生实现 Java压缩技术(三) ZIP解压缩——J

Kafka笔记整理(二):Kafka Java API使用

[TOC] Kafka笔记整理(二):Kafka Java API使用 下面的测试代码使用的都是下面的topic: $ kafka-topics.sh --describe hadoop --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 Topic:hadoop PartitionCount:3 ReplicationFactor:3 Configs: Topic: hadoop Partition: 0 Leader:

netty报错:io.netty.channel.ChannelPipelineException

1.九月 23, 2018 8:35:02 下午 io.netty.channel.ChannelInitializer channelRegistered警告: Failed to initialize a channel. Closing: [id: 0xa09c718b, /127.0.0.1:50509 => /127.0.0.1:9999]io.netty.channel.ChannelPipelineException: com.sxt.netty.first.Server4Hell