Java异步非阻塞IO NIO使用与代码分析

[TOC]


Java异步非阻塞IO NIO使用与代码分析

TimeServer程序的NIO实现完整代码

TimeServer程序来自书本《Netty权威指南》,nio的代码确实有些难懂(这也是后面需要使用Netty的原因之一),不过我对代码加了注释,这样一来对nio的概念及基本的使用都会有一个非常清晰的认识:

服务端程序

TimeServer.java:

package cn.xpleaf.nio;

public class TimeServer {
    public static void main(String[] args) {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(port);
            } catch (Exception e) {
                // 采用默认值
            }
        }
        new Thread(new MultiplexerTimeServer(port)).start();
    }
}

MultiplexerTimeServer.java:

package cn.xpleaf.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.sql.Date;
import java.util.Iterator;
import java.util.Set;

public class MultiplexerTimeServer implements Runnable {

    private Selector selector;

    private ServerSocketChannel servChannel;

    private volatile boolean stop;

    /**
     * 初始化多路复用器,绑定监听端口
     */
    public MultiplexerTimeServer(int port) {
        try {
            // 创建多路复用器Selector
            selector = Selector.open();
            // 创建ServerSocketChannel,它相当于是所有客户端连接的父管道
            servChannel = ServerSocketChannel.open();
            // 将ServerSocketChannel设置为异步非阻塞
            servChannel.configureBlocking(false);
            // 绑定侦听端口,backlog为1024,表示serverchannel容纳的最大的客户端数量为1024(个人查找资料得出的结果,不一定准确)
            servChannel.socket().bind(new InetSocketAddress(port), 1024);
            // 将ServerSocketChannel注册到selector上,并监听SelectionKey.OP_ACCEPT操作位
            servChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("The time server is start in port : " + port);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }

    }

    public void stop() {
        this.stop = true;
    }

