java的nio之:java的nio的服务器实现模型

【nio服务端序列图】

一:nio服务器启动类

 1 package com.yeepay.sxf.testnio;
 2 /**
 3  * nio创建的的timerServer服务器
 4  *
 5  * @author sxf
 6  *
 7  */
 8 public class NIOTimerServer {
 9
10     /**
11      * nio服务器启动的入口
12      * @param args
13      */
14     public static void main(String[] args) {
15         //启动服务器绑定的端口号
16         int port=8000;
17         //获取端口号
18         if(args!=null && args.length>0){
19             try {
20                 port=Integer.valueOf(args[0]);
21             } catch (Exception e) {
22                 e.printStackTrace();
23             }
24         }
25
26         //新建nio服务器类
27         MultiplexerTimerServer timerServer=new MultiplexerTimerServer(port);
28
29         //启动服务类的主线程
30         new Thread(timerServer,"NIO-MultiplexerTimerServer-001").start();
31     }
32 }

二:nio服务器

  1 package com.yeepay.sxf.testnio;
  2
  3 import java.io.BufferedReader;
  4 import java.io.IOException;
  5 import java.net.InetSocketAddress;
  6 import java.nio.ByteBuffer;
  7 import java.nio.channels.SelectionKey;
  8 import java.nio.channels.Selector;
  9 import java.nio.channels.ServerSocketChannel;
 10 import java.nio.channels.SocketChannel;
 11 import java.util.Date;
 12 import java.util.Iterator;
 13 import java.util.Set;
 14
 15 import com.sun.org.apache.xml.internal.utils.StopParseException;
 16
 17 /**
 18  * nio的时间服务器
 19  * @author sxf
 20  *
 21  */
 22 public class MultiplexerTimerServer implements Runnable {
 23
 24     //选择器
 25     private Selector selector;
 26
 27     //
 28     private ServerSocketChannel serverSocketChannel;
 29
 30     private volatile boolean stop;
 31
 32     //启动服务
 33     public MultiplexerTimerServer(int port){
 34         try {
 35             //初始化多路复用器
 36             selector=Selector.open();
 37             //初始化socket通道
 38             serverSocketChannel=ServerSocketChannel.open();
 39             //设置通道为非阻塞模式
 40             serverSocketChannel.configureBlocking(false);
 41             //将该通道绑定地址和端口号
 42             serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024);
 43             //将该通道注册到多路复用器,并注册链接请求事件
 44             serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
 45             System.out.println("The time server is start in port:"+port);
 46         } catch (Exception e) {
 47             // TODO: handle exception
 48             e.printStackTrace();
 49             System.exit(1);
 50         }
 51     }
 52
 53     /**
 54      * 停止服务器
 55      */
 56     public void stop(){
 57         this.stop=true;
 58     }
 59
 60
 61     /**
 62      * 服务器运行主体
 63      */
 64     @Override
 65     public void run() {
 66         while(!stop){
 67             try {
 68                 System.out.println("MultiplexerTimerServer.run()");
 69                 //select()阻塞到至少有一个通道在你注册的事件上就绪了。
 70                 selector.select();
 71                 //获取注册在这个多路复用器上的已经就绪的通道的集合
 72                 Set<SelectionKey> selectionKeys=selector.selectedKeys();
 73                 //循环迭代已经就绪的通道集合
 74                 Iterator<SelectionKey> it=selectionKeys.iterator();
 75                 SelectionKey key=null;
 76                 while(it.hasNext()){
 77                     key=it.next();
 78                     //防止重复执行通道事件
 79                     it.remove();
 80                     //处理该通道上的事件
 81                     try {
 82                         handleInput(key);
 83                     } catch (Exception e) {
 84                         if(key!=null){
 85                             key.cancel();
 86                             if(key.channel()!=null){
 87                                 key.channel().close();
 88                             }
 89                         }
 90                     }
 91                 }
 92
 93             } catch (Exception e) {
 94                 e.printStackTrace();
 95             }
 96
 97
 98         }
 99     }
