也讲Java NIO

也讲Java NIO

一点开场白

百度搜索java nio,前面的几个帖子总是从各种基础概念介绍起,通道、缓冲区、选择器… 然后看着看着就晕了,所以,经过一晚上的研究,我想从自己的理解讲讲nio。

一、单线程的通信

在没有nio之前,java妥妥的可以进行CS项目间的通信,来个最简单的例子。(懒得写,抄了段)

server 端

package nio.nonio;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class SimpleServer {

    public static int PORT = 8080;

    public static void main(String[] agrs) throws IOException {
        ServerSocket s = null;
        Socket socket = null;
        BufferedReader br = null;
        PrintWriter pw = null;
        // 设定服务端的端口号
        s = new ServerSocket(PORT);
        System.out.println("ServerSocket Start:" + s);
        while (true) {
            try {
                // 等待请求,此方法会一直阻塞,直到获得请求才往下走
                System.out.println("waiting for connection...");
                socket = s.accept();
                System.out.println("Connection accept socket:" + socket);
                // 用于接收客户端发来的请求
                br = new BufferedReader(new InputStreamReader(
                        socket.getInputStream()));
                pw = new PrintWriter(new BufferedWriter(new OutputStreamWriter(
                        socket.getOutputStream())), true);
                while (true) {
                    String str = br.readLine();
                    System.out.println("Client Socket Message:" + str);
                    if (str.equals("END")) {
                        break;
                    }
                    Thread.sleep(1000);
                    pw.println("Message Received");
                    pw.flush();
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                System.out.println("Close.");
                try {
                    br.close();
                    pw.close();
                    socket.close();
                } catch (Exception e2) {
                }
            }
        }

    }
}

Client端

package nio.nonio;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.Socket;

public class SimpleClient {

    public static void main(String[] args) {
        Socket socket = null;
        BufferedReader br = null;
        PrintWriter pw = null;
        try {
            // 客户端socket指定服务器的地址和端口号
            socket = new Socket("127.0.0.1", SimpleServer.PORT);
            System.out.println("Socket=" + socket);
            // 同服务器原理一样
            br = new BufferedReader(new InputStreamReader(
                    socket.getInputStream()));
            pw = new PrintWriter(new BufferedWriter(new OutputStreamWriter(
                    socket.getOutputStream())));
            for (int i = 0; i < 10; i++) {
                pw.println("howdy " + i);
                pw.flush();
                String str = br.readLine();
                System.out.println(str);
            }
            pw.println("END");
            pw.flush();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                System.out.println("close......");
                br.close();
                pw.close();
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}

其实,以上将是最简单的一个服务端和客户端通信的例子。服务端和客户端分别是用一个线程处理。服务端在while 循环中,会调用s.accept();若没有连接请求,则一直阻塞,直到有连接接入,则不断读取信息,直接读到END,才终止连接,然后继续等待下一个连接。

这种方式,服务端一次只能够处理一个连接,若有多个客户端要接入,则需要排队等待。为了能够快速响应客户端的请求,所以,有了服务端的多线程处理。

二、多线程的通信方式

每次accept一个连接请求后,就建立一个线程来处理。这样,就不用等到本次连接数据读取完毕并关闭后,再继续处理下个连接了。如下图:

代码是这样的。

package nio.multithread;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class MultiThreadServer {

    public static int PORT = 8080;

    void start() throws IOException {
        ServerSocket s = null;
        // 设定服务端的端口号
        s = new ServerSocket(PORT);
        System.out.println("ServerSocket Start:" + s);
        while (true) {
            try {
                // 等待请求,此方法会一直阻塞,直到获得请求才往下走
                System.out.println("waiting for connection...");
                Socket socket = null;
                socket = s.accept();
                new Thread(new MessageHandler(socket)).start();
                System.out.println("Connection accept socket:" + socket);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] agrs) throws IOException {
        new MultiThreadServer().start();
    }

    class MessageHandler implements Runnable {
        private Socket socket;

        public MessageHandler(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            BufferedReader br = null;
            PrintWriter pw = null;
            // 用于接收客户端发来的请求
            try {
                br = new BufferedReader(new InputStreamReader(
                        socket.getInputStream()));
                pw = new PrintWriter(new BufferedWriter(new OutputStreamWriter(
                        socket.getOutputStream())), true);
                while (true) {
                    String str = br.readLine();
                    System.out.println("Client Socket Message:" + str);
                    pw.println("Message Received");
                    if (str.equals("END")) {
                        break;
                    }
                    Thread.sleep(1000);
                    pw.flush();
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println("Close.");
                try {
                    br.close();
                    pw.close();
                    socket.close();
                } catch (Exception e2) {
                }
            }
        }
    }
}

与之前的方式相比,有了巨大的飞跃,但如果有成千上万个客户端同时与服务器通信,岂不是要建立成千上万个线程?内存吃得消么?不断地创建新的线程,CPU岂不是大量时间要耗费在建新线程中了?所以,这种方式只适合处理有少量客户端同时连接的情况。

三、仅使用一个线程来处理所有的通信?

在单线程情况下,只能支持一个线程处理的原因是,accept一个连接后,一个线程只能够与一个客户端通信。如果有一个线程能够负责与所有的客户端通信,是不是可以解决太多线程的尴尬局面。

于是,我尝试用一个LinkedBlockingQueue存下所有被accept的连接,在一个线程中逐个去处理与这些连接的通信,于是有了下面的代码。

package nio.mynio;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class SingleThreadServer {

    public static int PORT = 8080;

    void start() throws IOException {
        ServerSocket s = null;
        // 设定服务端的端口号
        s = new ServerSocket(PORT);
        BlockingQueue<Socket> sockets = new LinkedBlockingQueue<Socket>();
        new Thread(new MessageHandler(sockets)).start();
        System.out.println("ServerSocket Start:" + s);

        while (true) {
            try {
                // 等待请求,此方法会一直阻塞,直到获得请求才往下走
                System.out.println("waiting for connection...");
                Socket socket = null;
                socket = s.accept();
                sockets.add(socket);
                System.out.println("Connection accept socket:" + socket);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] agrs) throws IOException {
        new SingleThreadServer().start();
    }

    class MessageHandler implements Runnable {
        private BlockingQueue<Socket> sockets;

        public MessageHandler(BlockingQueue<Socket> sockets) {
            this.sockets = sockets;
        }

        @Override
        public void run() {
            BufferedReader br = null;
            PrintWriter pw = null;
            // 用于接收客户端发来的请求
            while (true) {
                Socket socket = null;
                try {
                    System.out.println(sockets.isEmpty());
                    socket = sockets.take();
                    System.out.println("take:");

                    br = new BufferedReader(new InputStreamReader(
                            socket.getInputStream()));
                    pw = new PrintWriter(new BufferedWriter(
                            new OutputStreamWriter(socket.getOutputStream())),
                            true);
                    while (true) {
                        String str = br.readLine();
                        System.out.println("Client Socket Message:" + str);
                        pw.println("Message Received");
                        if (str.equals("END")) {
                            break;
                        }
                        Thread.sleep(1000);
                        pw.flush();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("Close.");
                    try {
                        br.close();
                        pw.close();
                        if (socket != null && !socket.isClosed())
                            socket.close();
                    } catch (Exception e2) {
                    }
                }
            }
        }
    }
}

然而,即便这么做,本质上与单线程没差,若同时又多个客户端接入,是要等上一个连接的数据读完之后,才进行下一个连接的数据处理,所以,这种方式是自欺欺人罢了。究其原因,是因为在调用输入流br.readLine()进行输入时,若没有数据,当前线程会阻塞直到有新的数据,所以,在一个连接没关闭前,是无法进行下一个连接数据的读取的,即便该连接目前并没有数据传输。

那么,若有一个方法如br.readLineNoWait(),若当前没有数据时,返回一个特定值(如-1),而不是阻塞当前状态,我们就可以在这个线程中通过循环读取各个连接来读取数据了。又或者,若有一种机制能够预先将传入的数据读取并放在某个内存区域,并通知我们数据到达了,我们直接从这块区域获取数据而非直接用流来获取,也能够避免等待IO的过程(其实类似于使用channel而非stream来读数据)。

至此,我们可以揣测下NIO的实现方式。

四、java 的nio

第三点中,我想通过一个无阻塞的方式去读inpustream中的数据,这种方式的思路是通过一个线程在循环中不断尝试pull数据来实现对所有连接数据的读入。

nio则是通过订阅-发布模式,首先,我们先新建一个选择器(selector),然后将channel(可以理解为一个连接)注册到选择器中,并且告诉他,当有可读的数据(或者其他事件)时通知我。然后,用一个单独的线程去处理所有订阅的事件,就可以了。

selector的作用,他会帮你管理多个通道(比如多个端口的监控),以及每个通道中的连接(这样能够处理长连接的情况)。

代码如下:

package nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

/**
 * TCP/IP的NIO非阻塞方式 服务器端
 * */
public class Server implements Runnable {

    // 第一个端口
    private Integer port1 = 8099;
    // 第二个端口
    private Integer port2 = 9099;
    // 第一个服务器通道 服务A
    private ServerSocketChannel serversocket1;
    // 第二个服务器通道 服务B
    private ServerSocketChannel serversocket2;
    // 连接1
    private SocketChannel clientchannel1;
    // 连接2
    private SocketChannel clientchannel2;

    // 选择器,主要用来监控各个通道的事件
    private Selector selector;

    // 缓冲区
    private ByteBuffer buf = ByteBuffer.allocate(512);

    Set<SocketChannel> channelSet = new HashSet<SocketChannel>();

    public Server() {
        init();
    }

    /**
     * 这个method的作用 1:是初始化选择器 2:打开两个通道 3:给通道上绑定一个socket 4:将选择器注册到通道上
     * */
    public void init() {
        try {
            // 创建选择器
            this.selector = SelectorProvider.provider().openSelector();
            // 打开第一个服务器通道
            this.serversocket1 = ServerSocketChannel.open();
            // 告诉程序现在不是阻塞方式的
            this.serversocket1.configureBlocking(false);
            // 获取现在与该通道关联的套接字
            this.serversocket1.socket().bind(
                    new InetSocketAddress("localhost", this.port1));
            // 将选择器注册到通道上,返回一个选择键
            // OP_ACCEPT用于套接字接受操作的操作集位
            this.serversocket1.register(this.selector, SelectionKey.OP_ACCEPT);

            // 然后初始化第二个服务端
            this.serversocket2 = ServerSocketChannel.open();
            this.serversocket2.configureBlocking(false);
            this.serversocket2.socket().bind(
                    new InetSocketAddress("localhost", this.port2));
            this.serversocket2.register(this.selector, SelectionKey.OP_ACCEPT);

        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    /**
     * 这个方法是连接 客户端连接服务器
     *
     * @throws IOException
     * */
    public void accept(SelectionKey key) throws IOException {
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        // SocketChannel socketChannel = server.accept();
        // channelSet.add(socketChannel);
        if (server.equals(serversocket1)) {
            clientchannel1 = server.accept();
            System.out.println("連接了!");
            clientchannel1.configureBlocking(false);
            // OP_READ用于读取操作的操作集位
            clientchannel1.register(this.selector, SelectionKey.OP_READ);
        } else {
            clientchannel2 = server.accept();
            clientchannel2.configureBlocking(false);
            // OP_READ用于读取操作的操作集位
            clientchannel2.register(this.selector, SelectionKey.OP_READ);
        }
    }

    /**
     * 从通道中读取数据 并且判断是给那个服务通道的
     *
     * @throws IOException
     * */
    public void read(SelectionKey key) throws IOException {

        this.buf.clear();
        // 通过选择键来找到之前注册的通道
        // 但是这里注册的是ServerSocketChannel为什么会返回一个SocketChannel??
        SocketChannel channel = (SocketChannel) key.channel();
        // 从通道里面读取数据到缓冲区并返回读取字节数
        System.out.println(channel.isConnected());
        System.out.println(channel.isOpen());
        int count = -1;
        try {
            count = channel.read(this.buf);
        } catch (IOException e) {
            System.out.println("异常,关闭连接");
        } finally {
            if (count == -1) {
                // 取消这个通道的注册
                System.out.println("取消这个通道的注册");
                key.channel().close();
                key.cancel();
                return;
            }
        }

        // 将数据从缓冲区中拿出来
        String input = new String(this.buf.array()).trim();
        // 那么现在判断是连接的那种服务
        if (channel.equals(this.clientchannel1)) {
            System.out.println("欢迎您使用服务A");
            System.out.println("您的输入为:" + input);
        } else {
            System.out.println("欢迎您使用服务B");
            System.out.println("您的输入为:" + input);
        }

    }

    @Override
    public void run() {
        while (true) {
            try {
                System.out.println("running ... ");
                // 选择一组键,其相应的通道已为 I/O 操作准备就绪。
                this.selector.select();

                // 返回此选择器的已选择键集
                // public abstract Set<SelectionKey> selectedKeys()
                Iterator selectorKeys = this.selector.selectedKeys().iterator();
                while (selectorKeys.hasNext()) {
                    System.out.println("running2 ... ");
                    // 这里找到当前的选择键
                    SelectionKey key = (SelectionKey) selectorKeys.next();
                    // 然后将它从返回键队列中删除
                    selectorKeys.remove();
                    if (!key.isValid()) { // 选择键无效
                        continue;
                    }
                    if (key.isAcceptable()) {
                        // 如果遇到请求那么就响应
                        this.accept(key);
                    } else if (key.isReadable()) {
                        // 读取客户端的数据
                        System.out.println("isReadable");
                        this.read(key);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        Server server = new Server();
        Thread thread = new Thread(server);
        thread.start();
    }
}
package nio;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

/**
 * TCP/IP的NIO非阻塞方式 客户端
 * */
public class Client {

    // 创建缓冲区
    private ByteBuffer buffer = ByteBuffer.allocate(512);

    // 访问服务器

    public void query(String host, int port) throws IOException {
        InetSocketAddress address = new InetSocketAddress(
                InetAddress.getByName(host), port);
        SocketChannel socket = null;
        byte[] bytes = new byte[512];
        socket = SocketChannel.open();
        socket.connect(address);
        int i = 1;
        try {
            while (true) {
                System.in.read(bytes);
                if ("close".equals(bytes)) {
                    socket.close();
                }
                buffer.clear();
                buffer.put(bytes);
                buffer.flip();
                socket.write(buffer);
                buffer.clear();
                if (i++ == 2)
                    break;
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println("finally close");
            if (socket != null) {
                socket.close();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        new Client().query("localhost", 8099);
        // new Client().query("localhost", 9099);
    }
}

看到一个总结感觉很形象,贴出来:

Channel 通道

Buffer 缓冲区

Selector 选择器

其中Channel对应以前的流,Buffer不是什么新东西,Selector是因为nio可以使用异步的非堵塞模式才加入的东西。

以前的流总是堵塞的,一个线程只要对它进行操作,其它操作就会被堵塞,也就相当于水管没有阀门,你伸手接水的时候,不管水到了没有,你就都只能耗在接水(流)上。

nio的Channel的加入,相当于增加了水龙头(有阀门),虽然一个时刻也只能接一个水管的水,但依赖轮换策略,在水量不大的时候,各个水管里流出来的水,都可以得到妥善接纳,这个关键之处就是增加了一个接水工,也就是Selector,他负责协调,也就是看哪根水管有水了的话,在当前水管的水接到一定程度的时候,就切换一下:临时关上当前水龙头,试着打开另一个水龙头(看看有没有水)。

当其他人需要用水的时候,不是直接去接水,而是事前提了一个水桶给接水工,这个水桶就是Buffer。也就是,其他人虽然也可能要等,但不会在现场等,而是回家等,可以做其它事去,水接满了,接水工会通知他们。

这其实也是非常接近当前社会分工细化的现实,也是统分利用现有资源达到并发效果的一种很经济的手段,而不是动不动就来个并行处理,虽然那样是最简单的,但也是最浪费资源的方式。

参考

.java Socket通信(一)

Java NIO 系列教程

时间: 2024-10-04 13:39:16

也讲Java NIO的相关文章

Java NIO 系列教程(转)

原文中说了最重要的3个概念,Channel 通道Buffer 缓冲区Selector 选择器其中Channel对应以前的流,Buffer不是什么新东西,Selector是因为nio可以使用异步的非堵塞模式才加入的东西.以前的流总是堵塞的,一个线程只要对它进行操作,其它操作就会被堵塞,也就相当于水管没有阀门,你伸手接水的时候,不管水到了没有,你就都只能耗在接水(流)上.nio的Channel的加入,相当于增加了水龙头(有阀门),虽然一个时刻也只能接一个水管的水,但依赖轮换策略,在水量不大的时候,各

Java NIO系列教程(三) Buffer

原文链接:http://ifeve.com/buffers/ 声明:Java NIO系列教材并非本人原创,只因阅读原文之后有感于文章之精妙,意欲与诸位共享,故而出此下策,忘原作者见谅.另附上原文地址. Java NIO的通道类似流,但又有些不同: Java NIO中的Buffer用于和NIO通道进行交互.如你所知,数据是从通道读入缓冲区,从缓冲区写入到通道中的. 缓冲区本质上是一块可以写入数据,然后可以从中读取数据的内存.这块内存被包装成NIO Buffer对象,并提供了一组方法,用来方便的访问

java大文件读写操作,java nio 之MappedByteBuffer,高效文件/内存映射

java处理大文件,一般用BufferedReader,BufferedInputStream这类带缓冲的Io类,不过如果文件超大的话,更快的方式是采用MappedByteBuffer. MappedByteBuffer是java nio引入的文件内存映射方案,读写性能极高.NIO最主要的就是实现了对异步操作的支持.其中一种通过把一个套接字通道(SocketChannel)注册到一个选择器(Selector)中,不时调用后者的选择(select)方法就能返回满足的选择键(SelectionKey

Java NIO 系列教程

转载于http://www.iteye.com/magazines/132-Java-NIO Java NIO(New IO)是从Java 1.4版本开始引入的一个新的IO API,可以替代标准的Java IO API.本系列教程将有助于你学习和理解Java NIO.感谢并发编程网的翻译和投递. (关注ITeye官微,随时随地查看最新开发资讯.技术文章.) Java NIO提供了与标准IO不同的IO工作方式: Channels and Buffers(通道和缓冲区):标准的IO基于字节流和字符流

Java NIO系列教程(一) Java NIO 概述

Java NIO 由以下几个核心部分组成: Channels Buffers Selectors 虽然Java NIO 中除此之外还有很多类和组件,但在我看来,Channel,Buffer 和 Selector 构成了核心的API.其它组件,如Pipe和FileLock,只不过是与三个核心组件共同使用的工具类.因此,在概述中我将集中在这三个组件上.其它组件会在单独的章节中讲到. Channel 和 Buffer 基本上,所有的 IO 在NIO 中都从一个Channel 开始.Channel 有点

Java NIO Channel之FileChannel [ 转载 ]

Java NIO Channel之FileChannel [ 转载 ] @author zachary.guo 对于文件 I/O,最强大之处在于异步 I/O(asynchronous I/O),它允许一个进程可以从操作系统请求一个或多个 I/O 操作而不必等待这些操作的完成.发起请求的进程之后会收到它请求的 I/O 操作已完成的通知.异步 I/O 是一种高级性能,当前的很多操作系统都还不具备.因此,文件通道在多数情况下来说总是阻塞式的,因此不能被置于非阻塞模式. FileChannel 对象不能

转:Java NIO

Java NIO(New IO)是从Java 1.4版本开始引入的一个新的IO API,可以替代标准的Java IO API.本系列教程将有助于你学习和理解Java NIO.感谢并发编程网的翻译和投递. (关注ITeye官微,随时随地查看最新开发资讯.技术文章.) Java NIO提供了与标准IO不同的IO工作方式: Channels and Buffers(通道和缓冲区):标准的IO基于字节流和字符流进行操作的,而NIO是基于通道(Channel)和缓冲区(Buffer)进行操作,数据总是从通

Java NIO Buffer

Java NIO中的Buffer用于和NIO通道进行交互.如你所知,数据是从通道读入缓冲区,从缓冲区写入到通道中的.交互图如下: 缓冲区本质上是一块可以写入数据,然后可以从中读取数据的内存.这块内存被包装成NIO Buffer对象,并提供了一组方法,用来方便的访问该块内存. 下面是NIO Buffer相关的话题列表: 1.Buffer的基本用法 使用Buffer读写数据一般遵循以下四个步骤: 写入数据到Buffer 调用flip()方法 从Buffer中读取数据 调用clear()方法或者com

理解Java NIO

基础概念• 缓冲区操作缓冲区及操作是所有I/O的基础,进程执行I/O操作,归结起来就是向操作系统发出请求,让它要么把缓冲区里的数据排干(写),要么把缓冲区填满(读).如下图• 内核空间.用户空间 上图简单描述了数据从磁盘到用户进程的内存区域移动的过程,其间涉及到了内核空间与用户空间.这两个空间有什么区别呢? 用户空间就是常规进程(如JVM)所在区域,用户空间是非特权区域,如不能直接访问硬件设备.内核空间是操作系统所在区域,那肯定是有特权啦,如能与设备控制器通讯,控制用户区域的进程运行状态.进程执