Java IO多路复用技术简介

<span style="font-size:18px;">package com.winwill.nio;

/**
 * @author qifuguang
 * @date 15-2-4 下午2:07
 */
public class TimeServerMain {
    public static void main(String[] args) throws Exception {
        // 启动时间服务器
        new Thread(new SelectorTimeServer()).start();
    }
}</span>

<span style="font-size:18px;">package com.winwill.nio;

/**
 * @author qifuguang
 * @date 15-2-4 下午2:09
 */
public class TimeClientMain {
    public static void main(String[] args) throws Exception {
        // 创建100个客户端连接到服务器
        for (int i = 0; i < 100; i++) {
            new Thread(new SelectorTimeClient(i + 1)).start();
        }
    }
}
</span>
<span style="font-size:18px;">package com.winwill.nio;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;

/**
 * @author qifuguang
 * @date 15-2-4 下午1:21
 */
public class SelectorTimeServer implements Runnable {
    private static final String TIME_ORDER = "Query Time";
    private Selector selector;
    private ServerSocketChannel serverChannel;
    private volatile boolean stop = false;

    /**
     * 创建Selector, 创建ServerSocketChannel,并设置为非阻塞模式, 注册到selector.
     *
     * @throws Exception
     */
    public SelectorTimeServer() throws Exception {
        selector = Selector.open();
        serverChannel = ServerSocketChannel.open();
        serverChannel.socket().bind(new InetSocketAddress(8080));
        serverChannel.configureBlocking(false);
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    /**
     * 轮询监听selector.
     */
    @Override
    public void run() {
        try {
            System.out.println("时间服务器启动!");
            while (!stop) {
                selector.select(1000);
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    handleKey(key);
                }
            }
            if (selector != null) {
                selector.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 处理每一种selector感兴趣的事件.
     *
     * @param key 轮询监听得到的SelectionKey.
     */
    private void handleKey(SelectionKey key) {
        try {
            if (key.isValid()) {  // 如果连接成功
                if (key.isAcceptable()) {  // 监听到有新客户端连接
                    SocketChannel accept = ((ServerSocketChannel) key.channel()).accept(); // 建立与客户端的连接
                    accept.configureBlocking(false);  // 设置该连接为非阻塞模式
                    accept.register(selector, SelectionKey.OP_READ); // 将该连接注册到selector
                    System.out.println("发现有新客户端连接...");
                }
                if (key.isReadable()) {    // 监听到有客户端发送请求
                    SocketChannel channel = (SocketChannel) key.channel();
                    // 读取客户端发来的请求
                    ByteBuffer buff = ByteBuffer.allocate(1024);
                    int size = channel.read(buff);
                    if (size > 0) {
                        byte[] b = new byte[size];
                        buff.flip();
                        buff.get(b);
                        String order = new String(b, "UTF-8");
                        System.out.println("收到客户端命令:" + order);
                        String content = "";
                        if (order.equalsIgnoreCase(TIME_ORDER)) {
                            content = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
                        } else {
                            content = "命令错误";
                        }
                        // 根据客户端发来的请求做出相应的动作,并将处理结果返回给客户端
                        doWrite(channel, content);
                    } else if (size < 0) {
                        channel.close();
                        key.cancel();
                    } else {
                        ;
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 向指定的SocketChannel发送指定的消息。
     *
     * @param sc      需要向哪一个SocketChannel发送消息
     * @param content 需要发送的消息
     * @throws Exception
     */
    private void doWrite(SocketChannel sc, String content) throws Exception {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        buffer.put(content.getBytes("UTF-8"));
        buffer.flip();
        sc.write(buffer);
        if (!buffer.hasRemaining()) {
            System.out.println("下发消息给客户端:" + content);
        }
    }
}
</span>
<span style="font-size:18px;">package com.winwill.nio;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
 * @author qifuguang
 * @date 15-2-4 下午1:21
 */
public class SelectorTimeClient implements Runnable {
    private static final String TIME_ORDER = "Query Time";
    private SocketChannel channel;
    private Selector selector;
    private volatile boolean stop = false;
    private Integer index;

    /**
     * 创建Selector, SocketChannel.
     *
     * @param index 客户端编号.
     * @throws Exception
     */
    public SelectorTimeClient(Integer index) throws Exception {
        selector = Selector.open();
        channel = SocketChannel.open();
        channel.configureBlocking(false);
        this.index = index;
    }

    /**
     * 轮询监听selector刚兴趣的事件.
     */
    @Override
    public void run() {
        try {
            System.out.println("第" + index + "个客户端启动!");
            // 先尝试异步连接服务器, 如果连接成功,则只需要把channel注册到selector的READ事件,
            // 读取服务器返回的结果. 如果不成功(客户端已经向服务器发送了sync包,但是服务器没有返回ack包, 物理链路还没建立成功)
            // 则把该channel注册到selector的CONNECT事件, 等待服务器返回的ack包.
            if (channel.connect(new InetSocketAddress(8080))) {
                channel.register(selector, SelectionKey.OP_READ);
                doWrite(channel, TIME_ORDER);
            } else {
                channel.register(selector, SelectionKey.OP_CONNECT);
            }
            while (!stop) {
                selector.select(1000);
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    SocketChannel sc = (SocketChannel) key.channel();
                    iterator.remove();
                    if (key.isValid()) {
                        if (key.isReadable()) {  // 监听到可读事件, 读取服务器返回的处理结果.
                            ByteBuffer buff = ByteBuffer.allocate(1024);
                            int size = sc.read(buff);
                            if (size > 0) {
                                byte[] b = new byte[size];
                                buff.flip();
                                buff.get(b);
                                System.out.println("第" + index + "个客户端获取服务器返回时间:" + new String(b));
                                stop = true;
                            } else if (size < 0) {
                                sc.close();
                                key.cancel();
                            } else {
                                ;
                            }
                        }
                        if (key.isConnectable()) {  //监听到服务器返回了ack包, 准备完成连接的建立
                            if (sc.finishConnect()) {  // 调用此方法完成物理链路连接的建立
                                sc.register(selector, SelectionKey.OP_READ); // 建立连接之后注册监听READ事件
                                doWrite(sc, TIME_ORDER);
                            } else {
                                System.exit(1);  //否则,程序退出
                            }
                        }
                    }
                }
            }
            if (selector != null) {
                selector.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 向指定的channel发送指定的消息.
     *
     * @param channel 向哪一个channel发送消息
     * @param content 需要发送的消息
     * @throws Exception
     */
    private void doWrite(SocketChannel channel, String content) throws Exception {
        ByteBuffer buff = ByteBuffer.allocate(1024);
        buff.put(content.getBytes("UTF-8"));
        buff.flip();
        channel.write(buff);
        if (!buff.hasRemaining()) {
            System.out.println("第" + index + "个客户端成功发送请求到服务器:" + content);
        }
    }
}
</span>

时间: 2024-10-03 23:16:06

Java IO多路复用技术简介的相关文章

Java IO流技术

Java IO流技术 Java IO基本总结 Java的IO技术就是用来处理数据的.比如从文件中读取数据,或者将数据写到文件中.在网络中传输的都是二进制,即字节码,我们需要在传输之前,将我们程序中的数据转换成字节码才能进行传输.Java的IO包内就提供了完成这些工作的工具类. 总的来说:当需要以二进制的形式来处理文件时,即不涉及文本内容的操作时,我们可以选择InputStream和OutputStream来完成我们的工作.当需要完成处理字符串相关的工作时,就需要使用Reader和Writer来完

java IO流技术 之 File

IO流技术 概念:input - output 输入输出流: 输入:将文件读到内存中: 输出:将文件从内存中写出到其他地方 作用:主要就是解决设备和设备之间的数据传输问题. File :文件类的使用十分重要 (一)file的构造方法 1 public static void test1(){ 2 3 //三种拼接路径方式 4 //1.直接拼接 5 File filePath = new File("C:\\123\\aaa"); 6 System.out.println("是

druid.io使用技术简介: Hyperloglog

druid.io 使用Hyperloglog 估计基数 参照如下连接 http://blog.codinglabs.org/articles/algorithms-for-cardinality-estimation-part-i.html http://blog.codinglabs.org/articles/algorithms-for-cardinality-estimation-part-ii.html http://blog.codinglabs.org/articles/algori

Java IO流-NIO简介

2017-11-05 22:09:04 NIO NIO:new IO就是新IO的意思,JDK4开始出现新IO,新IO和传统的IO有相同的目的,都是用于进行输入输出的,但是新IO使用了不同的方式来处理输入输出,采用内存映射文件的方式,将文件或者文件中的一段区域映射到内存中,就可以相访问内存一样来访问文件了,这种方式的效率比旧IO要高很多,但是目前好多地方我们看到还是旧IO为主. 一个小例子: Path:路径 Paths:有一个静态方法返回一个路径       public static Path

11 非阻塞套接字与IO多路复用(进阶)

1.非阻塞套接字 第一部分 基本IO模型 1.普通套接字实现的服务端的缺陷 一次只能服务一个客户端! 2.普通套接字实现的服务端的瓶颈!!! accept阻塞! 在没有新的套接字来之前,不能处理已经建立连接的套接字的请求. recv 阻塞! 在没有接受到客户端请求数据之前, 不能与其他客户端建立连接! 3.普通服务器的IO模型 第二部分 非阻塞套接字 1.非阻塞套接字与普通套接字的区别 >>> import socket >>> server = socket.sock

非阻塞套接字与IO多路复用

我们了解了socket之后已经知道,普通套接字实现的服务端的缺陷:一次只能服务一个客户端! 并且,为了使一个客户端能够不断收发消息,我们还要使用while循环来轮询,这极大地降低了我们的效率 accept阻塞! 在没有新的套接字来之前,不能处理已经建立连接的套接字的请求 recv 阻塞! 在没有接受到客户端请求数据之前,不能与其他客户端建立连接 可以用非阻塞接口来尝试解决这个问题! 阻塞IO模型 阻塞IO(blocking IO)的特点:就是在IO执行的两个阶段(等待数据和拷贝数据两个阶段)都被

进程,线程,协程,io多路复用 总结

并发:要做到同时服务多个客户端,有三种技术 1. 进程并行,只能开到当前cpu个数的进程,但能用来处理计算型任务 ,开销最大 2. 如果并行不必要,那么可以考虑用线程并发,单位开销比进程小很多 线程:并发(轮询调度,遇到阻塞就切换) 只要是网络,就会有延迟,有延迟就阻塞,所以比一般的单路要好些 3. 如果轮询不必要,可考虑是否可以只需要遇到阻塞切换 就可以用IO多路复用技术 + 协程来实现阻塞切换,消耗资源很少,并发量最高 原文地址:https://www.cnblogs.com/cxhzy/p

nginx 多进程 + io多路复用 实现高并发

一.nginx 高并发原理 简单介绍:nginx 采用的是多进程(单线程) + io多路复用(epoll)模型 实现高并发 二.nginx 多进程 启动nginx 解析初始化配置文件后会 创建(fork)一个master进程 之后 这个进程会退出 master 进程会 变为孤儿进程 由init进程托管.(可以通过python 或php 启动后创建子进程,然后杀死父进程得见子进程会由init进程托管) 如下图可以看到nginx master 进程由init(ppid 为1 )进程管理. maste

【Java】Java Servlet 技术简介

Java 开发人员兼培训师 Roy Miller 将我们现有的 servlet 介绍资料修改成了这篇易于学习的实用教程.Roy 将介绍并解释 servlet 是什么,它们是如何工作的,如何使用它们来创建您能够想像到的任意复杂度的 Web 应用程序,以及作为一名专业编程人员,您如何才能最有效地使用 servlet. 5 评论: Roy W. Miller ([email protected]), 独立的软件开发辅导员.程序员和作者, RoleModel Software 2004 年 12 月 2