100
101
102     /**
103      * 处理请求的事件
104      * @param key
105      * @throws IOException
106      */
107     private void handleInput(SelectionKey key) throws IOException{
108         if(key.isValid()){
109             //处理新接入的请求消息
110             if(key.isAcceptable()){
111                 //请求链接事件就绪
112                 ServerSocketChannel ssc=(ServerSocketChannel) key.channel();
113                 SocketChannel  sc=ssc.accept();
114                 sc.configureBlocking(false);
115                 //在多路复用器上注册一个soketChannel,当有读事件则触发
116                 sc.register(selector, SelectionKey.OP_READ);
117             }
118
119             if(key.isReadable()){
120                 //读事件就绪
121                 SocketChannel sc=(SocketChannel) key.channel();
122                 //声明一个缓冲区
123                 ByteBuffer readBuffer=ByteBuffer.allocate(1024);
124                 //从通道里读取数据写入缓冲区
125                 int readBytes=sc.read(readBuffer);
126                 //readBytes>0:表示读到了字节,对字节进行编解码。
127                 //readBytes=0:没有读取到字节,属于正常场景,忽略
128                 //readBytes=-1;链路已经关闭,需要关闭socketChannel,释放资源
129                 if(readBytes>0){
130                     //将ByteBuffer的limit设置为position,position设置为0
131                     readBuffer.flip();
132                     //编解码数据
133                     byte[] bytes=new byte[readBuffer.remaining()];
134                     //将数据从缓冲区复制到数组里
135                     readBuffer.get(bytes);
136                     //翻译请求的内容
137                     String body=new String(bytes,"UTF-8");
138                     //打印请求的内容
139                     System.out.println("the timerserver receive order:"+body);
140
141                     //处理请求内容
142                     String currentTime=null;
143                     if("shangxiaofei".equals(body)){
144                         currentTime=new Date().toString();
145                     }else{
146                         currentTime="request param is error";
147                     }
148
149                     //将处理的结果响应给客户端
150                     doWrite(sc, currentTime);
151                 }else if(readBytes<0){
152                     //对链路进行关闭
153                     key.cancel();
154                     sc.close();
155                 }else{
156                     //忽略
157                 }
158             }
159         }
160     }
161
162     /**
163      * 响应请求的内容
164      * @param channel
165      * @param response
166      * @throws IOException
167      */
168     private void doWrite(SocketChannel channel,String response) throws IOException{
169         if(response!=null&&response.trim().length()>0){
170             //将响应的内容转化成byte[]
171             byte[] bytes=response.getBytes();
172             //声明缓冲区
173             ByteBuffer writeBuffer=ByteBuffer.allocate(bytes.length);
174             //将数据写入缓冲区
175             writeBuffer.put(bytes);
176             //修改ByteBuffer的imit设置为position,position设置为0
177             writeBuffer.flip();
178             //将数据从缓冲区写入通道
179             channel.write(writeBuffer);
180         }
181     }
182
183
184 }

【nio客户端序列图】

三:nio服务器客户端启动类

 1 package com.yeepay.sxf.testnio;
 2
 3
 4 /**
 5  * 向TimerServer发送请求的客户端
 6  * @author sxf
 7  *
 8  */
 9 public class NIOTimerClient {
10
11     public static void main(String[] args) {
12         int port=8000;
13
14         if(args!=null&&args.length>0){
15             port=Integer.valueOf(args[0]);
16         }
17         new Thread(new TimerClientHandler("127.0.0.1", port),"TimeClient-001").start();
18     }
19 }

