NIO源码阅读

  自己对着源码敲一遍练习,写上注释。发现NIO编程难度好高啊。。虽然很复杂,但是NIO编程的有点还是很多:

  1、客户端发起的连接操作是异步的,可以通过在多路复用器注册OP_CONNECTION等待后续结果,不需要像BIO的客户端一样被同步阻塞。

  2、SocketChannel的读写操作都是异步的,如果没有可读写的数据它不会同步等待,直接返回,这样I/O通信模型就可以处理其他的链路,不需要同步等待这个链路可用。

  3、线程模型的优化:由于JDK的Selector在Linux等主流操作系统上通过epoll实现,没有连接句柄的限制,那么Selector线程可以同时处理成千上万个客户端连接,而且性能不会随着客户端的增加而线性下降。所以它非常适合做高性能、高负载的网络服务器。

  TimeClient:

 1 package nio;
 2
 3 public class TimeClient {
 4     public static void main(String args[]){
 5         int port = 8080;
 6         if(args != null && args.length > 0){
 7             try{
 8                 port = Integer.valueOf(args[0]);
 9             }catch(NumberFormatException e){
10                 //采用默认值
11             }
12         }
13         new Thread(new TimeClientHandle("120.0.0.1",port),"TimeClient-001").start();
14     }
15 }

TimeClientHandler:

  1 package nio;
  2
  3 import java.io.IOException;
  4 import java.net.InetSocketAddress;
  5 import java.nio.ByteBuffer;
  6 import java.nio.channels.SelectionKey;
  7 import java.nio.channels.Selector;
  8 import java.nio.channels.SocketChannel;
  9 import java.util.Iterator;
 10 import java.util.Set;
 11
 12 public class TimeClientHandle implements Runnable{
 13     private String host;
 14     private int port;
 15     private Selector selector;
 16     private SocketChannel socketChannel;
 17     private volatile boolean stop;
 18
 19     public TimeClientHandle(String host,int port){
 20         this.host = host == null ? "127.0.0.1" : host;
 21         this.port = port;
 22         try{
 23             selector = Selector.open();
 24             socketChannel = SocketChannel.open();
 25             socketChannel.configureBlocking(false);
 26         }catch(IOException e){
 27             e.printStackTrace();
 28             System.exit(1);
 29         }
 30     }
 31
 32
 33     public void run() {
 34         //发送请求连接
 35         try{
 36             doConnect();
 37         }catch(IOException e){
 38             e.printStackTrace();
 39             System.exit(1);
 40         }
 41         while(!stop){
 42             try{
 43                 selector.select(1000);
 44                 Set<SelectionKey> selectedKeys = selector.selectedKeys();
 45                 Iterator<SelectionKey> it = selectedKeys.iterator();
 46                 SelectionKey key = null;
 47                 //当有就绪的Channel时,执行handleInput(key)方法
 48                 while(it.hasNext()){
 49                     key = it.next();
 50                     it.remove();
 51                     try{
 52                         handleInput(key);
 53                     }catch(Exception e){
 54                         if(key != null){
 55                         key.cancel();
 56                             if(key.channel() != null){
 57                                 key.channel().close();
 58                             }
 59                         }
 60                     }
 61                 }
 62             }catch(Exception e){
 63                 e.printStackTrace();
 64                 System.exit(1);
 65             }
 66         }
 67
 68         //多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
 69         if(selector != null){
 70             try{
 71                 selector.close();
 72             }catch(IOException e){
 73                 e.printStackTrace();
 74             }
 75         }
 76
 77     }
 78
 79
 80     private void handleInput(SelectionKey key) throws IOException{
 81         if(key.isValid()){
 82             SocketChannel sc = (SocketChannel)key.channel();
 83             //判断是否连接成功
 84             if(key.isConnectable()){
 85                 if(sc.finishConnect()){
 86                     sc.register(selector, SelectionKey.OP_READ);
 87                 }else{
 88                     System.exit(1);
 89                 }
 90             }
 91
 92             if(key.isReadable()){
 93                 ByteBuffer readBuffer = ByteBuffer.allocate(1024);
 94                 int readBytes = sc.read(readBuffer);
 95                 if(readBytes > 0){
 96                     readBuffer.flip();
 97                         byte[] bytes = new byte[readBuffer.remaining()];
 98                         readBuffer.get(bytes);
 99                         String body = new String(bytes,"UTF-8");
100                         System.out.println("Now is :" + body);
101                         this.stop = true;
102                 }else if(readBytes < 0){
103                     //对端链路关闭
104                     key.cancel();
105                     sc.close();
106                 }else{
107                     ; //读到0字节,忽略
108                 }
109             }
110         }
111     }
112
113     private void doConnect() throws IOException{
114         //如果直接连接成功,则注册到多路复用器上,发送请求信息,读应答
115         if(socketChannel.connect(new InetSocketAddress(host,port))){
116             socketChannel.register(selector, SelectionKey.OP_READ);
117             doWrite(socketChannel);
118         }else{
119             //说明服务器没有返回TCP祸首应答消息,但这并不代表连接失败,当服务器返回TCP syn-ack消息后,Selector就能够轮训这个SocketChannel处于连接就绪状态
120             socketChannel.register(selector, SelectionKey.OP_CONNECT);
121         }
122     }
123
124     private void doWrite(SocketChannel sc) throws IOException{
125         byte[] req = "QUERY TIME ORDER".getBytes();
126         ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
127         writeBuffer.put(req);
128         writeBuffer.flip();
129         sc.write(writeBuffer);
130         if(!writeBuffer.hasRemaining()){
131             System.out.println("Send order 2 server succeed.");
132         }
133     }
134
135 }

