NIO基础篇(二)

  Selector(选择器)是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接。

  下面的例子是客户端从服务器端下载文件,客户端使用了多线程技术模拟同时下载。Selector可以同时处理多个客户端的连接事件并通知服务器端进行响应操作。

  服务器端的代码为:  

package nio;

import java.io.File;
import java.io.FileInputStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Callable;

//模拟下载服务
public class DownloadServer<T> implements Callable<T>{
    //A selector may be created by invoking the open method of this class, which will use the system‘s default selector provider to create a new selector.
    //A selector may also be created by invoking the openSelector method of a custom selector provider.
    //A selector remains open until it is closed via its close method.
    //A selectable channel‘s registration with a selector is represented by a SelectionKey object.
    private Selector selector;//创建全局selector
    private Map<SocketChannel, Handle> map = new HashMap<SocketChannel, Handle>();//socketChannel和handle之间的映射

    //创建一个服务器serverSocketChannel,并与selector进行注册
    public DownloadServer() throws Exception {
        selector = Selector.open();
        //ServerSocketChannel is a selectable channel for stream-oriented listening sockets.
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //if false then it will be placed non-blocking mode
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.socket().bind(new InetSocketAddress(2361));
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //服务端接收客户端连接事件
    }

    //对selector.select进行迭代,并依次进行处理
    public T call() throws Exception {
        System.out.println("startTo listen in 2361....");
        for(; ;) {
            //Selects a set of keys whose corresponding channels are ready for I/O operations
            //select() performs a blocking selection operation. It returns only after at least one channel is selected
            selector.select();
            Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
            while(keyIterator.hasNext()) {
                //A selectionkey is a token representing the registration of a SelectableChannel with a Selector.
                //A selectionkey is created each time a channel is registered with a selector.
                SelectionKey key = keyIterator.next();
                if(key.isValid())
                    handle(key);
                keyIterator.remove();
            }
        }
    }

    //处理每个key,对于acceptable的key,由主类进行处理,而其他事件,则由内部类进行处理
    private void handle(final SelectionKey key) throws Exception {
        //Tests whether this key‘s channel is ready to accept a new socket connection.
        if(key.isAcceptable()) {
            ServerSocketChannel channel = (ServerSocketChannel) key.channel();
            //Accepts a connection made to this channel‘s socket.
            SocketChannel socketChannel = channel.accept();
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ);//注册读事件
            map.put(socketChannel, new Handle());//把socket和handle进行绑定
        }
        //用map中的handle处理read和write事件,以模拟多个文件同时进行下载
        if(key.isReadable() || key.isWritable()) {
            SocketChannel socketChannel = (SocketChannel) key.channel();
            final Handle handle = map.get(socketChannel);
            if(handle != null)
                handle.handle(key);
        }
    }

    //内部类,模拟一个内部类处理一个文件下载服务,多个类可以处理多个文件下载服务
    private class Handle{
        private StringBuilder message;
        private boolean writeOK = true;
        private ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        private FileChannel fileChannel;
        private String fileName;

        private void handle(SelectionKey key) throws Exception {
            if(key.isReadable()) {
                SocketChannel socketChannel = (SocketChannel) key.channel();
                if(writeOK)
                    message = new StringBuilder();
                while(true) {
                    byteBuffer.clear();
                    int r = socketChannel.read(byteBuffer);
                    if(r == 0)
                        break;
                    if(r == -1) {
                        socketChannel.close();
                        key.cancel();
                        return;
                    }
                    message.append(new String(byteBuffer.array(), 0, r));
                }
                //将接收到的信息转化成文件名,以映射到服务器上的指定文件
                if(writeOK && invokeMessage(message)) {
                    socketChannel.register(selector, SelectionKey.OP_WRITE);
                    writeOK = false;
                }
            }
            //向客户端写数据
            if(key.isWritable()) { //读方法中的socketChannel和写方法中的socketChannel应该是一个,不需要flip吗?
                if(!key.isValid())
                    return;
                SocketChannel socketChannel = (SocketChannel) key.channel();
                if(fileChannel == null)
                    fileChannel = new FileInputStream(fileName).getChannel();
                byteBuffer.clear();
                int w = fileChannel.read(byteBuffer);
                //如果文件已写完,则关掉key和socket
                if(w <= 0) {
                    fileName = null;
                    fileChannel.close();
                    fileChannel = null;
                    writeOK = true;
                    socketChannel.close();
                    key.channel();
                    return;
                }
                byteBuffer.flip();
                socketChannel.write(byteBuffer);
            }
        }

        //将信息转化成文件名
        private boolean invokeMessage(StringBuilder message) {
            String m = message.toString();
            try {
                File f = new File(m);
                if(!f.exists())
                    return false;
                fileName = m;
                return true;
            } catch(Exception e) {
                return false;
            }
        }

    }

    public static void main(String[] args) throws Exception {
        /*
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.submit(new DownloadServer<Object>());
        executorService.shutdown();
        */
        new DownloadServer().call();
    }
}

  客户端的代码为:

