Java NIO 之 Selector 练习

目的:本编文章主要想分享一下NIO方面的知识,由于最近几天工作不忙,趁机学习了下Java NIO Selector的相关知识;主要是实践操作的;具体的理论知识,可以参考网上的文章。

  • 测试用例主要有三种方式

其实,是服务器端的逻辑不变,客户端有三种方式而已。

服务器端:2个selector + channel, 客户端:一个channel

服务器端:2个selector + channel, 客户端:多个channel(多线程方式)

服务器端:2个selector + channel, 客户端:1个selector + channel

服务端,如果想要一个selector+channel的话,直接在initAndRegister()方法中,注释掉相关代码即可了,当然,客户端也要修改端口部分

  • 服务端代码:
package xingej.selector.test002;

//基本思路逻辑:
//------------------------------------------------------------------------------
//1、创建一个通道选择器Selector
//2、创建服务器端的ServerSocketChannel通道
//      设置ServerSocketChannel属性,
//      端口号的绑定
// 3、将通道选择器 与  ServerSocketChannel通道进行绑定,并向通道选择器注册感兴趣的事件
//------------------------------------------------------------------------------
// 4、通道选择器开始工作监听管道事件,调用select()方法,死循环的方式调用
//      如果用户感兴趣的事件发生,就去处理
//      否则,就阻塞在这里

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

public class NIOSelectorServer {
    //这里声明了两个缓存区,发送和接收缓冲区
    //其实,一个就可以了
    private static ByteBuffer sendBuffer = ByteBuffer.allocate(1024);
    private static ByteBuffer receiveBuffer = ByteBuffer.allocate(1024);
    private Selector selector;

    public void initAndRegister() throws Exception {
        //监听两个服务,因此需要两个端口的
        int listenPortA = 8081;
        int listenPortB = 8082;

        //创建第一个ServerSocketChannel对象实例
        ServerSocketChannel serverSocketChannelA = builderServerSocketChannel(listenPortA);
        //创建第二个ServerSocketChannel对象实例
        ServerSocketChannel serverSocketChannelB = builderServerSocketChannel(listenPortB);

        //创建通道选择器Selector
        selector = Selector.open();

        //将serverSocketChannelA 通道注册到通道选择器Selector里
        register(selector, serverSocketChannelA);
        //将serverSocketChannelB 通道注册到通道选择器Selector里
        register(selector, serverSocketChannelB);
    }

    //开始业务监听了
    public void listen() throws Exception {

        System.out.println("-----服务器-------开始接收请求-------OK--------");

        while (true) {
            int readyChannelNum = selector.select();
            if (0 == readyChannelNum) {
                continue;
            }
            //从选择器中的selectedKeys,可以获取此时已经准备好的管道事件
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                //从迭代器移除刚选好的键
                iterator.remove();
                dealSelectionKey(selector, selectionKey);
            }

            Thread.sleep(2000);

        }
    }

    //处理具体事件
    private void dealSelectionKey(Selector selector, SelectionKey selectionKey) throws Exception {
        if (selectionKey.isAcceptable()) {

            ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
            SocketChannel clientSocketChannel = serverSocketChannel.accept();
            clientSocketChannel.configureBlocking(false);
            clientSocketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
        } else //读取客户端的内容
            if (selectionKey.isReadable()) {

                SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                receiveBuffer.clear();
                StringBuilder msg = new StringBuilder();
                //将客户端发送过来的数据,从管道中读取到或者说写到 接收缓存里
                while (socketChannel.read(receiveBuffer) > 0) {
                    receiveBuffer.flip();
                    msg.append(new String(receiveBuffer.array()));
                    receiveBuffer.clear();//清楚数据,下次可以重新写入
                }
                socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                //打印输出从客户端读取到的信息
                System.out.println("------>:\t" + msg.toString());

//                socketChannel.close();
            } else
                //向客户端 发送数据
                if (selectionKey.isWritable()) {
                    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                    sendBuffer.flip();
                    socketChannel.write(sendBuffer);
                    selectionKey.interestOps(SelectionKey.OP_READ);
                }
    }

    //将ServerSocketChannel 向 Selector进行注册,也就是将两者绑定在一起,
    private void register(Selector selector, ServerSocketChannel serverSocketChannel) throws Exception {
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    //创建ServerSocketChannel对象,并进行属性设置
    private ServerSocketChannel builderServerSocketChannel(int port) throws Exception {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //设置属性,如非阻塞模式
        serverSocketChannel.configureBlocking(false);
        //绑定端口号
        serverSocketChannel.bind(new InetSocketAddress(port));
        return serverSocketChannel;
    }

    public static void main(String[] args) throws Exception {
        NIOSelectorServer nioSelectorServer = new NIOSelectorServer();
        //初始化 并 注册
        nioSelectorServer.initAndRegister();
        //开始监听
        nioSelectorServer.listen();
    }
}
  • 客户端请求方式一:

模型如下:

代码如下:

package xingej.selector.test002;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class NIOClient {
    public static void main(String[] args) throws Exception {
        SocketChannel clientChannel = SocketChannel.open();
        clientChannel.connect(new InetSocketAddress("localhost", 8081));
        clientChannel.configureBlocking(false);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        buffer.put(new String ("hello, server! ").getBytes());
        buffer.flip();
        clientChannel.write(buffer);
        clientChannel.close();
    }
}
  • 客户端请求方式二:

模型如下:

代码如下:

package xingej.selector.test002;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Random;

public class NIOClient2 {
    public static void main(String[] args) throws Exception {

        String msg = "hello, NIO Server, I‘m ";

        int[] ports = {8081, 8082};
        for (int i = 0; i < 10; i++) {
            int index = i % 2;
            int port = ports[index];
            new Thread(new SocketChannelThread(msg + i +" client", port)).start();
        }
    }
}

class SocketChannelThread implements Runnable {
    //向服务器发送的消息体
    private String msg;
    private int port;

    private SocketChannel clientChannel;

    public SocketChannelThread(String msg, int port) {
        this.msg = msg;
        this.port = port;
    }

    @Override
    public void run() {
        try {
            //创建一个SocketChannel对象实例
            clientChannel = SocketChannel.open();
            //链接服务器
            clientChannel.connect(new InetSocketAddress("localhost", port));
            //设置通道未非阻塞模式
            clientChannel.configureBlocking(false);
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int sendNum = new Random().nextInt(5) + 1;
            for(int i = 0; i < sendNum; i++) {
                buffer.put(new String(msg).getBytes());
                buffer.flip();
                //将缓冲区的内容发送到通道里
                clientChannel.write(buffer);
                //清理缓存区,下次重新写入
                buffer.clear();
                //每次发送完成后,休息几秒中,就是为了测试
                Thread.sleep(sendNum * 1000);
            }

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try{
                //如果此通过处于开通状态的话,就关闭此通道
               if (clientChannel.isOpen()) {
                   System.out.println("-----关闭通道了------");
                   clientChannel.close();
               }
            }catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
  • 客户端请求方式三:

模型如下:

代码如下:

package xingej.selector.test002;
//创建SocketChannel
//      链接服务器
//向服务器发送消息

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

//
public class NIOSelectorClient {
    private static Selector selector;
    private static boolean flag = false;
    private ByteBuffer sendBuffer = ByteBuffer.allocate(1024);

    public void initAndRegister() throws Exception{
        selector = Selector.open();
        createAndRegister(5);
    }

    private void createAndRegister(int socketChannelNum) throws Exception{
        ExecutorService socketThreadPool = Executors.newFixedThreadPool(5);
        CountDownLatch _latchs = new CountDownLatch(socketChannelNum);
        Integer[] ports = {8081, 8082};

        for(int i = 0; i < socketChannelNum; i++) {
            int port = ports[i % 2];
           socketThreadPool.submit(new SocketChannelThread(port, _latchs));
        }
        _latchs.await();
        socketThreadPool.shutdown();
        flag = true;

    }

    class SocketChannelThread implements Runnable{
        private CountDownLatch _latch;
        private int port;
        private SocketChannel socketChannel;

        public SocketChannelThread(int port, CountDownLatch _latch) {
            this.port = port;
            this._latch = _latch;
        }
        @Override
        public void run() {
            try {
                socketChannel= SocketChannel.open();
                socketChannel.configureBlocking(false);
                //1到10秒钟,随机休息
                //这里,添加时间的目的,是想模拟一下,不想同一时间,向服务器发起请求
                int time = (new Random().nextInt(10) + 1) * 1000;
                System.out.println("----此通道----休息的时间是------:\t" + time / 1000 + " 秒");
                Thread.sleep(time);
                System.out.println("--------2-------port:\t" + port);
                socketChannel.connect(new InetSocketAddress("localhost", port));
                System.out.println("--------3-------");
                socketChannel.register(selector, SelectionKey.OP_CONNECT);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //计数器,减一
                _latch.countDown();
            }
        }
    }

    public void listen() throws Exception{

        while (true) {
            System.out.println("-----客户端----准备好了----:\t");
            int readyChannelNum = selector.select();

            System.out.println("-----客户端----准备好的管道数量是-----:\t" + readyChannelNum);
            if (0 == readyChannelNum) {
                continue;
            }

            Set<SelectionKey> selectionKeys = selector.selectedKeys();

            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
               //下面的方法,就可以将selectionKey 键移除
                iterator.remove();
                if (selectionKey.isConnectable()) {
                    if (socketChannel.isConnectionPending()) {
                        socketChannel.finishConnect();
                        System.out.println("----客户端----链接完毕了-----");
                    }
                    socketChannel.register(selector, SelectionKey.OP_WRITE);
                }else if (selectionKey.isWritable()) {
                    sendBuffer.clear();
                    sendBuffer.put("hello, server, I‘m client! Are you OK!!!".getBytes());
                    //flip()必须有的
                    sendBuffer.flip();
                    socketChannel.write(sendBuffer);
                    System.out.println("----客户端---向服务器---发送消息-----完毕----OK-----");
                    //这里注册的事件是write,
                    //效果就是,客户端不断的发送消息
                    //当然,也可以修改成其他事件,如SelectionKey.OP_READ
                    selectionKey.interestOps(SelectionKey.OP_WRITE);
                }

            }
            //每隔1秒中,就向服务器发送信息
            Thread.sleep(1000);
        }

    }

    public static void main(String[] args) throws Exception{
        NIOSelectorClient nioSelectorClient = new NIOSelectorClient();
        nioSelectorClient.initAndRegister();

        //死循环的方式,来监听标志位,
        //一旦标志位发生改变,就开始监听
        while (true) {
            if (flag) {
                nioSelectorClient.listen();
                break;
            }
        }
    }
}
  • 总结:
  • 1、在调用Selector.select()方法之前,最好将要使用的一个SocketChannel或者多个SocketChannel 完成注册功能;也就是说,所有SocketChannel完成注册事件后,才能调用select方法;

不然,很容易出现死锁现象。

如下图所示:

解决措施方式一: 客户端请求方式三,刚开始并没有添加

CountDownLatch 计数器

,针对死锁才添加的。

主线程再调用监听方法时,最好使用观察者模式,目前这里使用了死循环的方式监听,感觉不太好。

  • 2、SocketChannel 通道属于长链接方式,客户端不再发送消息时,通道依旧存在,因此,可以调用Channel.close方法进行关闭
  • 学习方式的建议

如果想更加深入的了解NIO,Selector的话,最好还是不断的进行测试,

如在客户端添加Channel.close(),修改感兴趣的事件,等等

去观察客户端,服务器端的现象,

去总结,去研究源码,

研究源码的目的,不光光是搞清楚背后的原理,

还希望能够学到背后优秀的设计模式,设计思路,使用场景等等,

扩展眼界

代码已分享到git上

https://github.com/xej520/xingej-nio

时间: 2024-10-05 13:58:49

Java NIO 之 Selector 练习的相关文章

Java NIO 选择器(Selector)的内部实现(poll epoll)

http://blog.csdn.net/hsuxu/article/details/9876983 之前强调这么多关于linux内核的poll及epoll,无非是想让大家先有个认识: Java NIO中的选择器依赖操作系统内核的这些系统调用,我们这里只讲解与linux内核相关的NIO实现,当然,windows或其他操作系统实现大体上是类似的,相信大家也可以触类旁通. 那么,本文从这里将从简到难,一步一步为大家讲解选择器的点点滴滴吧. 选择器的宏观理解“有这么一种检查员,她工作在养鸡场,每天的工

Java NIO类库Selector机制解析--转

一.  前言 自从J2SE 1.4版本以来,JDK发布了全新的I/O类库,简称NIO,其不但引入了全新的高效的I/O机制,同时,也引入了多路复用的异步模式.NIO的包中主要包含了这样几种抽象数据类型: Buffer:包含数据且用于读写的线形表结构.其中还提供了一个特殊类用于内存映射文件的I/O操作. Charset:它提供Unicode字符串影射到字节序列以及逆映射的操作. Channels:包含socket,file和pipe三种管道,都是全双工的通道. Selector:多个异步I/O操作集

Java NIO之Selector

选择器是JavaNIO重磅推出的一个概念:在旧有的系统中为了跟踪多端口消息,需要为每一个端口配备一个线程做监听:但是有了selector就不需要了,一个Selector可以管理一众渠道(channel). 选择器的本质就是:让监听的工作由选择起来做:它会定时执行来获取注册到他那里的渠道是否已经准备就绪,比如socketServerChannel是否有新的消息过来(是否做好了accept的准备):然后他会把这个channel放入到一个集合中(SelectedKeys),遍历一遍之后,就可以,你就可

Java NIO (6) Selector

Java NIO Selector A Selector is a Java NIO component which can examine one or more NIO Channel's, and determine which channels are ready for e.g. reading or writing. This way a single thread can manage multiple channels, and thus multiple network con

JAVA NIO 之 Selector 组件

NIO 重要功能就是实现多路复用.Selector是SelectableChannel对象的多路复用器.一些基础知识: 选择器(Selector):选择器类管理着一个被注册的通道集合的信息和它们的就绪状态. 可选择通道(SelectableChannel):这个抽象类提供了实现通道的可选择性所需要的公共方法.它是所有支持就绪检查的通道类的 父类.例如:ServerSocketChannel.SocketChannel.可选择通道可以被注册到选择器上. 选择键(SelectionKey):选择键封

Java NIO 之 Selector

Selector是SelectableChannel的多路选择器,配合SelectableChannel实现非阻塞IO. 详见代码 /** * Selector 是 SelectableChannel的多路选择器</p> * SelectableChannel 通过register函数注册到Selector上</p> * * Selector 维护三个key集合:</br> * 1. 指代当前注册到selector上的channel的key集合</br> *

【JAVA】【NIO】7、Java NIO Selector

selector是Java NIO的组件可以检查一个或多个NIO的channel,并且决定哪个channel是为读写准备好了.这种方式,单个线程可以管理多个channel,也就是多个网络连接. 为什么使用选择器 优点就是更少的线程去处理多个通道.实际上,你可以使用一个线程去处理所有的通道.操作系统中线程的切换是很费资源的,而且每个线程本身也占用了一些资源(内存).所以使用的线程越少越好! 现在的操作系统和CPU在多任务上变得越来越好,所以多线程的开销也变得更小了.事实上,如果一个CPU有多个核心

(四:NIO系列) Java NIO Selector

出处:Java NIO Selector 1.1. Selector入门 1.1.1. Selector的和Channel的关系 Java NIO的核心组件包括: (1)Channel(通道) (2)Buffer(缓冲区) (3)Selector(选择器) 其中Channel和Buffer比较好理解 ,联系也比较密切,他们的关系简单来说就是:数据总是从通道中读到buffer缓冲区内,或者从buffer写入到通道中. 选择器和他们的关系又是什么? 选择器(Selector) 是 Channel(通

7. 彤哥说netty系列之Java NIO核心组件之Selector

<p align="right">--日拱一卒,不期而至!</p> 你好,我是彤哥,本篇是netty系列的第七篇. 简介 上一章我们一起学习了Java NIO的核心组件Buffer,它通常跟Channel一起使用,但是它们在网络IO中又该如何使用呢,今天我们将一起学习另一个NIO核心组件--Selector,没有它可以说就干不起来网络IO. 概念 我们先来看两段Selector的注释,见类java.nio.channels.Selector. 注释I A mul