TimeServer:

 1 package nio;
 2
 3 import java.io.IOException;
 4
 5 public class TimeServer {
 6
 7     public static void main(String[] args) throws IOException{
 8         int port = 8080;
 9         if(args != null && args.length >0){
10             try{
11                 port = Integer.valueOf(args[0]);
12             }catch(NumberFormatException e){
13                 //采用默认值
14             }
15         }
16         //多路复用类,是一个独立的线程,负责轮训多路复用器Selctor,处理多个客户端的并发接入。
17         MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
18         new Thread(timeServer,"NIO-MultiplexerTimeServer-001").start();
19         }
20 }

MultiplexerTimeServer:

  1 package nio;
  2
  3 import java.io.IOException;
  4 import java.net.InetSocketAddress;
  5 import java.nio.ByteBuffer;
  6 import java.nio.channels.SelectionKey;
  7 import java.nio.channels.Selector;
  8 import java.nio.channels.ServerSocketChannel;
  9 import java.nio.channels.SocketChannel;
 10 import java.util.Iterator;
 11 import java.util.Set;
 12
 13 public class MultiplexerTimeServer implements Runnable {
 14
 15     private Selector selector;
 16
 17     private ServerSocketChannel servChannel;
 18
 19     private volatile boolean stop;
 20
 21     public MultiplexerTimeServer(int port){
 22         try{
 23
 24             selector = Selector.open();
 25             servChannel.configureBlocking(false);
 26             //将ServerSocketChannel 设置为异步非阻塞,backlog设置为1024
 27             servChannel.socket().bind(new InetSocketAddress(port),1024);
 28             //将ServerSocket Channel注册到Selector,监听SelectionKey.OP_ACCEPT操作位,如果初始化失败,则退出
 29             servChannel.register(selector,SelectionKey.OP_ACCEPT);
 30             System.out.println("The time server is start in port:" + port);
 31         }catch(IOException e){
 32             e.printStackTrace();
 33             System.exit(1);
 34         }
 35     }
 36
 37     public void stop(){
 38         this.stop = true;
 39     }
 40
 41     public void run() {
 42         while(!stop){
 43             try{
 44                 //遍历时间设置1秒,每隔一秒唤醒一次,当有处于就绪状态的Channel时,selector将返回就绪状态的Channel的SelectionKey集合
 45                 selector.select(1000);
 46                 Set<SelectionKey> selectedKeys = selector.selectedKeys();
 47                 Iterator<SelectionKey> it = selectedKeys.iterator();
 48                 SelectionKey key = null;
 49                 //通过对就绪状态的Channel集合进行迭代,可以进行网络的异步读写操作
 50                 while(it.hasNext()){
 51                     key = it.next();
 52                     it.remove();
 53                     try{
 54                         handleInput(key);
 55                     }catch(Exception e){
 56                         if(key != null){
 57                             key.cancel();
 58                             if(key.channel() != null){
 59                                 key.channel().close();
 60                             }
 61                         }
 62                     }
 63                 }
 64             }catch(Throwable t){
 65                 t.printStackTrace();
 66             }
 67         }
 68
 69         //多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
 70         if(selector != null){
 71             try{
 72                 selector.close();
 73             }catch(IOException e){
 74                 e.printStackTrace();
 75             }
 76         }
 77     }
 78
 79     //处理新接入的请求消息
 80     private void handleInput(SelectionKey key) throws IOException{
 81         if(key.isValid()){
 82
 83             //根据SelectionKey的操作位进行判断即可获知网络事件的类型,通过accept接收客户端的连接请求并创建SocketChannel实例,完成上述操作相当于
 84             //完成了TCP的三次握手,TCP物理链路正式建立
 85             if(key.isAcceptable()){
 86                 ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
 87                 SocketChannel sc = ssc.accept();
 88                 sc.configureBlocking(false);
 89                 //Add the new connection tothe selector
 90                 sc.register(selector, SelectionKey.OP_READ);
 91             }
 92
 93             if(key.isReadable()){
 94                 //Read the data
 95
 96                 SocketChannel sc = (SocketChannel)key.channel();
 97                 ByteBuffer readBuffer = ByteBuffer.allocate(1024);
 98                 int readBytes = sc.read(readBuffer);
 99                 if(readBytes > 0){
100                     //将缓冲区当前的limit设置为position,position设置为0,用于后续对缓冲区的读取操作
101                     readBuffer.flip();
102                     byte[] bytes = new byte[readBuffer.remaining()];
103                     readBuffer.get(bytes);
104                     String body = new String(bytes,"UTF-8");
105                     System.out.println("The time server receive order: + body");
106                     String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(System.currentTimeMillis()).toString() : "BAD ORDER";
107                     doWrite(sc,currentTime);
108                 }else if(readBytes < 0){
109                     //对端链路关闭
110                     key.cancel();
111                     sc.close();
112                 }else{
113                     ; //读到0字节,忽略
114                 }
115             }
116         }
117     }
118
119     private void doWrite(SocketChannel channel,String response) throws IOException{
120         if(response != null && response.trim().length() >0){
121             byte[] bytes = response.getBytes();
122             ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
123             writeBuffer.put(bytes);
124             writeBuffer.flip();
125             channel.write(writeBuffer);
126         }
127     }
128 }
时间: 2024-08-25 16:01:08