package nio;

import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SelectorClient<T> implements Callable<T>{
    private FileChannel fileChannel;
    private static Selector selector;
    private ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    private String serverFileName;//服务器上的文件
    private String localFileName;//下载到客户端的文件名

    public SelectorClient(String serverFileName, String localFileName) {
        this.serverFileName = serverFileName;
        this.localFileName = localFileName;
    }

    public T call() throws Exception {
        //开启selector,并建立socket到指定端口的连接
        if(selector == null)
            selector = Selector.open();
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);
        channel.connect(new InetSocketAddress("172.16.72.181", 2361));
        channel.register(selector, SelectionKey.OP_CONNECT); //客户端连接服务端事件
        //进行信息读取
        for(; ;) {
            selector.select();
            Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
            while(keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                keyIterator.remove();
                //连接事件
                if(key.isConnectable()) {
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    if(socketChannel.isConnectionPending())
                        socketChannel.finishConnect();
                    socketChannel.write(ByteBuffer.wrap(serverFileName.getBytes()));//向服务器发信息,信息中即服务器上的文件名
                    socketChannel.register(selector, SelectionKey.OP_READ);  // 读事件
                }
                //读事件
                if(key.isReadable()) {
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    byteBuffer.clear();
                    if(!socketChannel.isConnected())
                        return null;
                    //向本机下载文件创建文件channel
                    if(fileChannel == null)
                        fileChannel = new RandomAccessFile(localFileName, "rw").getChannel();
                    int r = socketChannel.read(byteBuffer);
                    //如果文件下载完毕,则关掉channel,同时关掉socketChannel
                    if(r <= 0) {
                        if(fileChannel != null)
                            fileChannel.close();
                        channel.close();
                        key.cancel();
                        return null;
                    }
                    byteBuffer.flip();
                    //写到下载文件中
                    fileChannel.write(byteBuffer);
                }
            }
        }
    }

    //客户端用10个线程向服务器端下载文件,并保存为不同的文件
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for(int i = 0; i < 10; i++) {
            executorService.submit(new SelectorClient<Object>("test.txt", "d:/down" + i + ".txt"));
        }
        executorService.shutdown();
    }
}
时间: 2025-01-21 21:05:34

NIO基础篇(二)的相关文章

JMS基础篇(二)

简介 异构集成是消息发挥作用的一个领域,大型公司内部可能会遇到很多的平台,Java,.net或者公司自己的平台等. 传送消息还应该支持异步机制,以提高系统整体的性能.异步传输一条消息意味着,发送者不必等到接收者接收或者处理消息,可以接着做后续的处理. 应用程序发送消息至另外一个应用程序,需要使用到消息中间件.消息中间件应提供容错,负载均衡,可伸缩的事务性等特性. JMS与JDBC类似,是一种与厂商无关的API.应用程序开发者可以使用同样的API来访问不同的系统. 可以认为JMS是一种标准,各消息

php基础篇-二维数组排序 array_multisort

原文:php基础篇-二维数组排序 array_multisort 对2维数组或者多维数组排序是常见的问题,在php中我们有个专门的多维数组排序函数,下面简单介绍下: array_multisort(array1,sorting order, sorting type,array2,array3..)是对多个数组或多维数组进行排序的函数. array1 必需.规定输入的数组. sorting order 可选.规定排列顺序.可能的值是 SORT_ASC 和 SORT_DESC. sorting t

