NIO初识

  Java编程中的NIO,俗称new I/O,是在JDK1.4版本之后开始引入的,在JDK1.4之前,Java服务端大多使用同步阻塞式来处理网络请求,在低流量、低并发情况还能抗住,在如今互联网时代,信息量很明显差远了,在没有NIO之前,服务器端通信模块基本被C/C++占据着,它们可以利用操作系统的原生API来处理非阻塞事件,随着java的nio类库发布,经过不断发展完善,性能也逐渐与c++媲美了,加上JAVA很多优秀的开源类库,使用更广泛了,现在,来了解一下nio的原理,做一个感官上的认识。

  使用NIO,必须记住如下3个核心概念,编程实现就是围绕他们的关系的:

  1. 缓冲区Buffer:          在nio编程中,读写都在基于缓冲区的,区别于之前的基于流的,根据用途,可以使用字节缓冲区、字符缓冲区等

  2. 通道Channel:        在Buffer里的数据通过Channel与网络交互,是全双工的,而流数单工操作的

  3. 多路复用器Selector: 管理Channel,最基本的就是读写Channel,一个线程使用Selector来轮询读写Channel,通道上有事件发生时,就会进行处理,类似一个函数指针集合,在BLE开发的底层OS上也是这样处理的,增加一个模块,只要写好模块函数,然后把函数指针放到功能数组就可以了,后面就轮询这个注册了的函数,有置位就调用指针进行操作。这种模式可以实现单线程就能支持上千万并发连接。

  下面新建一个工程来测试一下:

  1. 新建一个TestNIO工程,目录结构设为如下:

    

  2. 实现服务器端,代码如下:

    

package cn.linjk.testnio.server;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Created by LinJK on 19/11/2016.
 */
public class NioServer {
    private static final int serverPort = 8889;

    public static void main(String[] argc) {
        //启动一个线程来处理Selector
        HelloServer helloServer = new HelloServer(serverPort);
        if (!helloServer.getInitResult()) {
            System.out.println("Init Error");
            System.exit(-1);
        }
        System.out.println("Hello Server listening on localhost:" + serverPort);

        new Thread(helloServer).start();
    }

}

class HelloServer implements Runnable {
    private Selector            selector;
    private ServerSocketChannel serverSocketChannel;
    private volatile boolean    stop;
    private ByteBuffer byteBufferWrite;
    private boolean             contrustorFlag;

