Java中的BIO、NIO、AIO-3

Java中的BIO、NIO、AIO-3

java

这一篇是代码篇,敲代码有助于理解记忆这些抽象的东西:

参考资料:

目录

  • Java BIO代码

    • 服务器
    • 客户端
  • Java NIO
    • 简介
    • 缓冲区Buffer
    • 通道Channel
    • 多路复用器
    • 服务器端代码
    • 客户端代码
  • Java AIO
    • server端代码
    • 客户端

Java BIO代码

服务器

  1. package sock;



  2. import java.io.BufferedReader; 

  3. import java.io.IOException; 

  4. import java.io.InputStreamReader; 

  5. import java.io.PrintWriter; 

  6. import java.net.ServerSocket; 

  7. import java.net.Socket; 

  8. import java.util.ArrayList; 

  9. import java.util.LinkedList; 

  10. import java.util.List; 




  11. /* 

  12. 存在多线程对数据结构并发操作不安全的问题 

  13. */ 

  14. public class socketServerT extends ServerSocket{ 

  15. private static final int port = 2018; 

  16. private static boolean isPrint = false; 

  17. private static List<String> user_list = new ArrayList(); 

  18. private static List<ServerThread> threadlist = new ArrayList<>();//应该使用线程安全的集合 

  19. private static LinkedList<String> message = new LinkedList<>(); 



  20. socketServerT() throws IOException{ 

  21. super(port); 

  22. new PrintOutThread(); 

  23. System.out.println( " server is created"); 

  24. try{ 

  25. while(true){ 

  26. Socket sock = this.accept(); 

  27. new ServerThread(sock); 



  28. }catch( Exception e){ 

  29. e.printStackTrace(); 





  30. class PrintOutThread extends Thread{ 

  31. PrintOutThread(){ 

  32. System.out.println(getName() + "!!!!!"); 

  33. start(); 



  34. public void run(){ 

  35. while(true){ 

  36. if(isPrint){ 

  37. String m = message.getFirst(); 

  38. for(ServerThread i : threadlist){ 

  39. sendMessage(i,m); 



  40. message.removeFirst(); 

  41. isPrint = message.size()>0 ? true:false; 









  42. class ServerThread extends Thread{ 

  43. private BufferedReader rec; 

  44. private PrintWriter send; 

  45. private Socket client; 

  46. private String name; 


  47. ServerThread(Socket sock) throws IOException{ 

  48. client = sock; 

  49. rec = new BufferedReader(new InputStreamReader(client.getInputStream())); 

  50. send = new PrintWriter(client.getOutputStream(),true); 

  51. //rec.readLine(); 

  52. System.out.println(getName() + "is created"); 

  53. send.println("connected to chat room, please input your name!!"); 

  54. start(); 



  55. public PrintWriter getSend(){ 

  56. return send; 



  57. public void run(){ 

  58. try{ 

  59. int flag = 0; 

  60. String line = ""; 

  61. while(!line.contains("bye")){ 

  62. line = rec.readLine(); 

  63. if("showuser".equals(line)){ 

  64. send.println(listOneUsers()); 

  65. //line = rec.readLine(); 

  66. continue; 



  67. if(flag == 0){ 

  68. flag ++; 

  69. name = line; 

  70. user_list.add(name); 

  71. threadlist.add(this); 

  72. send.println(name + " begin to chat"); 

  73. pushMessage("client <" + name+"> enter chat room"); 

  74. }else{ 

  75. pushMessage("client <" + name+"> say :" + line); 



  76. //line = rec.readLine(); 




  77. }catch (Exception e){ 

  78. e.printStackTrace(); 

  79. }finally { 

  80. try{ 

  81. client.close(); 

  82. rec.close(); 

  83. send.close(); 

  84. }catch (IOException e){ 

  85. e.printStackTrace(); 



  86. threadlist.remove(this); 

  87. user_list.remove(name); 

  88. pushMessage("client <" + name+"> exit"); 








  89. public void pushMessage(String mess){ 

  90. message.add(mess); 

  91. isPrint = true; 



  92. public String listOneUsers(){ 

  93. StringBuffer s = new StringBuffer(); 

  94. s.append("---online users---\n"); 

  95. for( String i:user_list){ 

  96. s.append(i + "\n"); 



  97. s.append("---end---\n"); 

  98. return s.toString(); 



  99. public void sendMessage(ServerThread s,String m){ 

  100. //System.out.println("test"); 

  101. PrintWriter p = s.getSend(); 

  102. p.println(m); 

  103. //p.flush(); 



  104. public static void main(String args[]){ 

  105. try{ 

  106. socketServerT s = new socketServerT(); 

  107. }catch(Exception e){ 

  108. e.printStackTrace(); 








客户端

  1. package sock;



  2. import java.io.BufferedReader; 

  3. import java.io.IOException; 

  4. import java.io.InputStreamReader; 

  5. import java.io.PrintWriter; 

  6. import java.net.Socket; 


  7. public class socketClientT extends Socket{ 

  8. private static final String server = "127.0.0.1"; 

  9. private static final int port = 2018; 


  10. private Socket sock; 

  11. private PrintWriter send; 

  12. private BufferedReader rec; 

  13. socketClientT() throws IOException{ 

  14. super(server,port); 

  15. sock = this; 

  16. send = new PrintWriter(sock.getOutputStream(),true); 

  17. rec = new BufferedReader(new InputStreamReader(sock.getInputStream())); 

  18. Thread t = new recvThread(); 

  19. BufferedReader sysBuff = new BufferedReader(new InputStreamReader(System.in)); 

  20. String line = ""; 

  21. while(! line.contains("bye")){ 

  22. line = sysBuff.readLine(); 

  23. send.println(line); 



  24. send.close(); 

  25. rec.close(); 

  26. sysBuff.close(); 

  27. this.close(); 



  28. class recvThread extends Thread{ 

  29. private BufferedReader buff; 

  30. recvThread(){ 

  31. try { 

  32. buff = new BufferedReader(new InputStreamReader(sock.getInputStream())); 

  33. start(); 

  34. } catch (Exception e){ 

  35. e.printStackTrace(); 





  36. public void run(){ 

  37. String res = ""; 

  38. try{ 

  39. while(true){ 

  40. res = buff.readLine(); 

  41. if(res.contains("bye")) 

  42. break; 

  43. System.out.println(res); 



  44. send.close(); 

  45. buff.close(); 

  46. sock.close(); 

  47. }catch (Exception e){ 

  48. e.printStackTrace(); 








  49. public static void main(String args[]){ 

  50. try { 

  51. socketClientT s = new socketClientT(); 

  52. }catch (Exception e){ 

  53. e.printStackTrace(); 








Java NIO

JDK 1.4的java.util.*;包中引入了新的Java I/O库,其目的是提高IO操作的速度。

简介

NIO我们一般认为是New I/O(也是官方的叫法),因为它是相对于老的I/O类库新增的(其实在JDK 1.4中就已经被引入了,但这个名词还会继续用很久,即使它们在现在看来已经是“旧”的了,所以也提示我们在命名时,需要好好考虑),做了很大的改变。但民间跟多人称之为Non-block I/O,即非阻塞I/O,因为这样叫,更能体现它的特点。而下文中的NIO,不是指整个新的I/O库,而是非阻塞I/O。

NIO提供了与传统BIO模型中的Socket和ServerSocket相对应的SocketChannel和ServerSocketChannel两种不同的套接字通道实现。

新增的着两种通道都支持阻塞和非阻塞两种模式。

阻塞模式使用就像传统中的支持一样,比较简单,但是性能和可靠性都不好;非阻塞模式正好与之相反。

对于低负载、低并发的应用程序,可以使用同步阻塞I/O来提升开发速率和更好的维护性;对于高负载、高并发的(网络)应用,应使用NIO的非阻塞模式来开发。

下面会先对基础知识进行介绍。

缓冲区Buffer

Buffer是一个对象,包含一些要写入或者读出的数据。

在NIO库中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的;在写入数据时,也是写入到缓冲区中。任何时候访问NIO中的数据,都是通过缓冲区进行操作。

缓冲区实际上是一个数组,并提供了对数据结构化访问以及维护读写位置等信息。

具体的缓存区有这些:ByteBuffe、CharBuffer、 ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer。他们实现了相同的接口:Buffer。

通道Channel

我们对数据的读取和写入要通过Channel,它就像水管一样,是一个通道。通道不同于流的地方就是通道是双向的,可以用于读、写和同时读写操作。

底层的操作系统的通道一般都是全双工的,所以全双工的Channel比流能更好的映射底层操作系统的API。

Channel主要分两大类:

  • SelectableChannel:用户网络读写
  • FileChannel:用于文件操作

后面代码会涉及的ServerSocketChannel和SocketChannel都是SelectableChannel的子类。

多路复用器

Selector是Java  NIO 编程的基础。

Selector提供选择已经就绪的任务的能力:Selector会不断轮询注册在其上的Channel,如果某个Channel上面发生读或者写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合,进行后续的I/O操作。

一个Selector可以同时轮询多个Channel,因为JDK使用了epoll()代替传统的select实现,所以没有最大连接句柄1024/2048的限制。所以,只需要一个线程负责Selector的轮询,就可以接入成千上万的客户端。

服务器端代码

server:

  1. /**


  2. * server 

  3. */ 

  4. public class server { 


  5. private static int port = 8000; 

  6. private static serverHandle sHandle; 


  7. public static void start() { 

  8. start(port); 



  9. private static synchronized void start(int port) { 

  10. if (sHandle != null) { 

  11. sHandle.setStarted(false); 



  12. sHandle = new serverHandle(port); 

  13. new Thread(sHandle,"server").start(); 




  14. public static void main(String[] args) { 

  15. start(); 





serverHandle

  1. import java.io.IOException;


  2. import java.net.InetSocketAddress; 

  3. import java.net.ServerSocket; 

  4. import java.nio.ByteBuffer; 

  5. import java.nio.channels.SelectionKey; 

  6. import java.nio.channels.Selector; 

  7. import java.nio.channels.ServerSocketChannel; 

  8. import java.nio.channels.SocketChannel; 

  9. import java.util.Iterator; 

  10. import java.util.Set; 


  11. //import jdk.internal.org.objectweb.asm.Handle; 


  12. /** 

  13. * serverHandle 

  14. */ 

  15. public class serverHandle implements Runnable{ 


  16. private ServerSocketChannel serverChannel; 

  17. private Selector selector; 

  18. private volatile boolean started;//各个线程都能看到状态 


  19. public serverHandle(int port) { 

  20. try { 

  21. //创建选择器 

  22. selector = Selector.open(); 

  23. //创建serverSocketChannel 

  24. serverChannel = ServerSocketChannel.open(); 

  25. //如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式 

  26. serverChannel.configureBlocking(false); 

  27. //创建InetSocketAddress 

  28. InetSocketAddress socketAddress = new InetSocketAddress(port); 

  29. //得到ServerSocket 

  30. ServerSocket serverSocket = serverChannel.socket(); 

  31. //绑定ServetSocket到一个具体的端口,并设置backlog 

  32. serverSocket.bind(socketAddress, 1024); 

  33. //向selector注册ServerSocketChannel,设置为监听客户端的连接请求 

  34. serverChannel.register(selector, SelectionKey.OP_ACCEPT); 

  35. //标记服务器状态 

  36. started = true; 

  37. System.out.println("服务器已经启动,端口号:" + port); 


  38. } catch (IOException e) { 

  39. //TODO: handle exception 

  40. e.printStackTrace(); 

  41. System.exit(1); 

  42. }  




  43. public void setStarted(boolean flag) { 

  44. this.started = flag; 



  45. @Override 

  46. public void run() { 

  47. //循环遍历selector 

  48. while (started) { 

  49. try { 

  50. //无论是否有读写事件,selector每个1s唤醒一次 


  51. try { 

  52. selector.select(1000); 

  53. } catch (Exception e) { 

  54. e.printStackTrace(); 



  55. //获得状态为ready的selectorkey 

  56. Set<SelectionKey> keys = selector.selectedKeys(); 

  57. Iterator<SelectionKey> iter = keys.iterator(); 

  58. SelectionKey key = null; 

  59. while (iter.hasNext()) { 

  60. key = iter.next(); 

  61. iter.remove(); 

  62. try { 

  63. handle(key); 

  64. } catch (Exception e) { 


  65. if (key != null) { 

  66. key.cancel(); 

  67. if (key.channel() != null) { 

  68. key.channel().close(); 









  69. } catch (Throwable t) { 


  70. t.printStackTrace(); 





  71. //关闭selector 

  72. if (selector != null) { 

  73. try { 

  74. selector.close(); 

  75. } catch (Exception e) { 


  76. e.printStackTrace(); 







  77. private void handle(SelectionKey key) throws IOException { 

  78. //判断kei是否是有效的 

  79. if (key.isValid()) { 


  80. //处理新接入的请求消息, 

  81. if (key.isAcceptable()) { 

  82. //通过selectionkey得到ServerSocketChannel,注意转型 

  83. ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); 

  84. //通过serversocketchannel的accept方法创建SocketChannel实例 

  85. SocketChannel client = serverSocketChannel.accept(); 

  86. //设置client为非阻塞模式 

  87. client.configureBlocking(false); 

  88. //把client注册到selector,注册事件为读 

  89. client.register(selector, SelectionKey.OP_READ); 




  90. //读消息 

  91. if (key.isReadable()) { 

  92. SocketChannel sc = (SocketChannel) key.channel(); 

  93. //创建byteBuffer,大小为1M 

  94. ByteBuffer buffer = ByteBuffer.allocate(1024); 

  95. //读取请求码流,返回读取到的字节数 

  96. int readBytes = sc.read(buffer); 

  97. //读取到字节,对字节进行编码 

  98. if (readBytes > 0) { 

  99. //将缓冲区buffer的position设为0,用于后续对缓冲区的读操作 

  100. buffer.flip(); 

  101. //根据缓冲区可读字节数创建字节数组 

  102. byte[] bytes = new byte[buffer.remaining()]; 

  103. //将缓冲区可读字节数组复制到新建的数组中 

  104. buffer.get(bytes); 

  105. String expresString = new String(bytes, "UTF-8"); 

  106. System.out.println("服务器收到的消息:" + expresString); 


  107. //处理数据 

  108. String res = null; 

  109. res = new StringBuffer(expresString).reverse().toString(); 

  110. //写入返回消息 

  111. dowrite(sc,res); 

  112. } else if (readBytes < 0) { 

  113. //链路关闭释放资源 

  114. key.cancel(); 

  115. sc.close(); 









  116. private void dowrite(SocketChannel sc, String res) throws IOException { 

  117. //把字符串编码为字节数组 

  118. byte[] bytes = res.getBytes(); 

  119. //根据数组容量创建ByteBuffer 

  120. ByteBuffer wBuffer = ByteBuffer.allocate(bytes.length); 

  121. //把字节数组复制到buffer中 

  122. wBuffer.put(bytes); 

  123. //flip操作,更改position为0,方便后续的写操作从头开始 

  124. wBuffer.flip(); 

  125. //发送缓冲区的数据 

  126. sc.write(wBuffer); 





客户端代码

client

  1. import java.util.Scanner;



  2. /** 

  3. * clientChannel 

  4. */ 

  5. public class clientChannel { 

  6. private static String host = "127.0.0.1"; 

  7. private static int port = 8000; 

  8. private static clientHandle cHandle; 


  9. public static void start() { 

  10. start(host,port); 



  11. public static synchronized void start(String host, int port) { 

  12. if (cHandle != null) { 

  13. cHandle.stop(); 



  14. cHandle = new clientHandle(host, port); 

  15. new Thread(cHandle, "client").start();; 




  16. public static Boolean sendMsg(String msg) throws Exception { 

  17. if (msg.contains("q")) { 

  18. return false; 



  19. cHandle.sendMsg(msg); 

  20. return true; 



  21. public static void main(String[] args) { 

  22. try { 

  23. start(); 

  24. Scanner s = new Scanner(System.in); 

  25. String tmp; 

  26. while ((tmp = s.nextLine())!= null) { 

  27. sendMsg(tmp); 



  28. } catch (Exception e) { 

  29. //TODO: handle exception 

  30. e.printStackTrace(); 



  31. //start(); 





clientHandle

  1. import java.net.InetSocketAddress;


  2. import java.nio.ByteBuffer; 

  3. import java.nio.channels.SelectionKey; 

  4. import java.nio.channels.Selector; 

  5. import java.nio.channels.SocketChannel; 

  6. import java.util.Iterator; 

  7. import java.util.Set; 

  8. import java.io.IOException; 

  9. /** 

  10. * clientHandle 

  11. */ 

  12. public class clientHandle implements Runnable{ 


  13. private String host; 

  14. private int port; 

  15. private Selector selector; 

  16. private SocketChannel socketChannel; 

  17. private volatile boolean started; 


  18. public clientHandle(String ip, int port) { 

  19. this.host = ip; 

  20. this.port = port; 


  21. try { 

  22. //创建选择器 

  23. this.selector = Selector.open(); 

  24. //创建socketchannel 

  25. this.socketChannel = SocketChannel.open(); 

  26. //配置socketChannel为非阻塞模式 

  27. this.socketChannel.configureBlocking(false); 

  28. this.started = true; 


  29. } catch (Exception e) { 

  30. //TODO: handle exception 

  31. e.printStackTrace(); 

  32. System.exit(1); 







  33. public void stop(){ 

  34. this.started = false; 




  35. public void doConnection() throws Exception{ 

  36. InetSocketAddress address = new InetSocketAddress(this.host, this.port); 

  37. if (socketChannel.connect(address)) { 

  38. System.out.println("连接服务器成功!!!"); 

  39. } else { 

  40. System.out.println("未连接成功,下一轮继续!!!"); 

  41. //向selector注册socketChannel的连接操作 

  42. socketChannel.register(selector, SelectionKey.OP_CONNECT); 






  43. public void handleInput(SelectionKey key) throws Exception{ 

  44. if (key.isValid()) { 

  45. //获取SeclectionKey对应的socketChannel 

  46. SocketChannel sc = (SocketChannel) key.channel(); 

  47. //测试key对应的socketChannel通道是否已完成或未能完成其套接连接操作 

  48. if (key.isConnectable()) { 

  49. //完成连接过程,返回true表示channel已经建立了连接,false表示建立连接失败 

  50. //当使用SocketChannel的connect()函数进行连接的时候,当处于非阻塞模式的情况下,可能连接不是立刻完成的,需要使用 

  51. //finidhConnect()来检查连接是否建立 

  52. if (sc.finishConnect()) { 

  53. } else { 

  54. System.exit(1); 

  55. }  



  56. //读消息,判断key对应的channel是否可读 

  57. if (key.isReadable()) { 

  58. //创建一个缓冲区,用来存读取的数据, 

  59. ByteBuffer buffer = ByteBuffer.allocate(1024); 

  60. //读取数据,返回读取的字节数 

  61. int readSize = sc.read(buffer); 

  62. //读取到字节,对字节进行编码 

  63. if (readSize > 0) { 

  64. //设置缓冲区的limit和position,方面后面读取数据 

  65. buffer.flip(); 

  66. //根据缓冲区的可读字节数创建字节数组 

  67. byte[] bytes = new byte[buffer.remaining()]; 

  68. //把缓冲区的内容复制到字节数组中去 

  69. buffer.get(bytes); 

  70. String res = new String(bytes, "UTF-8"); 

  71. System.out.println("客户端收到的数据为:" + res); 

  72. } else if (readSize < 0) { 

  73. //这种情况说明链路已经关闭,释放资源 

  74. key.cancel(); 

  75. sc.close(); 










  76. public void doWrite(SocketChannel sc, String request) throws Exception { 

  77. //将数据转换为字节数组 

  78. byte[] bytes = request.getBytes(); 

  79. //创建字节缓冲区 

  80. ByteBuffer buffer = ByteBuffer.allocate(bytes.length); 

  81. //将字节数组放入字节缓冲区 

  82. buffer.put(bytes); 

  83. //flip操作,调整limit和position 

  84. buffer.flip(); 

  85. //将数据写入到channel中 

  86. sc.write(buffer); 



  87. public void sendMsg(String msg) throws Exception{ 

  88. //这里还没明白为什么要先注册读操作?需要注册读操作才能,知道读状态是否就绪,方便handelInput函数处理!! 

  89. //但是还有一个疑问,什么时候使用OP_WRITE 

  90. socketChannel.register(selector, SelectionKey.OP_READ); 

  91. doWrite(socketChannel, msg); 



  92. @Override  

  93. public void run() {  

  94. try{  

  95. doConnection();  

  96. }catch(Exception e){  

  97. e.printStackTrace();  

  98. System.exit(1);  

  99. }  

  100. //循环遍历selector  

  101. while(started){  

  102. try{  

  103. //无论是否有读写事件发生,selector每隔1s被唤醒一次  

  104. selector.select(1000);  

  105. //阻塞,只有当至少一个注册的事件发生的时候才会继续.  

  106. // selector.select();  

  107. Set<SelectionKey> keys = selector.selectedKeys();  

  108. Iterator<SelectionKey> it = keys.iterator();  

  109. SelectionKey key = null;  

  110. while(it.hasNext()){  

  111. key = it.next();  

  112. it.remove();  

  113. try{  

  114. handleInput(key);  

  115. }catch(Exception e){  

  116. if(key != null){  

  117. key.cancel();  

  118. if(key.channel() != null){  

  119. key.channel().close();  

  120. }  

  121. }  

  122. }  

  123. }  

  124. }catch(Exception e){  

  125. e.printStackTrace();  

  126. System.exit(1);  

  127. }  

  128. }  

  129. //selector关闭后会自动释放里面管理的资源  

  130. if(selector != null)  

  131. try{  

  132. selector.close();  

  133. }catch (Exception e) {  

  134. e.printStackTrace();  

  135. }  





##测试及解析

测试代码:

  1. import java.util.Scanner;



  2. /** 

  3. * Test 

  4. */ 

  5. public class Test { 


  6. public static void main(String[] args) { 

  7. server.start(); 

  8. try { 

  9. Thread.sleep(3000); 

  10. } catch (Exception e) { 

  11. //TODO: handle exception 

  12. e.printStackTrace(); 



  13. clientChannel.start(); 

  14. Scanner s = new Scanner(System.in); 

  15. String tmp; 

  16. try { 

  17. while ((tmp = s.nextLine()) != null) { 

  18. clientChannel.sendMsg(tmp); 



  19. } catch (Exception e) { 

  20. //TODO: handle exception 

  21. e.printStackTrace(); 









可以看到,创建NIO服务端的主要步骤如下:

  1. 打开ServerSocketChannel,监听客户端连接
  2. 绑定监听端口,设置连接为非阻塞模式
  3. 创建Reactor线程,创建多路复用器并启动线程
  4. 将ServerSocketChannel注册到Reactor线程中的Selector上,监听ACCEPT事件
  5. Selector轮询准备就绪的key
  6. Selector监听到新的客户端接入,处理新的接入请求,完成TCP三次握手,简历物理链路
  7. 设置客户端链路为非阻塞模式
  8. 将新接入的客户端连接注册到Reactor线程的Selector上,监听读操作,读取客户端发送的网络消息
  9. 异步读取客户端消息到缓冲区
  10. 对Buffer编解码,处理半包消息,将解码成功的消息封装成Task
  11. 将应答消息编码为Buffer,调用SocketChannel的write将消息异步发送给客户端

Java AIO

Java nio 2.0的主要改进就是引入了异步IO(包括文件和网络),这里主要介绍下异步网络IO API的使用以及框架的设计,以TCP服务端为例。首先看下为了支持AIO引入的新的类和接口:

** java.nio.channels.AsynchronousChannel**

标记一个channel支持异步IO操作。

** java.nio.channels.AsynchronousServerSocketChannel**

ServerSocket的aio版本,创建TCP服务端,绑定地址,监听端口等。

** java.nio.channels.AsynchronousSocketChannel**

面向流的异步socket channel,表示一个连接。

** java.nio.channels.AsynchronousChannelGroup**

异步channel的分组管理,目的是为了资源共享。一个AsynchronousChannelGroup绑定一个线程池,这个线程池执行两个任务:处理IO事件和派发CompletionHandlerAsynchronousServerSocketChannel创建的时候可以传入一个AsynchronousChannelGroup,那么通过AsynchronousServerSocketChannel创建的AsynchronousSocketChannel将同属于一个组,共享资。

** java.nio.channels.CompletionHandler**

异步IO操作结果的回调接口,用于定义在IO操作完成后所作的回调工作。AIO的API允许两种方式来处理异步操作的结果:返回的Future模式或者注册CompletionHandler,推荐用CompletionHandler的方式,这些handler的调用是由AsynchronousChannelGroup的线程池派发的。显然,线程池的大小是性能的关键因素AsynchronousChannelGroup允许绑定不同的线程池,通过三个静态方法来创建:

 public static AsynchronousChannelGroup withFixedThreadPool(int nThreads, ThreadFactory threadFactory) throws IOException

 public static AsynchronousChannelGroup withCachedThreadPool(ExecutorService executor, int initialSize)

 public static AsynchronousChannelGroup withThreadPool(ExecutorService executor) throws IOException

需要根据具体应用相应调整,从框架角度出发,需要暴露这样的配置选项给用户。

在介绍完了aio引入的TCP的主要接口和类之后,我们来设想下一个aio框架应该怎么设计。参考非阻塞nio框架的设计,一般都是采用Reactor模式,Reactor负责事件的注册、select、事件的派发;相应地,异步IO有个Proactor模式,Proactor负责CompletionHandler的派发,查看一个典型的IO写操作的流程来看两者的区别:

Reactor:  send(msg) -> 消息队列是否为空,如果为空  -> 向Reactor注册OP_WRITE,然后返回 -> Reactor select -> 触发Writable,通知用户线程去处理 ->先注销Writable(很多人遇到的cpu 100%的问题就在于没有注销),处理Writeable,如果没有完全写入,继续注册OP_WRITE。注意到,写入的工作还是用户线程在处理。

Proactor: send(msg) -> 消息队列是否为空,如果为空,发起read异步调用,并注册CompletionHandler,然后返回。 -> 操作系统负责将你的消息写入,并返回结果(写入的字节数)给Proactor -> Proactor派发CompletionHandler。可见,写入的工作是操作系统在处理,无需用户线程参与。事实上在aio的API中,AsynchronousChannelGroup就扮演了Proactor的角色。

CompletionHandler有三个方法,分别对应于处理成功、失败、被取消(通过返回的Future)情况下的回调处理:

public interface CompletionHandler<V,A> {

     void completed(V result, A attachment);

    void failed(Throwable exc, A attachment);

    void cancelled(A attachment);
}

其中的泛型参数V表示IO调用的结果,而A是发起调用时传入的attchment。

server端代码

server

  1. //package aio;



  2. public class Server { 

  3. public static int clientCount = 0; 

  4. public static int port = 8000; 

  5. public static String hoString = "127.0.0.1"; 


  6. public static void start() { 

  7. start(Server.port); 



  8. public static void start(int port) { 

  9. AsyncServerHandler serverHandler = new AsyncServerHandler(port); 

  10. Thread t1 = new Thread(serverHandler); 

  11. t1.start(); 



  12. public static void main(String[] args) { 

  13. start(); 






AsyncServerHandler

  1. //package aio;



  2. import java.io.IOException; 

  3. import java.net.InetSocketAddress; 

  4. import java.nio.channels.AsynchronousServerSocketChannel; 

  5. import java.nio.channels.CompletionHandler; 

  6. import java.util.concurrent.CountDownLatch; 


  7. public class AsyncServerHandler implements Runnable{ 

  8. private AsynchronousServerSocketChannel serverSocketChannel; 

  9. private CountDownLatch latch; 

  10. public AsyncServerHandler(int port) { 

  11. // TODO Auto-generated constructor stub 

  12. InetSocketAddress address = new InetSocketAddress(port); 

  13. try { 

  14. serverSocketChannel = AsynchronousServerSocketChannel.open(); 

  15. serverSocketChannel.bind(address); 

  16. } catch (IOException e) { 

  17. // TODO Auto-generated catch block 

  18. e.printStackTrace(); 



  19. System.out.println("服务器已经启动,端口号:" + port); 




  20. @Override 

  21. public void run() { 

  22. // TODO Auto-generated method stub 

  23. //CountDownLatch初始化  

  24. //它的作用:在完成一组正在执行的操作之前,允许当前的现场一直阻塞  

  25. //此处,让现场在此阻塞,防止服务端执行完成后退出  

  26. //也可以使用while(true)+sleep  

  27. //生成环境就不需要担心这个问题,以为服务端是不会退出的  

  28. this.latch = new CountDownLatch(1); 

  29. serverSocketChannel.accept(this, new AcceptHandler(this.latch)); 


  30. try { 

  31. latch.await(); 

  32. } catch (InterruptedException e) { 

  33. // TODO Auto-generated catch block 

  34. e.printStackTrace(); 






  35. public AsynchronousServerSocketChannel getServerSocketChannel() { 

  36. return serverSocketChannel; 




  37. public void setServerSocketChannel(AsynchronousServerSocketChannel serverSocketChannel) { 

  38. this.serverSocketChannel = serverSocketChannel; 




  39. public CountDownLatch getLatch() { 

  40. return latch; 




  41. public void setLatch(CountDownLatch latch) { 

  42. this.latch = latch; 







AcceptHandler

  1. //package aio;



  2. import java.nio.ByteBuffer; 

  3. import java.nio.channels.AsynchronousServerSocketChannel; 

  4. import java.nio.channels.AsynchronousSocketChannel; 

  5. import java.nio.channels.CompletionHandler; 

  6. import java.util.concurrent.CountDownLatch; 


  7. public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel,AsyncServerHandler> { 


  8. private CountDownLatch latch; 

  9. public AcceptHandler(CountDownLatch latch) { 

  10. // TODO Auto-generated constructor stub 

  11. this.latch = latch; 



  12. @Override 

  13. public void completed(AsynchronousSocketChannel socketChannel, AsyncServerHandler serverHandler) { 

  14. // TODO Auto-generated method stub 

  15. //进入这个函数说明说明事件处理成功,已经成功的拿到socketChanne 

  16. Server.clientCount ++; 

  17. System.out.println("当前连接的客户数:" + Server.clientCount); 

  18. //继续接受其他客户机的连接, 

  19. AsynchronousServerSocketChannel channel = serverHandler.getServerSocketChannel(); 

  20. channel.accept(serverHandler, this); 

  21. //创建新的buffer,为读取数据做准备 

  22. ByteBuffer buffer = ByteBuffer.allocate(1024); 

  23. socketChannel.read(buffer, buffer, new serverReadHandler(socketChannel)); 





  24. @Override 

  25. public void failed(Throwable exc, AsyncServerHandler attachment) { 

  26. // TODO Auto-generated method stub 

  27. exc.printStackTrace(); 

  28. this.latch.countDown(); 








serverReadHandler

  1. //package aio;



  2. import java.io.IOException; 

  3. import java.io.UnsupportedEncodingException; 

  4. import java.nio.ByteBuffer; 

  5. import java.nio.channels.AsynchronousSocketChannel; 

  6. import java.nio.channels.CompletionHandler; 


  7. public class serverReadHandler implements CompletionHandler<Integer,ByteBuffer> { 

  8. //用于读取半包消息和应答消息 

  9. private AsynchronousSocketChannel serverChannel; 

  10. public serverReadHandler(AsynchronousSocketChannel channel) { 

  11. // TODO Auto-generated constructor stub 

  12. this.serverChannel = channel; 





  13. @Override 

  14. public void completed(Integer result, ByteBuffer buffer) { 

  15. // TODO Auto-generated method stub 

  16. //操作系统读取IO就绪之后,进入这个函数 

  17. //调整limit和position关系,方便读取 

  18. //buffer.flip(); 

  19. if (buffer.hasRemaining()) { 

  20. byte[] bytes = new byte[buffer.remaining()]; 

  21. buffer.get(bytes); 

  22. String msg = null; 

  23. try { 

  24. msg = new String(bytes, "UTF-8"); 

  25. } catch (UnsupportedEncodingException e) { 

  26. // TODO Auto-generated catch block 

  27. e.printStackTrace(); 



  28. System.out.println("服务器收到消息:" + msg); 


  29. String calResult = null; 

  30. StringBuffer stringBuffer = new StringBuffer(msg); 

  31. calResult = stringBuffer.reverse().toString(); 

  32. //向客户端发送结果 

  33. byte[] resultBytes = calResult.getBytes(); 

  34. ByteBuffer rBuffer = ByteBuffer.allocate(resultBytes.length); 

  35. rBuffer.put(resultBytes); 

  36. this.serverChannel.write(rBuffer, rBuffer, new ServerWriteHandler(this.serverChannel)); 

  37. }else { 

  38. System.out.println("服务器没有读取到数据"); 






  39. @Override 

  40. public void failed(Throwable exc, ByteBuffer buffer) { 

  41. // TODO Auto-generated method stub 

  42. try { 

  43. this.serverChannel.close(); 

  44. System.out.println("服务器socket关闭~~~"); 

  45. } catch (IOException e) { 

  46. // TODO Auto-generated catch block 

  47. e.printStackTrace(); 









serverWriteHandler

  1. //package aio;



  2. import java.io.IOException; 

  3. import java.nio.ByteBuffer; 

  4. import java.nio.channels.AsynchronousSocketChannel; 

  5. import java.nio.channels.CompletionHandler; 


  6. public class ServerWriteHandler implements CompletionHandler<Integer,ByteBuffer> { 

  7. private AsynchronousSocketChannel serverChannel; 

  8. public ServerWriteHandler(AsynchronousSocketChannel channel) { 

  9. // TODO Auto-generated constructor stub  

  10. this.serverChannel = channel; 



  11. @Override 

  12. public void completed(Integer result, ByteBuffer buffer) { 

  13. // TODO Auto-generated method stub 

  14. //调整limit和position的位置,方便下面输出使用 

  15. //buffer.flip(); 

  16. if (buffer.hasRemaining()) { 

  17. System.out.println("服务器输出数据~~~"); 

  18. buffer.clear(); 

  19. //向客户端写入数据 

  20. this.serverChannel.write(buffer, buffer, this); 

  21. } else { 

  22. //读取数据 

  23. ByteBuffer readBuffer = ByteBuffer.allocate(1024); 

  24. this.serverChannel.read(readBuffer, readBuffer, new serverReadHandler(this.serverChannel)); 





  25. @Override 

  26. public void failed(Throwable exc, ByteBuffer buffer) { 

  27. // TODO Auto-generated method stub 

  28. //出现异常关闭socketchannel 

  29. try { 

  30. this.serverChannel.close(); 

  31. } catch (IOException e) { 

  32. // TODO Auto-generated catch block 

  33. e.printStackTrace(); 









客户端

client

  1. //package aio;



  2. import java.util.Scanner; 


  3. public class client { 

  4. private static String DEFAULT_HOST = "127.0.0.1";  

  5. private static int DEFAULT_PORT = 8000;  

  6. private static AsyncClientHandler clientHandle;  

  7. public static void start(){  

  8. start(DEFAULT_HOST,DEFAULT_PORT);  

  9. }  

  10. public static synchronized void start(String ip,int port){  

  11. if(clientHandle!=null)  

  12. return;  

  13. clientHandle = new AsyncClientHandler(ip,port);  

  14. new Thread(clientHandle,"Client").start();  

  15. }  

  16. //向服务器发送消息  

  17. public static boolean sendMsg(String msg) throws Exception{  

  18. if(msg.equals("q")) return false;  

  19. clientHandle.sendMsg(msg);  

  20. return true;  

  21. }  

  22. //@SuppressWarnings("resource")  

  23. public static void main(String[] args) throws Exception{  

  24. //System.out.println("请输入请求消息:");  

  25. start();  

  26. System.out.println("请输入请求消息:");  

  27. Scanner scanner = new Scanner(System.in);  

  28. String tmp = null; 

  29. for (int i = 0; i < 10; i++) { 

  30. tmp = scanner.nextLine(); 

  31. clientHandle.sendMsg(tmp); 




  32. }  




AsyncClientHandler

  1. //package aio;



  2. import java.io.IOException; 

  3. import java.net.InetSocketAddress; 

  4. import java.nio.ByteBuffer; 

  5. import java.nio.channels.AsynchronousSocketChannel; 

  6. import java.nio.channels.CompletionHandler; 

  7. import java.util.concurrent.CountDownLatch; 


  8. public class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable{ 

  9. private AsynchronousSocketChannel clientChannel; 

  10. private String host; 

  11. private int port; 

  12. private CountDownLatch latch; 


  13. public AsyncClientHandler(String host, int port) { 

  14. // TODO Auto-generated constructor stub 

  15. this.host = host; 

  16. this.port = port; 

  17. try { 

  18. //创建异步客户端通道 

  19. clientChannel = AsynchronousSocketChannel.open(); 

  20. } catch (Exception e) { 

  21. // TODO: handle exception 

  22. e.printStackTrace(); 







  23. @Override 

  24. public void run() { 

  25. // TODO Auto-generated method stub 

  26. latch = new CountDownLatch(1); 

  27. //创建InetScoketAddress 

  28. InetSocketAddress address = new InetSocketAddress(this.host, this.port); 

  29. //发起异步连接操作,回调参数是这个类本身,如果连接成功会回调completed方法 

  30. clientChannel.connect(address, this, this); 

  31. try { 

  32. latch.await(); 

  33. } catch (InterruptedException e) { 

  34. // TODO Auto-generated catch block 

  35. e.printStackTrace(); 



  36. try { 

  37. clientChannel.close(); 

  38. } catch (IOException e) { 

  39. // TODO Auto-generated catch block 

  40. e.printStackTrace(); 






  41. @Override 

  42. public void completed(Void result, AsyncClientHandler attachment) { 

  43. // TODO Auto-generated method stub 

  44. //连接服务器成功,就会调用这个函数 

  45. System.out.println("连接服务器成功!!!!"); 




  46. @Override 

  47. public void failed(Throwable exc, AsyncClientHandler attachment) { 

  48. // TODO Auto-generated method stub 

  49. //连接服务器失败,会调用这个函数 

  50. System.out.println("连接服务器失败!!!"); 

  51. exc.printStackTrace(); 


  52. try { 

  53. clientChannel.close(); 

  54. latch.countDown(); 

  55. } catch (IOException e) { 

  56. // TODO Auto-generated catch block 

  57. e.printStackTrace(); 






  58. //向服务器发送消息 

  59. public void sendMsg(String msg) { 

  60. System.out.println(msg); 

  61. byte[] bytes = msg.getBytes(); 

  62. ByteBuffer buffer = ByteBuffer.allocate(bytes.length); 

  63. buffer.put(bytes); 

  64. //进行flip操作 

  65. buffer.flip(); 

  66. //异步写 

  67. clientChannel.write(buffer, buffer, new WriteHandler(clientChannel,latch)); 











writeHandler

  1. //package aio;



  2. import java.io.IOException; 

  3. import java.nio.ByteBuffer; 

  4. import java.nio.channels.AsynchronousSocketChannel; 

  5. import java.nio.channels.CompletionHandler; 

  6. import java.util.concurrent.CountDownLatch; 


  7. public class WriteHandler implements CompletionHandler<Integer, ByteBuffer>{ 


  8. private AsynchronousSocketChannel clientChannel; 

  9. private CountDownLatch latch; 

  10. public WriteHandler(AsynchronousSocketChannel channel, CountDownLatch latch) { 

  11. // TODO Auto-generated constructor stub 

  12. this.clientChannel = channel; 

  13. this.latch = latch; 



  14. @Override 

  15. public void completed(Integer result, ByteBuffer buffer) { 

  16. // TODO Auto-generated method stub 

  17. System.out.println("发送数据成功!~~~"); 

  18. //进行flip操作 

  19. //buffer.flip(); 

  20. //判断buffer中是否有需要发送的数据,如果有数据就进行发送,如果没有就进行读取操作 

  21. if (buffer.hasRemaining()) { 

  22. System.out.println("进入写数据!!!"); 

  23. clientChannel.write(buffer, buffer, this); 

  24. System.out.println("发送数据成功~~~"); 



  25. //读取数据 

  26. System.out.println("进入读取数据"); 

  27. ByteBuffer readBuffer = ByteBuffer.allocate(1024); 

  28. clientChannel.read(readBuffer, readBuffer, new ReadHandle(clientChannel,latch)); 





  29. @Override 

  30. public void failed(Throwable exc, ByteBuffer buffer) { 

  31. // TODO Auto-generated method stub 

  32. System.err.println("发送数据失败~~~"); 

  33. try { 

  34. clientChannel.close(); 

  35. latch.countDown(); 

  36. } catch (IOException e) { 

  37. // TODO Auto-generated catch block 

  38. e.printStackTrace(); 










ReadHandle

  1. //package aio;



  2. import java.io.IOException; 

  3. import java.io.UnsupportedEncodingException; 

  4. import java.nio.ByteBuffer; 

  5. import java.nio.channels.AsynchronousSocketChannel; 

  6. import java.nio.channels.CompletionHandler; 

  7. import java.util.concurrent.CountDownLatch; 


  8. public class ReadHandle implements CompletionHandler<Integer,ByteBuffer> { 

  9. private AsynchronousSocketChannel clientChannel; 

  10. private CountDownLatch latch; 

  11. public ReadHandle(AsynchronousSocketChannel channel, CountDownLatch latch) { 

  12. // TODO Auto-generated constructor stub 

  13. this.latch = latch; 

  14. this.clientChannel = channel; 



  15. @Override 

  16. public void completed(Integer result, ByteBuffer buffer) { 

  17. // TODO Auto-generated method stub 

  18. //调整buffer的limit和position方便获取收到的数据 

  19. //buffer.flip(); 

  20. System.out.println("读取数据成功!!!"); 

  21. byte[] bytes = new byte[buffer.remaining()]; 

  22. buffer.get(bytes); 

  23. String res; 

  24. try { 

  25. res = new String(bytes, "UTF-8"); 

  26. System.out.println("收到的数据: " + res); 

  27. } catch (UnsupportedEncodingException e) { 

  28. // TODO Auto-generated catch block 

  29. e.printStackTrace(); 









  30. @Override 

  31. public void failed(Throwable exc, ByteBuffer attachment) { 

  32. // TODO Auto-generated method stub 

  33. System.err.println("读取数据失败~~~"); 


  34. try { 

  35. clientChannel.close(); 

  36. this.latch.countDown(); 

  37. } catch (IOException e) { 

  38. // TODO Auto-generated catch block 

  39. e.printStackTrace(); 










原文地址:https://www.cnblogs.com/chailinbo/p/9226651.html

时间: 2024-08-29 00:25:06

Java中的BIO、NIO、AIO-3的相关文章

JAVA中的BIO,NIO,AIO

在了解BIO,NIO,AIO之前先了解一下IO的几个概念: 1.同步 用户进程触发IO操作并等待或者轮询的去查看IO操作是否就绪, 例如自己亲自出马持银行卡到银行取钱 2.异步 用户触发IO操作以后,可以干别的事,IO操作完成以后再通知当前线程,例如让小弟去银行帮你取钱,你可以干别的事 3.阻塞 当试图进读写文件的时候,发现不可读取或没东西读,则进入等待状态知道可读,ATM排队取钱 4.非阻塞 用户进程访问数据时,会马上返回一个状态值(可读不可读),比如在银行柜台办理业务,先取个号,然后坐在椅子

初理解Java中的BIO,NIO,AIO

初识: java 中的 BIO.NIO和 AIO 理解为是 Java 语言对操作系统的各种 IO 模型的封装.程序员在使用这些 API 的时候,不需要关心操作系统层面的知识,也不需要根据不同操作系统编写不同的代码.只需要使用Java的API就可以了. 在讲 BIO,NIO,AIO 之前先来回顾一下这样几个概念:同步与异步,阻塞与非阻塞. 同步与异步: 同步: 同步就是发起一个调用后,被调用者未处理完请求之前,调用不返回. 异步: 异步就是发起一个调用后,立刻得到被调用者的回应表示已接收到请求,但

java并发之bio nio aio

最近在进行tomcat优化,发现tomcat connector并发支持bio nio apr,发现想要理解tomcat并发离不开java io的理解.所有本文先探讨java对io的支持.java的io需要操作系统的支持,本文描述linux系统对io的支持,windows系统因为java生成环境使用少不再论述. 一.linux操作系统io的支持 1.同步阻塞 I/O(bio) 2.同步非阻塞I/O(nio) 3.异步非阻塞 I/O(aio) 二.java 包对io的支持 原文地址:https:/

JAVA 中BIO,NIO,AIO的理解

JAVA 中BIO,NIO,AIO的理解 博客分类: 网络编程 [转自]http://qindongliang.iteye.com/blog/2018539 在高性能的IO体系设计中,有几个名词概念常常会使我们感到迷惑不解.具体如下: 序号 问题 1 什么是同步? 2 什么是异步? 3 什么是阻塞? 4 什么是非阻塞? 5 什么是同步阻塞? 6 什么是同步非阻塞? 7 什么是异步阻塞? 8 什么是异步非阻塞? 散仙不才,在查了一部分资料后,愿试着以通俗易懂的方式解释下这几个名词.如有不足之处,还

也谈BIO | NIO | AIO (Java版--转)

关于BIO | NIO | AIO的讨论一直存在,有时候也很容易让人混淆,就我的理解,给出一个解释: BIO | NIO | AIO,本身的描述都是在Java语言的基础上的.而描述IO,我们需要从两个层面: 编程语言 实现原理 底层基础 从编程语言层面 BIO | NIO | AIO 以Java的角度,理解,linux c里也有AIO的概念(库),这些概念不知道什么原因被炒火起来,这里只从Java角度入手. BIO,同步阻塞式IO,简单理解:一个连接一个线程 NIO,同步非阻塞IO,简单理解:一

一站式学习Java网络编程 全面理解BIO/NIO/AIO

第1章 [开宗明义]网络编程三剑客BIO.NIO.AIO网络编程是RPC的奠基,RPC编程贯穿了程序员生涯的始终.本章首先分析为什么要学网络编,本课为谁设计,然后介绍课程内容主线脉络,让大家清晰知道本课程并非光说不练的假把式,而是处处有实战,实战项目步步优化,最后通过综合项目巩固所学.... 第2章 网络层的解析与协议本章首先对网络中涉及的网络链路层的解析进行讲解,进一步引出网络基本协议知识.使学员了解分层思想,对三种协议的定位及作用有所了解. 第3章 解读java.io专业术语也可以变得生动精

3. 彤哥说netty系列之Java BIO NIO AIO进化史.md

你好,我是彤哥,本篇是netty系列的第三篇. 欢迎来我的公从号彤哥读源码系统地学习源码&架构的知识. 先说两个事 (1)上周五的那篇文章发重复了,是定时任务设置错误导致,给大家带来干扰,这里说声抱歉. (2)之前的问卷调查结果出来了,认为先讲案例的票数较多,所以后面的文章都是先讲案例,再以案例展开讲解组件. 简介 上一章我们介绍了IO的五种模型,实际上Java只支持其中的三种,即BIO/NIO/AIO. 本文将介绍Java中这三种IO的进化史,并从使用的角度剖析它们背后的故事. Java BI

JAVA 004 网络编程 BIO NIO AIO

目录(图片来自于网络) 多路复用Linux环境下底层机制 多路复用模式Reacotor和Proactor BIO,NIO,AIO的简单介绍 多路复用Linux环境下底层机制 多路复用模式Reacotor和Proactor Reactor和Proactor模式的主要区别就是真正的读取和写入操作是由谁来完成的 Reactor中需要应用程序自己读取或者写入数据 Proactor模式,应用程序不需要进行实际的读写过程,它只需要从缓存区读取或者写入即可,操作系统会读取缓存区或者写入缓存区到真正的IO的设备

一站式学习Java网络编程 全面理解BIO/NIO/AIO完整版

一站式学习Java网络编程 全面理解BIO/NIO/AIO 资源获取链接:点击获取完整教程 网络层编程,是每一个开发者都要面对的技术.课程为解决大家学习网络层知识的难题,以创新性的“对比式学习”搭建网络编程课程,课程主线清晰(网络层基础铺垫-->java网络编程前置技术讲解-->阻塞式编程BIO-->非阻塞式编程NIO-->异步编程AIO-->综合实战)适合每一位需要理解网络编程的同学们学习.以“项目驱动”为导向的学习,与企业刚需灵魂契合. 适合人群 网络编程作为编程者的必备