四:nio服务器的客户端

  1 package com.yeepay.sxf.testnio;
  2
  3 import java.io.IOException;
  4 import java.net.InetSocketAddress;
  5 import java.nio.ByteBuffer;
  6 import java.nio.channels.SelectionKey;
  7 import java.nio.channels.Selector;
  8 import java.nio.channels.SocketChannel;
  9 import java.util.Iterator;
 10 import java.util.Set;
 11
 12 /**
 13  * timerclient请求线程
 14  * @author sxf
 15  *
 16  */
 17 public class TimerClientHandler implements Runnable{
 18     //链接timer服务器的ip地址
 19     private String host;
 20     //链接timer服务器服务的端口号
 21     private int port;
 22     //多路复用器
 23     private Selector selector;
 24     //通道
 25     private SocketChannel socketChannel;
 26     //当前请求线程是否停止
 27     private volatile boolean stop;
 28
 29
 30     public TimerClientHandler(String host,int port) {
 31         this.host=host==null?"127.0.0.1":host;
 32         this.port=port;
 33         try {
 34             this.selector=Selector.open();
 35             this.socketChannel=SocketChannel.open();
 36             socketChannel.configureBlocking(false);
 37         } catch (Exception e) {
 38             e.printStackTrace();
 39             System.exit(1);
 40         }
 41     }
 42
 43     /**
 44      * 链接时间服务器
 45      * @throws IOException
 46      */
 47     private void doConnect() throws IOException{
 48         if(socketChannel.connect(new InetSocketAddress(host, port))){
 49             socketChannel.register(selector, SelectionKey.OP_READ);
 50             //doWrite(socketChannel);
 51         }else{
 52             socketChannel.register(selector, SelectionKey.OP_CONNECT);
 53         }
 54     }
 55
 56     /**
 57      * 向时间服务器发送请求
 58      * @param sc
 59      * @throws IOException
 60      */
 61     private void doWrite(SocketChannel sc) throws IOException{
 62         //发送请求的请求内容
 63         byte[] req="shangxiaofei".getBytes();
 64         //声明缓冲区
 65         ByteBuffer writeBuffer=ByteBuffer.allocate(req.length);
 66         //将请求体写入缓冲区
 67         writeBuffer.put(req);
 68         //设置limit
 69         writeBuffer.flip();
 70         //将缓冲区的内容写入通道
 71         sc.write(writeBuffer);
 72         if(!writeBuffer.hasRemaining()){
 73             System.out.println("send order to server success........");
 74         }
 75
 76     }
 77
 78
 79     private void handleInput(SelectionKey key) throws IOException{
 80         if(key.isValid()){
 81             //判断链接是否成功
 82             SocketChannel sc=(SocketChannel) key.channel();
 83
 84                 //链接事件就绪
 85                 if(sc.finishConnect()){
 86                     //是否链接完成
 87                     sc.register(selector, SelectionKey.OP_READ);
 88                     doWrite(sc);
 89                 }else{
 90                     //链接失败,进程退出
 91                     System.exit(1);
 92                 }
 93
 94                 if(key.isReadable()){
 95                     //读事件就绪
 96                     ByteBuffer readBuffer=ByteBuffer.allocate(1024);
 97                     int readBytes=sc.read(readBuffer);
 98                     if(readBytes>0){
 99                         readBuffer.flip();
100                         byte[] bytes=new byte[readBuffer.remaining()];
101                         readBuffer.get(bytes);
102                         String body=new String(bytes,"UTF-8");
103                         System.out.println("TimerServer response:"+body);
104                         this.stop=true;
105                     }else if(readBytes<0){
106                         //对端链路关闭
107                         key.cancel();
108                         sc.close();
109                     }else{
110                         //读到0字节,忽略
111                     }
112                 }
113
114         }
115     }
116
117     @Override
118     public void run() {
119         try {
120             //链接并发送请求
121             doConnect();
122         } catch (Exception e) {
123             // TODO: handle exception
124             e.printStackTrace();
125         }
126
127         while(!stop){
128             try {
129                 //等待响应
130                 selector.select();
131                 //获取已经就绪的通道事件集合,在这个多路复用器上
132                 Set<SelectionKey> selectedKeys=selector.selectedKeys();
133                 //循环迭代处理事件集合
134                 Iterator<SelectionKey> it=selectedKeys.iterator();
135                 SelectionKey key=null;
136                 while (it.hasNext()) {
137                     key=it.next();
138                     it.remove();
139                     try {
140                         handleInput(key);
141                     } catch (Exception e) {
142                         e.printStackTrace();
143                     }
144
145                 }
146             } catch (Exception e) {
147                 e.printStackTrace();
148             }
149         }
150
151         //多路复用器关闭后,所有注册在上面的channel和Pipe等资源都会被自动去注册并关闭
152         //所以不需要重复释放资源
153 //        if(selector!=null){
154 //            try {
155 //                selector.close();
156 //            } catch (Exception e) {
157 //                e.printStackTrace();
158 //            }
159 //        }
160
161     }
162
163
164 }

时间: 2024-11-10 01:34:41

java的nio之:java的nio的服务器实现模型的相关文章

开源一个基于nio的java网络程序

因为最近要从公司离职,害怕用nio写的网络程序没有人能看懂(或许是因为写的不好吧),就调整成了mina(这样大家接触起来非常方便,即使没有socket基础,用起来也不难),所以之前基于nio写的网络程序就开放出来好了! 写的比较挫,大家见谅! 首先是PollServer类,主要处理select,做网络事件的监听和基于FutureTask的数据发送,代码如下: package gs.gate; import gs.gate.handle.ClientHandle; import java.util

我的Java开发学习之旅------&gt;Java NIO 报java.nio.charset.MalformedInputException: Input length = 1异常

今天在使用Java NIO的Channel和Buffer进行文件操作时候,报了java.nio.charset.MalformedInputException: Input length = 1异常,具体如下: java.nio.charset.MalformedInputException: Input length = 1 at java.nio.charset.CoderResult.throwException(CoderResult.java:260) at java.nio.char

Java.nio vs Java.io