    public HelloServer(int port) {
        try {
            selector            = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();

            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024);
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            contrustorFlag = true;
        }
        catch (Exception e) {
            contrustorFlag = false;
            e.printStackTrace();
        }
    }

    public boolean getInitResult() {
        return contrustorFlag;
    }

    public void stop() {
        stop = true;
    }

    @Override
    public void run() {
        while (!stop) {
            try {
                selector.select(1000); //1秒轮询周期,可以按需修改

                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectionKeys.iterator();
                SelectionKey selectionKey = null;

                while (it.hasNext()) {
                    selectionKey = it.next();
                    it.remove();

                    try {
                        //handle event
                        handleIncomeEvent(selectionKey);
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                        if (selectionKey != null) {
                            selectionKey.cancel();
                            if (selectionKey.channel() != null) {
                                selectionKey.channel().close();
                            }
                        }
                    }
                }
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
        //User set stop listening, clear something
        if (selector != null) {
            try {
                selector.close();
            }
            catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void handleIncomeEvent(SelectionKey key) {
        if (key.isValid()) {
            //连接事件
            if (key.isAcceptable()) {
                try {
                    ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    //监听到了连接事件,原有基础上注册监听读取用户端数据事件
                    socketChannel.register(selector, SelectionKey.OP_READ);
                }
                catch (IOException e) {
                    e.printStackTrace();
                }
            }

            //读到客户端数据事件
            if (key.isReadable()) {
                SocketChannel socketChannel = (SocketChannel)key.channel();
                ByteBuffer byteBufferRead = ByteBuffer.allocate(1024);
                try {
                    int readCnt = socketChannel.read(byteBufferRead);

                    if (readCnt > 0) {
                        byteBufferRead.flip();//刷新缓冲区,然后从缓冲区读取数据

                        byte[] bytes = new byte[byteBufferRead.remaining()];
                        byteBufferRead.get(bytes);

                        String request = new String(bytes, "UTF-8");
                        System.out.println("Server receive: " + request);

                        //say hello to client
                        byteBufferWrite = ByteBuffer.allocate(20);
                        byteBufferWrite.put("[<<-]Hello".getBytes());
                        byteBufferWrite.flip();//刷新数据到缓冲区
                        socketChannel.write(byteBufferWrite);
                        //避免缓冲区已满,造成写数据不全现象,注册写事件,轮询是否所有数据已写完
                        socketChannel.register(selector, SelectionKey.OP_WRITE);
                    }
                    else if (readCnt < 0) {
                        key.cancel();
                        socketChannel.close();
                    }
                    else {
                        //
                    }
                }
                catch (IOException e) {
                    e.printStackTrace();
                }
            }

            if (key.isWritable()) {
                SocketChannel socketChannel = (SocketChannel)key.channel();

                while (byteBufferWrite.hasRemaining()){
                    //.....
                }
            }
        }
        else {
            System.out.println("Input key unvalid");
        }
    }
}

  3. 实现客户端,测试功能,有些异常没有写全,也没实现重连服务器机制,只把框架写了,代码如下:

    

package cn.linjk.testnio.client;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Created by LinJK on 19/11/2016.
 */
public class NioClient {
    private static Selector selector;
    private static SocketChannel socketChannel;
    private static volatile boolean stop;

    public static void main(String[] argc) {

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    selector = Selector.open();
                    socketChannel = SocketChannel.open();
                    socketChannel.configureBlocking(false);
                    //connect to server
                    if (socketChannel.connect(new InetSocketAddress("127.0.0.1", 8889))) {
                        //注册监听服务器返回事件
                        socketChannel.register(selector, SelectionKey.OP_READ);
                        //send request to server
                        ByteBuffer byteBufferWrite = ByteBuffer.allocate(100);
                        byteBufferWrite.put("I am Jim".getBytes());
                        byteBufferWrite.flip();
                        socketChannel.write(byteBufferWrite);
                        if (!byteBufferWrite.hasRemaining()) {
                            System.out.println("Send Finish.");
                        }
                    }
                    else {
                        socketChannel.register(selector, SelectionKey.OP_CONNECT);
                    }

                    while (!stop) {
                        selector.select(1000);

                        Set<SelectionKey> selectionKeys = selector.selectedKeys();
                        Iterator<SelectionKey> it = selectionKeys.iterator();

                        SelectionKey selectionKey = null;
                        while (it.hasNext()) {
                            selectionKey = it.next();
                            it.remove();

                            if (selectionKey.isValid()) {
                                SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
                                if (selectionKey.isConnectable()) {
                                    if (socketChannel.finishConnect()) {
                                        socketChannel.register(selector, SelectionKey.OP_READ);
                                        //send data
                                        ByteBuffer byteBufferWrite = ByteBuffer.allocate(100);
                                        byteBufferWrite.put("I am Jim".getBytes());
                                        byteBufferWrite.flip();
                                        socketChannel.write(byteBufferWrite);
                                        if (!byteBufferWrite.hasRemaining()) {
                                            System.out.println("Send Finish.");
                                        }
                                    }
                                }
                                //收到服务器返回数据事件
                                if (selectionKey.isReadable()) {
                                    ByteBuffer byteBufferRead = ByteBuffer.allocate(100);

                                    int readCnt = socketChannel.read(byteBufferRead);
                                    if (readCnt > 0) {
                                        byteBufferRead.flip();

                                        byte[] bytes = new byte[byteBufferRead.remaining()];
                                        byteBufferRead.get(bytes);

                                        System.out.println("Receive from server: " + new String(bytes, "UTF-8"));
                                        stop = true;
                                    }
                                    else if (readCnt < 0){
                                        selectionKey.channel();
                                        socketChannel.close();
                                    }
                                }
                            }
                        }
                    }

                    if (selector != null) {
                        selector.close();
                    }
                }
                catch (IOException e) {
                    //资源清理....
                    System.exit(1);
                }
            }
        }).start();
    }

}

  4. 代码分析:

    对比服务端和客户端的代码逻辑,有如下两点相似:

    a. 程序启动后创建一个线程来管理Selctor

    b. 都配置为非阻塞操作,然后注册SelctionKey到SocketChanell,然后在线程的run()函数里轮询哪个事件发生了再进行操作

    流程都相似,稍微有点不一样,看代码并运行一下就明白了。

  5. 运行结果:

    先运行Server端,然后运行Client端,二者输出分别如下:

    Server:

    

    Client:

    

   6. 总结:

    NIO和IO直接最大区别就是,NIO是面向缓冲区的,IO是面向流的,面向缓冲区数据处理比较灵活,数据处理速度与吞吐量更大,同时保证数据完整性比较重要,前面提到缓冲区满时,需要检测"半包"也是这个意思,使用NIO的非阻塞避免了因网络情况阻塞造成的高并发环境下时延问题,在高并发通讯情况下,可以使用它来处理通信还是很好的。

时间: 2024-12-28 15:14:16

NIO初识的相关文章

Java NIO (一) 初识NIO