NIO源码阅读的相关文章

JDK 源码 阅读 - 2 - 设计模式 - 创建型模式

A.创建型模式 抽象工厂(Abstract Factory) javax.xml.parsers.DocumentBuilderFactory DocumentBuilderFactory通过FactoryFinder实例化具体的Factory. 使用例子: DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance(); DocumentBuilder docBuilder = docBuilder

Java源码阅读

源码阅读目的是为了了解Java原理,学习优秀的类设计,整体阅读顺序和侧重主要参考基础类和常用类,参考网上整体归纳如下: 包 java.lang 1) Object 1 2) String 1 3) AbstractStringBuilder 1 4) StringBuffer 1 5) StringBuilder 1 6) Boolean 2 7) Byte 2 8) Double 2 9) Float 2 10) Integer 2 11) Long 2 12) Short 2 13) Thr

CI框架源码阅读笔记3 全局函数Common.php

从本篇开始,将深入CI框架的内部,一步步去探索这个框架的实现.结构和设计. Common.php文件定义了一系列的全局函数(一般来说,全局函数具有最高的加载优先权,因此大多数的框架中BootStrap引导文件都会最先引入全局函数,以便于之后的处理工作). 打开Common.php中,第一行代码就非常诡异: if ( ! defined('BASEPATH')) exit('No direct script access allowed'); 上一篇(CI框架源码阅读笔记2 一切的入口 index

淘宝数据库OceanBase SQL编译器部分 源码阅读--生成逻辑计划

body, td { font-family: tahoma; font-size: 10pt; } 淘宝数据库OceanBase SQL编译器部分 源码阅读--生成逻辑计划 SQL编译解析三部曲分为:构建语法树,生成逻辑计划,指定物理执行计划.第一步骤,在我的上一篇博客淘宝数据库OceanBase SQL编译器部分 源码阅读--解析SQL语法树里做了介绍,这篇博客主要研究第二步,生成逻辑计划. 一. 什么是逻辑计划?我们已经知道,语法树就是一个树状的结构组织,每个节点代表一种类型的语法含义.如

JDK部分源码阅读与理解

本文为博主原创,允许转载,但请声明原文地址:http://www.coselding.cn/article/2016/05/31/JDK部分源码阅读与理解/ 不喜欢重复造轮子,不喜欢贴各种东西.JDK代码什么的,让整篇文章很乱...JDK源码谁都有,没什么好贴的...如果你没看过JDK源码,建议打开Eclipse边看源码边看这篇文章,看过的可以把这篇文章当成是知识点备忘录... JDK容器类中有大量的空指针.数组越界.状态异常等异常处理,这些不是重点,我们关注的应该是它的一些底层的具体实现,这篇

如何阅读Java源码 阅读java的真实体会

刚才在论坛不经意间,看到有关源码阅读的帖子.回想自己前几年,阅读源码那种兴奋和成就感(1),不禁又有一种激动. 源码阅读,我觉得最核心有三点:技术基础+强烈的求知欲+耐心. 说到技术基础,我打个比方吧,如果你从来没有学过Java,或是任何一门编程语言如C++,一开始去啃<Core Java>,你是很难从中吸收到营养的,特别是<深入Java虚拟机>这类书,别人觉得好,未必适合现在的你. 虽然Tomcat的源码很漂亮,但我绝不建议你一开始就读它.我文中会专门谈到这个,暂时不展开. 强烈

Memcache-Java-Client-Release源码阅读(之七)

一.主要内容 本章节的主要内容是介绍Memcache Client的Native,Old_Compat,New_Compat三个Hash算法的应用及实现. 二.准备工作 1.服务器启动192.168.0.106:11211,192.168.0.106:11212两个服务端实例. 2.示例代码: String[] servers = { "192.168.0.106:11211", "192.168.0.106:11212" }; SockIOPool pool =

源码阅读笔记 - 1 MSVC2015中的std::sort

大约寒假开始的时候我就已经把std::sort的源码阅读完毕并理解其中的做法了,到了寒假结尾,姑且把它写出来 这是我的第一篇源码阅读笔记,以后会发更多的,包括算法和库实现,源码会按照我自己的代码风格格式化,去掉或者展开用于条件编译或者debug检查的宏,依重要程度重新排序函数,但是不会改变命名方式(虽然MSVC的STL命名实在是我不能接受的那种),对于代码块的解释会在代码块前(上面)用注释标明. template<class _RanIt, class _Diff, class _Pr> in

CI源码阅读

CodeIgniter源码分析 http://calixwu.com/2014/11/codeigniter-yuanmafenxi.html CI框架源码阅读笔记 http://www.cnblogs.com/ohmygirl/p/4052686.html