Java.nio vs Java.io By Nino Guarnacci on Jun 18, 2009 --- posted by Davide Pisano This document is not a Java.io or a Java.nio manual, or a technical document about Java.io and Java.nio use. It only attempts to compare these two packages, highlightin

Java NIO笔记(一):NIO介绍

Java NIO即Java Non-blocking IO(Java非堵塞I/O),由于是在Jdk1.4之后添加的一套新的操作I/O工具包,所以通常会被叫做Java New IO.NIO是为提供I/O吞吐量而专门设计.其卓越的性能甚至能够与C媲美. NIO是通过Reactor模式的事件驱动机制来达到Non blocking的,那么什么是Reactor模式呢?Reactor翻译成中文是"反应器",就是我们将事件注冊到Reactor中,当有对应的事件发生时,Reactor便会告知我们有哪些

Java NIO:IO与NIO的区别

一.概念 NIO即New IO,这个库是在JDK1.4中才引入的.NIO和IO有相同的作用和目的,但实现方式不同,NIO主要用到的是块,所以NIO的效率要比IO高很多.在Java API中提供了两套NIO,一套是针对标准输入输出NIO,另一套就是网络编程NIO. 二.NIO和IO的主要区别 下表总结了Java IO和NIO之间的主要区别: IO NIO 面向流 面向缓冲 阻塞IO 非阻塞IO 无 选择器 1.面向流与面向缓冲 Java IO和NIO之间第一个最大的区别是,IO是面向流的,NIO是

疯狂Java学习笔记(76)------------NIO.2第二篇

上一篇地址http://write.blog.csdn.net/postedit/46386609 在该系列的上一篇中我演示了NIO.2的三个方法:文件拷贝.文件和目录的删除和文件移动.在这篇文章中,我将向大家展示路径相关的方法(如获取路径.检索路径信息).文件和目录测试方法(如文件或目录的存在性测试)以及面向属性的方法. 获取路径 问:怎样获得一个 java.nio.file.Path 对象? 答:你可以调用 java.nio.file.Paths 类的下列任何一静态个方法,通过文件或目录在文

疯狂Java学习笔记(75)-----------NIO.2第一篇

Java 7引入了NIO.2,NIO.2是继承自NIO框架,并增加了新的功能(例如:处理软链接和硬链接的功能).这篇帖子包括三个部分,我将使用NIO.2的一些示例,由此向大家演示NIO.2的基本使用方法. 下一篇地址http://blog.csdn.net/u011225629/article/details/46386599 文件拷贝 Q:怎样拷贝一个文件? A:你可以使用java.nio.file.Files类的public static Path copy(Path source, Pat

JAVA NIO non-blocking模式实现高并发服务器

JAVA NIO non-blocking模式实现高并发服务器 分类: JAVA NIO2014-04-14 11:12 1912人阅读 评论(0) 收藏 举报 目录(?)[+] Java自1.4以后,加入了新IO特性,NIO. 号称new IO. NIO带来了non-blocking特性. 这篇文章主要讲的是如何使用NIO的网络新特性,来构建高性能非阻塞并发服务器. 文章基于个人理解,我也来搞搞NIO.,求指正. 在NIO之前 服务器还是在使用阻塞式的java socket. 以Tomcat最

java nio学习三:NIO 的非阻塞式网络通信

一.阻塞和非阻塞 传统的 IO 流都是阻塞式的.也就是说,当一个线程调用 read() 或 write()时,该线程被阻塞,直到有一些数据被读取或写入,该线程在此期间不能执行其他任务.因此,在完成网络通信进行 IO 操作时,由于线程会阻塞,所以服务器端必须为每个客户端都提供一个独立的线程进行处理,当服务器端需要处理大量客户端时,性能急剧下降.Java NIO 是非阻塞模式的.当线程从某通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务.线程通常将非阻塞 IO 的空闲时间用于在其他通道上

Java网络编程和NIO详解2:JAVA NIO一步步构建IO多路复用的请求模型

Java网络编程与NIO详解2:JAVA NIO一步步构建IO多路复用的请求模型 知识点 nio 下 I/O 阻塞与非阻塞实现 SocketChannel 介绍 I/O 多路复用的原理 事件选择器与 SocketChannel 的关系 事件监听类型 字节缓冲 ByteBuffer 数据结构 场景 接着上一篇中的站点访问问题,如果我们需要并发访问10个不同的网站,我们该如何处理? 在上一篇中,我们使用了java.net.socket类来实现了这样的需求,以一线程处理一连接的方式,并配以线程池的控制