Java NIO(New IO / Non-Blocking IO)是从JDK 1.4版本开始引入的IO API , 可以替代标准的Java IO API .NIO与原来标准IO有同样的作用和目的,但是使用方式和读写方式完全不同,NIO支持面向缓冲区,基于通道的IO操作.NIO以更高效的方式进行文件的读写操作. NIO的核心组成部分: ·  Buffers ·  Channels ·  Selectors Channel 和 Buffer 在NIO中数据源以及数据的目的位置都是直接和通道(Cha

初识中间件之消息队列

初识中间件之消息队列 测试那点事儿 测试那点事儿 初识中间件之消息队列 1 消息队列概述 消息队列是分布式系统中的重要组件,主要解决应用耦合,异步消息,流量削锋等问题,以实现高性能,高可用,可伸缩和最终一致性架构,是大型分布式系统中不可缺少的中间件. 目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等,比如我之前用过的RabbitMQ以及kafka. 2 消息队列应用场景 在实际应用中,消息队列常用于异步处理.应用解耦.流量削锋

深入理解Java NIO

初识NIO: 在 JDK 1. 4 中 新 加入 了 NIO( New Input/ Output) 类, 引入了一种基于通道和缓冲区的 I/O 方式,它可以使用 Native 函数库直接分配堆外内存,然后通过一个存储在 Java 堆的 DirectByteBuffer 对象作为这块内存的引用进行操作,避免了在 Java 堆和 Native 堆中来回复制数据. NIO 是一种同步非阻塞的 IO 模型.同步是指线程不断轮询 IO 事件是否就绪,非阻塞是指线程在等待 IO 的时候,可以同时做其他任务

初理解Java中的BIO,NIO,AIO

初识: java 中的 BIO.NIO和 AIO 理解为是 Java 语言对操作系统的各种 IO 模型的封装.程序员在使用这些 API 的时候,不需要关心操作系统层面的知识,也不需要根据不同操作系统编写不同的代码.只需要使用Java的API就可以了. 在讲 BIO,NIO,AIO 之前先来回顾一下这样几个概念:同步与异步,阻塞与非阻塞. 同步与异步: 同步: 同步就是发起一个调用后,被调用者未处理完请求之前,调用不返回. 异步: 异步就是发起一个调用后,立刻得到被调用者的回应表示已接收到请求,但

下载-深入浅出Netty源码剖析、Netty实战高性能分布式RPC、NIO+Netty5各种RPC架构实战演练三部曲视频教程

下载-深入浅出Netty源码剖析.Netty实战高性能分布式RPC.NIO+Netty5各种RPC架构实战演练三部曲视频教程 第一部分:入浅出Netty源码剖析 第二部分:Netty实战高性能分布式RPC 第三部分:NIO+Netty5各种RPC架构实战演练

初识Python,望君多多关照

在学习Python之前,我们接触过数据结构和网页制作.前者让我们学习如何把C语言运用的更加整齐规范,而后者让我们亲身学习如何运用所学,制作一个静态网页.通过这些课程的学习,让我对C语言产生了比较大的压力,以至于对编程.对这学期的Python课程都有一种如临大敌的感觉. 但是真的学习了这门课程,体会了编码过程中的一些固定运用方法和套路之后,也许过程中对这门课程隐隐约约产生了一点点朦胧的感觉,仿佛他也并没有想象中的那么困难,起码现在的学习让我认为,他可能没有C语言那么繁琐和麻烦.当然,以一个初学者的

初识数组排序!!!!

<!doctype html> <html lang="en"> <head> <meta charset="UTF-8"> <title>初识数组排序</title> <!--调试成功--> <style type="text/css"> *{ padding:0; margin: 0; } li,ul{ list-style: none; } #p

Java NIO (五) 管道 (Pipe)

Java NIO 管道是2个线程之间的单向数据连接.Pipe有一个source通道和一个sink通道.数据会被写到sink通道,从source通道读取. 如下图: 向管道写数据: 从管道读数据: 1. 从读取管道的数据,需要访问source通道. 2. 调用source通道的read()方法来读取数据

初识操作系统和linux

初识操作系统和linux 1.计算机系统由硬件系统和软件系统两大部分组成:是一种能接收和存储信息,并按照存储在其内部的程序对海量数据进行自动.高速地处理,然后把处理结果输出的现代化智能电子设备. 2.世界上第一台计算机是1946年诞生在美国宾州大学. 3.冯·诺依曼体系结构:1946年数学家冯·诺依曼于提出计算机硬件系统由运算器.控制器.存储器.输入设备.输出设备.摩根定律:当价格不变时,集成电路上可容纳的元器件的数目,约每隔18-24个月便会增加一倍,性能也将提升一倍.现在计算机技术进本很难遵