Java NIO服务器端开发

一、NIO类库简介

  1、缓冲区Buffer

  Buffer是一个对象,包含一些要写入和读出的数据。

  在NIO中,所有的数据都是用缓冲区处理的,读取数据时,它是从通道(Channel)直接读到缓冲区中,在写入数据时,也是从缓冲区写入到通道。

  缓冲区实质上是一个数组,通常是一个字节数组(ByteBuffer),也可以是其它类型的数组,此外缓冲区还提供了对数据的结构化访问以及维护读写位置等信息。

  Buffer类的继承关系如下图所示:

  

  2、通道Channel

  Channel是一个通道,网络数据通过Channel读取和写入。通道和流的不同之处在于通道是双向的(通道可以用于读、写后者二者同时进行),流只是在一个方向上移动。

  Channel大体上可以分为两类:用于网络读写的SelectableChannel(ServerSocketChannel和SocketChannel就是其子类)、用于文件操作的FileChannel。

  下面的例子给出通过FileChannel来向文件中写入数据、从文件中读取数据,将文件数据拷贝到另一个文件中:

public class NioTest
{
    public static void main(String[] args) throws IOException
    {
        copyFile();
    }
    //拷贝文件
    private static void copyFile()
    {
        FileInputStream in=null;
        FileOutputStream out=null;
        try
        {
            in=new FileInputStream("src/main/java/data/in-data.txt");
            out=new FileOutputStream("src/main/java/data/out-data.txt");
            FileChannel inChannel=in.getChannel();
            FileChannel outChannel=out.getChannel();
            ByteBuffer buffer=ByteBuffer.allocate(1024);
            int bytesRead = inChannel.read(buffer);
            while (bytesRead!=-1)
            {
                buffer.flip();
                outChannel.write(buffer);
                buffer.clear();
                bytesRead = inChannel.read(buffer);
            }
        }
        catch (FileNotFoundException e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IOException e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    //写文件
    private static void writeFileNio()
    {
        try
        {
            RandomAccessFile fout = new RandomAccessFile("src/main/java/data/nio-data.txt", "rw");
            FileChannel fc=fout.getChannel();
            ByteBuffer buffer=ByteBuffer.allocate(1024);
            buffer.put("hi123".getBytes());
            buffer.flip();
            try
            {
                fc.write(buffer);
            } catch (IOException e)
            {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        catch (FileNotFoundException e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    //读文件
    private static void readFileNio()
    {
        FileInputStream fileInputStream;
        try
        {
            fileInputStream = new FileInputStream("src/main/java/data/nio-data.txt");
            FileChannel fileChannel=fileInputStream.getChannel();//从 FileInputStream 获取通道
            ByteBuffer byteBuffer=ByteBuffer.allocate(1024);//创建缓冲区
            int bytesRead=fileChannel.read(byteBuffer);//将数据读到缓冲区
            while(bytesRead!=-1)
            {
                /*limit=position
                 * position=0;
                 */
                byteBuffer.flip();
                //hasRemaining():告知在当前位置和限制之间是否有元素
                while (byteBuffer.hasRemaining())
                {
                    System.out.print((char) byteBuffer.get());
                }
                /*
                 * 清空缓冲区
                 * position=0;
                 * limit=capacity;
                 */
                byteBuffer.clear();
                bytesRead = fileChannel.read(byteBuffer);
            }
        } catch (FileNotFoundException e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IOException e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

  3、多路复用器Selector

  多路复用器提供选择已经就绪的任务的能力。Selector会不断的轮询注册在其上的Channel,如果某个Channel上面发送读或者写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合,进行后续的I/O操作。

  一个多路复用器Selector可以同时轮询多个Channel,由于JDK使用了epoll代替了传统的select实现,所以它没有最大连接句柄1024/2048的限制,意味着只需要一个线程负责Selector的轮询,就可以接入成千上万的客户端。其模型如下图所示:

  

  用单线程处理一个Selector。要使用Selector,得向Selector注册Channel,然后调用它的select()方法。这个方法会一直阻塞到某个注册的通道有事件就绪。一旦这个方法返回,线程就可以处理这些事件,事件的例子有如新连接进来,数据接收等。

  注:

  1、什么select模型?

  select是事件触发机制,当等待的事件发生就触发进行处理,多用于Linux实现的服务器对客户端的处理。

  可以阻塞地同时探测一组支持非阻塞的IO设备,是否有事件发生(如可读、可写,有高优先级错误输出等),直至某一个设备触发了事件或者超过了指定的等待时间。也就是它们的职责不是做IO,而是帮助调用者寻找当前就绪的设备。

  2、什么是epoll模型?

  epoll的设计思路,是把select/poll单个的操作拆分为1个epoll_create+多个epoll_ctrl+一个wait。此外,内核针对epoll操作添加了一个文件系统”eventpollfs”,每一个或者多个要监视的文件描述符都有一个对应的eventpollfs文件系统的inode节点,主要信息保存在eventpoll结构体中。而被监视的文件的重要信息则保存在epitem结构体中。所以他们是一对多的关系。

二、NIO服务器端开发

  功能说明:开启服务器端,对每一个接入的客户端都向其发送hello字符串。

  使用NIO进行服务器端开发主要有以下几个步骤:

  1、创建ServerSocketChannel,配置它为非阻塞模式

    serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.configureBlocking(false);

  2、绑定监听,配置TCP参数,如backlog大小

    serverSocketChannel.socket().bind(new InetSocketAddress(8080));

  3、创建一个独立的I/O线程,用于轮询多路复用器Selector

  4、创建Selector,将之前创建的ServerSocketChannel注册到Selector上,监听SelectionKey.ACCEPT

  selector=Selector.open();
   serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

  5、启动I/O线程,在循环体内执行Selector.select()方法,轮询就绪的Channel

  while(true)
   {
        try
        {
           //select()阻塞到至少有一个通道在你注册的事件上就绪了
           //如果没有准备好的channel,就在这一直阻塞
           //select(long timeout)和select()一样,除了最长会阻塞timeout毫秒(参数)。
           selector.select();
        }
        catch (IOException e)
        {
           // TODO Auto-generated catch block
           e.printStackTrace();
           break;
         } }

  6、当轮询到了处于就绪状态的Channel时,需对其进行判断,如果是OP_ACCEPT状态,说明是新的客户端接入,则调用ServerSocketChannel.accept()方法接受新的客户端

      //返回已经就绪的SelectionKey,然后迭代执行
            Set<SelectionKey> readKeys=selector.selectedKeys();
            for(Iterator<SelectionKey> it=readKeys.iterator();it.hasNext();)
            {
                SelectionKey key=it.next();
                it.remove();
                try
                {
                    if(key.isAcceptable())
                    {
                        ServerSocketChannel server=(ServerSocketChannel) key.channel();
                        SocketChannel client=server.accept();
                        client.configureBlocking(false);
                        client.register(selector,SelectionKey.OP_WRITE);
                    }
                    else if(key.isWritable())
                    {
                        SocketChannel client=(SocketChannel) key.channel();
                        ByteBuffer buffer=ByteBuffer.allocate(20);
                        String str="hello";
                        buffer=ByteBuffer.wrap(str.getBytes());
                        client.write(buffer);
                        key.cancel();
                    }
                }catch(IOException e)
                {
                    e.printStackTrace();
                    key.cancel();
                    try
                    {
                        key.channel().close();
                    } catch (IOException e1)
                    {
                        // TODO Auto-generated catch block
                        e1.printStackTrace();
                    }

                }
            }    

  7、设置新接入的客户端链路SocketChannel为非阻塞模式,配置其他的一些TCP参数

  if(key.isAcceptable())
    {
        ServerSocketChannel server=(ServerSocketChannel) key.channel();
        SocketChannel client=server.accept();
        client.configureBlocking(false);
        ...
    }

  8、将SocketChannel注册到Selector,监听OP_WRITE

  client.register(selector,SelectionKey.OP_WRITE);

  9、如果轮询的Channel为OP_WRITE,则说明要向SockChannel中写入数据,则构造ByteBuffer对象,写入数据包

  else if(key.isWritable())
    {
        SocketChannel client=(SocketChannel) key.channel();
        ByteBuffer buffer=ByteBuffer.allocate(20);
        String str="hello";
        buffer=ByteBuffer.wrap(str.getBytes());
        client.write(buffer);
        key.cancel();
    } 

  完整代码如下:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class ServerSocketChannelDemo
{
    public static void main(String[] args)
    {
        ServerSocketChannel serverSocketChannel;
        Selector selector=null;
        try
        {
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.socket().bind(new InetSocketAddress(8080));
            selector=Selector.open();
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        }
        catch (IOException e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        while(true)
        {
            try
            {
                //select()阻塞到至少有一个通道在你注册的事件上就绪了
                //如果没有准备好的channel,就在这一直阻塞
                //select(long timeout)和select()一样,除了最长会阻塞timeout毫秒(参数)。
                selector.select();
            }
            catch (IOException e)
            {
                // TODO Auto-generated catch block
                e.printStackTrace();
                break;
            }
            //返回已经就绪的SelectionKey,然后迭代执行
            Set<SelectionKey> readKeys=selector.selectedKeys();
            for(Iterator<SelectionKey> it=readKeys.iterator();it.hasNext();)
            {
                SelectionKey key=it.next();
                it.remove();
                try
                {
                    if(key.isAcceptable())
                    {
                        ServerSocketChannel server=(ServerSocketChannel) key.channel();
                        SocketChannel client=server.accept();
                        client.configureBlocking(false);
                        client.register(selector,SelectionKey.OP_WRITE);
                    }
                    else if(key.isWritable())
                    {
                        SocketChannel client=(SocketChannel) key.channel();
                        ByteBuffer buffer=ByteBuffer.allocate(20);
                        String str="hello";
                        buffer=ByteBuffer.wrap(str.getBytes());
                        client.write(buffer);
                        key.cancel();
                    }
                }catch(IOException e)
                {
                    e.printStackTrace();
                    key.cancel();
                    try
                    {
                        key.channel().close();
                    } catch (IOException e1)
                    {
                        // TODO Auto-generated catch block
                        e1.printStackTrace();
                    }

                }
            }
        }
    }
}

  我们用telnet localhost 8080模拟出多个客户端:

  

  程序运行结果如下:

  

三、参考资料

  1、netty权威指南(李林峰)

  

时间: 2024-07-30 10:20:35

Java NIO服务器端开发的相关文章

通过Tomcat的Http11NioProtocol源码学习Java NIO设计

Tomcat的Http11NioProtocol协议使用Java NIO技术实现高性能Web服务器.本文通过分析Http11NioProtocol源码来学习Java NIO的使用.从中可以了解到阻塞IO和非阻塞IO的配合,NIO的读写操作以及Selector.wakeup的使用. 1. 初始化阶段 Java NIO服务器端实现的第一步是开启一个新的ServerSocketChannel对象.Http11NioProtocol的实现也不例外, 在NioEndPoint类的init方法可以看到这段代

基于Java NIO的多人在线聊天工具源码实现(登录,单聊,群聊)

近来在学习Java NIO网络开发知识,写了一个基于Java NIO的多人在线聊天工具练练手.源码公开在Coding上: https://coding.net/u/hust_wsh/p/MyChat/git ,开发环境是Ubuntu14.04+Eclipse Mars+JDK1.8. 要想编写一个基于Java NIO的多人在线聊天工具,我总结需要以下几方面的地址:客户端服务器模型,Java NIO中的Selector,SocketChannel,ByteBuffer,Collections以及序

我的Java开发学习之旅------&gt;Java NIO 报java.nio.charset.MalformedInputException: Input length = 1异常

今天在使用Java NIO的Channel和Buffer进行文件操作时候,报了java.nio.charset.MalformedInputException: Input length = 1异常,具体如下: java.nio.charset.MalformedInputException: Input length = 1 at java.nio.charset.CoderResult.throwException(CoderResult.java:260) at java.nio.char

Java NIO开发需要注意的陷阱(转)

陷阱1:处理事件忘记移除key在select返回值大于0的情况下,循环处理Selector.selectedKeys集合,每处理一个必须从Set中移除 Iterator<SelectionKey> it=set.iterator(); While(it.hasNext()){ SelectionKey key=it.next(); it.remove(); //切记移除 ??处理事件 } 不移除的后果是本次的就绪的key集合下次会再次返回,导致无限循环,CPU消耗100% 陷阱2:Select

高吞吐高并发Java NIO服务的架构(NIO架构及应用之一)

高吞吐高并发Java NIO服务的架构(NIO架构及应用之一) http://maoyidao.iteye.com/blog/1149015 Java NIO成功的应用在了各种分布式.即时通信和中间件Java系统中.证明了基于NIO构建的通信基础,是一种高效,且扩展性很强的通信架构. 基于Reactor模式的高可扩展性架构这个架构的基本思路在“基于高可用性NIO服务器架构”(http://today.java.net/pub/a/today/2007/02/13/architecture-of-

Java NIO实现的C/S模式多人聊天工具

小弟初学NIO,做了个控制台聊天工具,不知道代码写的如何,望大神们批评指点. 服务器端,两个线程,一个处理客户端请求和转发消息,另一个处理服务器管理员指令,上代码: package kindz.onlinechat; import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.cha

java NIO的多路复用及reactor模式【转载】

本文转载自:http://www.blogjava.net/hello-yun/archive/2012/10/17/389729.html java nio从1.4版本就出现了,而且依它优异的性能赢得了广大java开发爱好者的信赖.我很纳闷,为啥我到现在才接触,难道我不是爱好者,难道nio不优秀.经过长达半分钟的思考,我意识到:时候未到.以前总是写那些老掉牙的web程序,唉,好不容易翻身啦,现在心里好受多了.因为真不想自己到了30岁,还在说,我会ssh,会ssi,精通javascript,精通

JAVA NIO non-blocking模式实现高并发服务器

JAVA NIO non-blocking模式实现高并发服务器 分类: JAVA NIO2014-04-14 11:12 1912人阅读 评论(0) 收藏 举报 目录(?)[+] Java自1.4以后,加入了新IO特性,NIO. 号称new IO. NIO带来了non-blocking特性. 这篇文章主要讲的是如何使用NIO的网络新特性,来构建高性能非阻塞并发服务器. 文章基于个人理解,我也来搞搞NIO.,求指正. 在NIO之前 服务器还是在使用阻塞式的java socket. 以Tomcat最

从底层入手,图解 Java NIO BIO MIO AIO 四大IO模型与原理

目录 写在前面 1.1. Java IO读写原理 1.1.1. 内核缓冲与进程缓冲区 1.1.2. java IO读写的底层流程 1.2. 四种主要的IO模型 1.3. 同步阻塞IO(Blocking IO) 1.4. 同步非阻塞NIO(None Blocking IO) 1.5. IO多路复用模型(I/O multiplexing) 1.6. 异步IO模型(asynchronous IO) 小结一下: 写在最后 疯狂创客圈 百万级流量 高并发实战 疯狂创客圈 Java 分布式聊天室[ 亿级流量