php基础篇-二维数组排序姐妹篇

前面介绍了php多维数组排序的一个函数array_multisort() ,想了解的人可以点击 二维数组排序 array_multisort 下面介绍下不适用array_multisort()进行多维数组的排序. 这里介绍下2个php排序函数,一个是asort,一个是arsort. asort(array,sorttype) 函数对数组进行排序并保持索引关系.主要用于对那些单元顺序很重要的结合数组进行排序. 可选的第二个参数包含了附加的排序标识. SORT_REGULAR - 默认.以它们原来的

Qt入门之基础篇 ( 二 ) :Qt项目建立、编译、运行和发布过程解析

转载请注明出处:CN_Simo. 题解: 本篇内容主讲Qt应用从创建到发布的整个过程,旨在帮助读者能够快速走进Qt的世界. 本来计划是讲解Qt源码静态编译,如此的话读者可能并不能清楚地知道为何要静态编译,所以借此篇内容说明一下原由并为之后文章的学习做准备. 即使本片内容只是在围绕一个小小的HelloWorld程序开展,但还是希望朋友们不要急于求成,"欲速则不达". 文章整体思路: 我们循序渐进地来看,一个Qt应用的完成有以下一个重要的步骤: 项目创建->源码编译->程序运行

JavaScript笔记基础篇(二)

基础篇主要是总结一些工作中遇到的技术问题是如何解决的,应为本人属于刚入行阶段技术并非大神如果笔记中有哪些错误,或者自己的一些想法希望大家多多交流互相学习. 1.ToFixed()函数 今天在做Birt报表时, 要显示一列百分比的数据,但因一些特别的原因,不能使用使用百分比样式,即如果数据是0.9538不能显示成“95.38%”的样式,必须显示成“95.38”. 开始时想使用javascript的内置函数Math.round(),可Math.round()只能显示为整数,而不能保留小数. 再网上搜

通过Rancher部署并扩容Kubernetes集群基础篇二

接上一篇通过Rancher部署并扩容Kubernetes集群基础篇一 7. 使用ConfigMap配置redis https://github.com/kubernetes/kubernetes.github.io/blob/master/docs/user-guide/configmap/redis/redis-config redis-config maxmemory 2mb     maxmemory-policy allkeys-lru # kubectl create configma

Hybrid APP基础篇(二)-&gt;Native、Hybrid、React Native、Web App方案的分析比较

说明 Native.Hybrid.React.Web App方案的分析比较 目录 前言 参考来源 前置技术要求 楔子 几种APP开发模式 概述 Native App Web App Hybrid App React Native App 分析 各大开发模式直观对比 如何选择开发模式 另类的app方案 微网页 微信小程序 其它 前言 参考来源 前人栽树,后台乘凉,本文参考了以下来源 对当前主流hybrid app.web app与native app工具的初步比较与分析 H5.React Nati

NIO基础篇(三)

NIO里对性能提升最显著的是内存映射(memory mapping),内存访问的速度往往比文件访问的速度快几个数量级. 在内存映射之前,还需要看NIO的一些其他的特性. 缓冲区分片 slice()方法根据现有的缓冲区创建一个子缓冲区.也就是说,它创建一个新的缓冲区,新缓冲区与原来的缓冲区的一部分共享数据.

自定义View时,用到Paint Canvas的一些温故,讲讲平时一些效果是怎么画的(基础篇 二,图像遮盖,Canvas静态变化)

转载请注明出处:王亟亟的大牛之路 上一篇把简单的一些概念理一理,还画了个圈,那这一篇讲一下图像遮盖"Xfermode"和Canvas的旋转.平移等效果 Xfermode: AvoidXfermode 指定了一个颜色和容差,强制Paint避免在它上面绘图(或者只在它上面绘图). PixelXorXfermode 当覆盖已有的颜色时,应用一个简单的像素异或操作. PorterDuffXfermode 这是一个非常强大的转换模式,使用它,可以使用图像合成的16条Porter-Duff规则的任