    @Override
    public void run() {
        while (!stop) {
            try {
                // timeout - 如果为正,则在等待某个通道准备就绪时最多阻塞 timeout 毫秒;如果为零,则无限期地阻塞;必须为非负数(API文档)
                // 休眠时间为1s,无论是否有读写等事件发生,selector每隔1s都被唤醒一次
                selector.select(1000);
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                SelectionKey key = null;
                while (it.hasNext()) {
                    // 获取key值,通过对key进行操作,可以获取到其所对应的注册到selector上的channel
                    // 最初是只有一个ServerSocketChannel所对应的key,也就是前面所创建的servChannel,它相当于是所有客户端连接的父管道
                    // nio的服务端就是通过它来创建与客户端的连接的,因为目前的代码就只有它监听了SelectionKey.OP_ACCEPT操作位
                    key = it.next();
                    // 同时把该key值从selectedKeys集合中移除
                    it.remove();
                    // 处理该key值
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            } catch (Throwable t) {
                // TODO Auto-generated catch block
                t.printStackTrace();
            }
        }

        // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
        if (selector != null) {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 对key进行处理
     *
     * @param key
     * @throws IOException
     */
    public void handleInput(SelectionKey key) throws IOException {
        // 处理新接入的请求消息
        if (key.isValid()) {

            // 连接建立时
            if (key.isAcceptable()) {
                // 接收新的连接
                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                SocketChannel sc = ssc.accept();
                // 设置SocketChannel为异步非阻塞
                sc.configureBlocking(false);
                // 注册新的连接到多路复用器selector中,监听SelectionKey.OP_READ操作位
                sc.register(selector, SelectionKey.OP_READ);
            }

            // 读数据
            if (key.isReadable()) {
                // 通过key获取到其注册在Selector上的channel
                SocketChannel sc = (SocketChannel) key.channel();
                // 分配一个新的字节缓冲区,大小为1024KB,即1MB
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                // 由于前面已经将该SocketChannel设置为异步非阻塞模式,因此它的read是非阻塞的
                // 返回值为读取到的字节数
                // 返回值不同,意义不同:
                /**
                 * 大小0:读到了字节,对字节进行编解码 等于0:没有读取到字节,属于正常场景,忽略 为-1:链路已经关闭,需要关闭SocketChannel,释放资源
                 */
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {
                    // 读取到字节,进行解码操作

                    // 将缓冲区当前的limit设置为position,position设置为0,用于后续对缓冲区的读取操作(我想这是API中定义的吧)
                    readBuffer.flip();
                    // 根据缓冲区可读的字节个数创建字节数组
                    byte[] bytes = new byte[readBuffer.remaining()];
                    // 将缓冲区可读的字节数组复制到新创建的字节数组中
                    readBuffer.get(bytes);
                    // 将字节数组以utf-8方式转换为字符串
                    String body = new String(bytes, "utf-8");
                    System.out.println("The time server receive order : " + body);
                    // 解析客户端发送的指令,同时构造返回结果
                    String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)
                            ? new Date(System.currentTimeMillis()).toString()
                            : "BAD ORDER";
                    // 将应答消息异步发送给客户端
                    doWrite(sc, currentTime);
                } else if (readBytes < 0) {
                    // 对端链路关闭
                    key.cancel();
                    sc.close();
                } else {
                    ; // 读到0字节忽略
                }
            }
        }
    }

    /**
     * 将应答消息异步发送给客户端
     *
     * @param channel
     * @param response
     * @throws IOException
     */
    public void doWrite(SocketChannel channel, String response) throws IOException {
        if (response != null && response.trim().length() > 0) {
            // 将字符串编码为字节数组
            byte[] bytes = response.getBytes();
            // 根据字节数组的容量创建ByteBuffer
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            // 将字节数组复制到缓冲区
            writeBuffer.put(bytes);
            // flip操作
            writeBuffer.flip();
            // 将缓冲区的字节数组发送出去
            channel.write(writeBuffer);
            /**
             * 注意这里并没有处理半包问题,《Netty权威指南》中的说明如下(P35)
             * 需要指出的是,由于SocketChannel是异步非阻塞的,它并不保证一次能够把需要发送的字节数组发送完,此时会出现半包问题。
             * 我们需要注册写操作,不断轮询Selector将没有发送完的ByteBuffer发送完毕,然后可以通过ByteBuffer的hasRemain()方法
             * 判断消息是否发送完成。此处仅仅是个简单的入门级例程,没有演示如何处理“写半包”场景,后续的章节会有详细说明。
             */
        }
    }

}

客户端程序

TimeClient.java:

package cn.xpleaf.nio;

public class TimeClient {
    public static void main(String[] args) {
        int port = 8080;
        if(args != null && args.length > 0) {
            try {
                port = Integer.valueOf(port);
            } catch (Exception e) {
                // 采用默认值
            }
        }
        new Thread(new TimeClientHandle("127.0.0.1", port)).start();
    }
}

TimeClientHandle.java:

package cn.xpleaf.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class TimeClientHandle implements Runnable {

    private String host;
    private int port;
    private Selector selector;
    private SocketChannel socketChannel;
    private volatile boolean stop;

    /**
     * 初始化多路复用器,设置连接的服务端地址和端口
     *
     * @param host
     * @param port
     */
    public TimeClientHandle(String host, int port) {
        this.host = host == null ? "127.0.0.1" : host;
        this.port = port;
        try {
            // 创建多路复用器Selector
            selector = Selector.open();
            // 创建SocketChannel,用来连接服务端
            socketChannel = SocketChannel.open();
            // 将SocketChannel设置为异步非阻塞
            socketChannel.configureBlocking(false);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    @Override
    public void run() {
        // 先尝试直接连接
        try {
            doConnect();
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        // 当然这里也可以将上面的直接连接代码注释,然后使用下面这两行代码
        // 只是需要注意的是,如果一开始没有尝试连接,那么即使后来注册侦听连接也是没有意义的
        // 此时没有发送连接请求,服务端根本就不会响应
        // socketChannel.connect(new InetSocketAddress(host, port));
        // socketChannel.register(selector, SelectionKey.OP_CONNECT);
        while (!stop) {
            try {
                // timeout - 如果为正,则在等待某个通道准备就绪时最多阻塞 timeout 毫秒;如果为零,则无限期地阻塞;必须为非负数(API文档)
                // 休眠时间为1s,无论是否有读写等事件发生,selector每隔1s都被唤醒一次
                selector.select(1000);
                // 获取所有就绪的channel的key
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                SelectionKey key = null;
                while (it.hasNext()) {
                    // 获取key值,通过对key进行操作,可以获取到其所对应的注册到selector上的channel
                    // 最初是只有一个ServerSocketChannel所对应的key,也就是前面所创建的servChannel,它相当于是所有客户端连接的父管道
                    // nio的服务端就是通过它来创建与客户端的连接的,因为目前的代码就只有它监听了SelectionKey.OP_ACCEPT操作位
                    key = it.next();
                    // 同时把该key值从selectedKeys集合中移除
                    it.remove();
                    // 处理该key值
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            } catch (Throwable t) {
                t.printStackTrace();
            }
        }

        // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
        if (selector != null) {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 对key进行处理
     *
     * @param key
     * @throws IOException
     */
    public void handleInput(SelectionKey key) throws IOException {
        if (key.isValid()) {

            // 通过key获取到SocketChannel
            SocketChannel sc = (SocketChannel) key.channel();

            // isConnectable是判断是否处于连接状态
            // 如果是,说明服务端已经返回ACK应答消息,后面就需要对连接结果进行判断
            if (key.isConnectable()) {
                // 对连接结果进行判断
                if (sc.finishConnect()) {
                    // 注册SocketChannel到多路复用器selector上,并监听SelectionKey.OP_READ操作位
                    sc.register(selector, SelectionKey.OP_READ);
                    doWrite(sc);
                } else {
                    // 连接失败,进程退出
                    System.exit(1);
                }
            }

            // 读数据
            if (key.isReadable()) {
                // 分配一个新的字节缓冲区,大小为1024KB,即1MB
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                // 由于前面已经将该SocketChannel设置为异步非阻塞模式,因此它的read是非阻塞的
                // 返回值为读取到的字节数
                // 返回值不同,意义不同:
                /**
                 * 大小0:读到了字节,对字节进行编解码 等于0:没有读取到字节,属于正常场景,忽略 为-1:链路已经关闭,需要关闭SocketChannel,释放资源
                 */
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {
                    // 读取到字节,进行解码操作

                    // 将缓冲区当前的limit设置为position,position设置为0,用于后续对缓冲区的读取操作(我想这是API中定义的吧)
                    readBuffer.flip();
                    // 根据缓冲区可读的字节个数创建字节数组
                    byte[] bytes = new byte[readBuffer.remaining()];
                    // 将缓冲区可读的字节数组复制到新创建的字节数组中
                    readBuffer.get(bytes);
                    // 将字节数组以utf-8方式转换为字符串
                    String body = new String(bytes, "utf-8");
                    System.out.println("Now : " + body);
                } else if (readBytes < 0) {
                    // 对端链路关闭
                    key.cancel();
                    sc.close();
                } else {
                    ; // 读到0字节忽略
                }
            }
        }
    }

    /**
     * 连接到服务端
     *
     * @throws IOException
     */
    private void doConnect() throws IOException {
        // 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答
        if (socketChannel.connect(new InetSocketAddress(host, port))) {
            socketChannel.register(selector, SelectionKey.OP_READ);
            doWrite(socketChannel);
        } else {
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
        }
    }

    /**
     * 写操作
     *
     * @throws IOException
     */
    private void doWrite(SocketChannel sc) throws IOException {
        // 将字符串编码为字节数组
        byte[] req = "QUERY TIME ORDER".getBytes();
        // 根据字节数组的容量创建ByteBuffer
        ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
        // 将字节数组复制到缓冲区
        writeBuffer.put(req);
        // flip操作
        writeBuffer.flip();
        // 将缓冲区的字节数组发送出去
        sc.write(writeBuffer);
        if (!writeBuffer.hasRemaining()) {
            System.out.println("Send order 2 server succeesd.");
        }
        // 也是没有处理"半包写"的问题,可以查看服务端程序的代码注释说明
    }

}

程序测试

服务端执行:

The time server is start in port : 8080

客户端执行:

Send order 2 server succeesd.
Now : 2018-02-10

此时再查看服务端的输出结果:

The time server is start in port : 8080
The time server receive order : QUERY TIME ORDER

原文地址:http://blog.51cto.com/xpleaf/2070942

时间: 2024-10-14 20:18:21

Java异步非阻塞IO NIO使用与代码分析的相关文章

Nginx:异步非阻塞IO

在使用socket编程中,经常会看到阻塞.非阻塞.同步.异步,那么它们之间到底有什么关系跟区别呢? 本次将那Nginx的异步非阻塞的事件驱动模型来解释一下它们之间的关系. 阻塞IO 在linux中,默认所有socket都是阻塞的. 这意味着使用该socket调用诸如recv的函数时,在没有数据到达之前,该函数将不会返回,导致线程被阻塞,直到数据到达. 非阻塞IO 我们可以使用fcntl把socket设置为非阻塞的. 这意味着使用该socket调用诸如recv的函数时,该函数将立刻返回,可以根据返

转一贴,今天实在写累了,也看累了--【Python异步非阻塞IO多路复用Select/Poll/Epoll使用】

下面这篇,原理理解了, 再结合 这一周来的心得体会,整个框架就差不多了... http://www.haiyun.me/archives/1056.html 有许多封装好的异步非阻塞IO多路复用框架,底层在linux基于最新的epoll实现,为了更好的使用,了解其底层原理还是有必要的.下面记录下分别基于Select/Poll/Epoll的echo server实现.Python Select Server,可监控事件数量有限制: 1 2 3 4 5 6 7 8 9 10 11 12 13 14

异步非阻塞IO的Python Web框架--Tornado

Tornado的全称是Torado Web Server,从名字上就可知它可用作Web服务器,但同时它也是一个Python Web的开发框架.最初是在FriendFeed公司的网站上使用,FaceBook收购之后便进行了开源. 作为Web框架,是一个轻量级的Web框架,类似于另一个Python web 框架Web.py,其拥有异步非阻塞IO的处理方式. 作为Web服务器,Tornado有较为出色的抗负载能力,官方用nginx反向代理的方式部署Tornado和其它Python web应用框架进行对

nodejs的异步非阻塞IO

简单表述一下:发启向系统IO操作请求,系统使用线程池IO操作,执行完放到事件队列里,node主线程轮询事件队列,读取结果与调用回调.所以说node并非真的单线程,还是使用了线程池的多线程. 上个图看看吧 举一反三:所有的异步非阻塞思路都类似,如:nginx,python的模拟异步非阻塞,还有java的nio.C#的 EAP

swoole与php协程实现异步非阻塞IO开发

“协程可以在遇到阻塞的时候中断主动让渡资源,调度程序选择其他的协程运行.从而实现非阻塞IO” 然而php是不支持原生协程的,遇到阻塞时如不交由异步进程来执行是没有任何意义的,代码还是同步执行的,如下所示: function foo() { $db=new Db(); $result=(yield $db->query()); yield $result; } 上面的数据库查询操作是阻塞的,当调度器调度该协程到这一步时发现执行了阻塞操作,此时调度器该怎么办?选择其余协程执行?那该协程的阻塞操作又该

Java网络编程和NIO详解5:Java 非阻塞 IO 和异步 IO

Java网络编程和NIO详解5:Java 非阻塞 IO 和异步 IO Java 非阻塞 IO 和异步 IO 转自https://www.javadoop.com/post/nio-and-aio 本系列文章首发于我的个人博客:https://h2pl.github.io/ 欢迎阅览我的CSDN专栏:Java网络编程和NIO https://blog.csdn.net/column/details/21963.html 部分代码会放在我的的Github:https://github.com/h2p

【面试】迄今为止把同步/异步/阻塞/非阻塞/BIO/NIO/AIO讲的这么清楚的好文章(快快珍藏)

网上有很多讲同步/异步/阻塞/非阻塞/BIO/NIO/AIO的文章,但是都没有达到我的心里预期,于是自己写一篇出来. 常规的误区 假设有一个展示用户详情的需求,分两步,先调用一个HTTP接口拿到详情数据,然后使用适合的视图展示详情数据. 如果网速很慢,代码发起一个HTTP请求后,就卡住不动了,直到十几秒后才拿到HTTP响应,然后继续往下执行. 这个时候你问别人,刚刚代码发起的这个请求是不是一个同步请求,对方一定回答是.这是对的,它确实是. 但你要问它为什么是呢?对方一定是这样回答的,“因为发起请

从操作系统内核看Java非阻塞IO事件检测

非阻塞服务器模型最重要的一个特点是,在调用读取或写入接口后立即返回,而不会进入阻塞状态.在探讨单线程非阻塞IO模型前必须要先了解非阻塞情况下Socket事件的检测机制,因为对于非阻塞模式最重要的事情是检测哪些连接有感兴趣的事件发生,一般会有如下三种检测方式. 应用程序遍历socket检测 如图所示,当多个客户端向服务器请求时,服务器端会保存一个socket连接列表,应用层线程对socket列表进行轮询尝试读取或写入.对于读取操作,如果成功读取到若干数据则对读取到的数据进行处理,读取失败则下个循环

Java 中阻塞非阻塞io以及同步异步IO

然后借鉴下<Unix网络编程卷>中的理论: IO操作中涉及的2个主要对象为程序进程.系统内核.以读操作为例,当一个IO读操作发生时,通常经历两个步骤: 1,等待数据准备 2,将数据从系统内核拷贝到操作进程中 例如,在socket上的读操作,步骤1会等到网络数据包到达,到达后会拷贝到系统内核的缓冲区:步骤2会将数据包从内核缓冲区拷贝到程序进程的缓冲区中. 阻塞(blocking)与非阻塞(non-blocking)IO IO的阻塞.非阻塞主要表现在一个IO操作过程中,如果有些操作很慢,比如读操作