通过Tomcat的Http11NioProtocol源码学习Java NIO设计

Tomcat的Http11NioProtocol协议使用Java NIO技术实现高性能Web服务器。本文通过分析Http11NioProtocol源码来学习Java NIO的使用。从中可以了解到阻塞IO和非阻塞IO的配合,NIO的读写操作以及Selector.wakeup的使用。

1. 初始化阶段

Java NIO服务器端实现的第一步是开启一个新的ServerSocketChannel对象。Http11NioProtocol的实现也不例外, 在NioEndPoint类的init方法可以看到这段代码。

代码1:NioEndPoint.init()方法

public void init()
    throws Exception {

    if (initialized )
        return;
    //开启一个新的ServerSocketChannel
    serverSock = ServerSocketChannel.open();
    //设置socket的性能偏好
    serverSock.socket().setPerformancePreferences(socketProperties .getPerformanceConnectionTime(),
                                                  socketProperties.getPerformanceLatency(),
                                                  socketProperties.getPerformanceBandwidth());
    InetSocketAddress addr = ( address!=null ?new InetSocketAddress(address ,port ):new InetSocketAddress(port));
    //绑定端口号,并设置backlog
    serverSock.socket().bind(addr,backlog );
    //将serverSock设置成阻塞IO
    serverSock.configureBlocking(true); //mimic APR behavior

    //初始化acceptor线程数
    if (acceptorThreadCount == 0) {
        // FIXME: Doesn‘t seem to work that well with multiple accept threads
        acceptorThreadCount = 1;
    }
    //初始化poller线程数
    if (pollerThreadCount <= 0) {
        //minimum one poller thread
        pollerThreadCount = 1;
    }

    // 根据需要,初始化SSL
    // 因为主要关注Java NIO, 所以这一块代码就省略掉了
    if (isSSLEnabled()) {
       ......
    }
    //OutOfMemoryError策略
    if (oomParachute >0) reclaimParachute(true);

    //开启NioSelectorPool
    selectorPool.open();
    initialized = true ;
}

在NioEndPoint.init方法中,可以看到ServerSocketChannel被设置成阻塞IO,并且没有注册任何就绪事件。这样可以和阻塞ServerSocket一样方便地使用阻塞accept方法来接收客户端新来的连接。但不同的是当NioEndPoint.Accept线程通过accept方法获得一个新的SocketChannel后会构建一个OP_REGISTER类型的PollerEvent事件并放到Poller.events队列中。而我们使用ServerSocket实现服务器的时候,在接收到新连接后,一般是从线程池中取出一个线程来处理这个连接。

在NioEndPoint.Accept的setSocketOptions方法中可以看到获得SocketChannel后的处理过程。步骤如下:

1)将SocketChannel设置成非阻塞;

2)构建OP_REGISTER类型的PollerEvent对象,并放入到Poller.events队列中。

代码2:NioEndPoint.Accept类的setSocketOptions方法

protected boolean setSocketOptions(SocketChannel socket) {
    try {
       //将客户端Socket设置为非阻塞, APR风格
        socket.configureBlocking( false);
        Socket sock = socket.socket();
        socketProperties.setProperties(sock);
        //从缓存中取NioChannel对象,如果取不到直接构建一个
        NioChannel channel = nioChannels.poll();
        if ( channel == null ) {
            // 如果sslContext不等于null, 需要启动ssl
            if (sslContext != null) {
                ....
            }
            //正常tcp启动
            else {
                //构建NioBufferHandler对象
                NioBufferHandler bufhandler = new NioBufferHandler(socketProperties .getAppReadBufSize(),
                                                                   socketProperties.getAppWriteBufSize(),
                                                                   socketProperties.getDirectBuffer());
                //构建NioChannel对象
                channel = new NioChannel(socket, bufhandler);
            }
        } else {
            //从缓存中取的NioChannel对象,将客户端socket设置进去
            channel.setIOChannel(socket);
            if ( channel instanceof SecureNioChannel ) {
                SSLEngine engine = createSSLEngine();
                ((SecureNioChannel)channel).reset(engine);
            } else {
                channel.reset();
            }
        }
        //注册NioChannel对象
        getPoller0().register(channel);
    } catch (Throwable t) {
        try {
            log.error("" ,t);
        } catch ( Throwable tt){}
        // Tell to close the socket
        return false ;
    }
    return true ;
}

Poller线程会从Poller.events队列中取出PollerEvent对象,并运行PollerEvent.run()方法。在PollerEvent.run()方法中发现是OP_REGISTER事件,则会在Poller.selector上注册SocketChannel对象的OP_READ就绪事件。

代码3:PollerEvent.run()方法代码片段

public void run() {
   if ( interestOps == OP_REGISTER ) {
       try {
           //在Poller.selector上注册OP_READ就绪事件
           socket.getIOChannel().register(socket .getPoller().getSelector(), SelectionKey.OP_READ , key );
       } catch (Exception x) {
           log.error("" , x);
       }
   }
   ......
}

至此,一个客户端连接准备工作就已经完成了。我们获得了一个客户端的SocketChannel, 并注册OP_READ就绪事件到Poller.selector上(如图1)。下面就可以进行数据读写了。

图1:ServerSocketChannel和SocketChannel的初始化状态

2. Poller.selector的wakeup方法

Poller线程会做如下工作:

1) 通过selection操作获取已经选中的SelectionKey数量;

2) 执行Poller.events队列中的PollerEvent;

3) 处理已经选中的SelectionKey。

当有新PollerEvent对象加入Poller.events队列中,需要尽快执行第二步,而不应该阻塞的selection操作中。所以就需要配合Selector.wakeup()方法来实现这个需求。Tomcat使用信号量wakeupCounter来控制Selector.wakeup()方法,阻塞Selector.select()方法和非阻塞Selector.selectNow()方法的使用。

当有新PollerEvent对象加入Poller.events队列中,会按照如下条件执行Selector.wakeup()方法。

  • 当wakeupCounter加1后等于0,说明Poller.selector阻塞在selection操作,这时才需要调用Selector.wakeup()方法。
  • 当wakeupCounter加1后不等于0,说明Poller.selector没有阻塞在selection操作,则不需要调用Selector.wakeup()方法。并且为了尽快执行第二步,Poller线程在下一次直接调用非阻塞方法Selector.selectNow()。

代码4:Poller.addEvent()方法,实现将PollerEvent对象加入Poller.events队列中。

public void addEvent(Runnable event) {
   events.offer(event);
   //如果wakeupCount加1后等于0,则调用wakeup方法
   if ( wakeupCounter .incrementAndGet() == 0 ) selector.wakeup();
}

代码5: Poller线程的selection操作代码

if (wakeupCounter .get()>0) {
   keyCount = selector.selectNow();
 else {
   wakeupCounter.set(-1);
   keyCount = selector.select(selectorTimeout );
}
wakeupCounter.set(0);

这样的设计因为Java NIO的wakeup有如下的特性:

  • 在Selector对象上调用wakeup()方法将会导致第一个没有返回的selection操作立即返回。如果当前没有进行的selection操作,那么下一次的select()方法的调用将立即返回。而这个将wakeup行为延迟到下一个select()方法经常不是我们想要的(当然也不是Tomcat想要的)。我们一般只是想从sleeping的线程wakeup,但允许接下来的selection操作正常处理。

所以,Tomcat通过wakeupCounter信号量的变化来控制只有阻塞在selection操作的时候才调用Selector.wakeup()方法。当有新PollerEvent对象加入Poller.events队列中,并且没有处于阻塞在selection操作中,则直接调用非阻塞方法Selector.selectNow()。

3. 读(写)数据

Poller线程会调用Poller.processKey()方法处理已经选中的SelectionKey。

该方法会完成下面工作:

1)取消在Poller.selector上注册的OP_READ就绪事件;

2)启动工作线程来处理网络请求;

2-1)读取和解析http请求数据

2-2)如果是动态内容,则会调用用户自定义的Servlet类处理并返回结果给浏览器;如果是静态内容,则会直接返回静态资源数据给浏览器。

我们在这就不详细讨论http协议的实现以及Servlet的使用,直接跳到网络IO读写实现类NioSelectorPool。

NioSelectorPool类也提供了产生Selector对象的功能,通过NioSelectorPool.get()方法就可以获得一个Selector对象。

根据命令行参数-Dorg.apache.tomcat.util.net.NioSelectorShared的设置决定是否在SocketChannel中共享Selector。

  • 若会True(默认), 则所有的SocketChannel共享一个Selector;
  • 若为False,  则每一个SocketChannel使用不同的Selector(开启的Selector对象最多不超过NioSelectorPool.maxSelectors)。

从NioSelectorPool类中获得的Selector对象会传入到NioSelectorPool的read和write方法,并在网络IO读写时候使用。

NioSelectorPool类的读写方法提供了两种模式。通过方法的最后一个入参block控制。

1)读方法read():

  • block为False, 则是非阻塞模式。如果读不到数据,则直接返回了;如果读到数据则继续读。
  • block为True, 则是阻塞模式。如果第一次读取不到数据,会在NioSelectorPool提供的Selector对象上注册OP_READ就绪事件,并循环调用Selector.select(long)方法,超时等待OP_READ就绪事件。如果OP_READ事件已经就绪,并且接下来读到数据,则会继续读。read()方法整体会根据readTimeout设置进行超时控制。若超时,则会抛出SocketTimeoutException异常。

2)写方法write():

  • block为False, 则是非阻塞模式。写数据之前不会监听OP_WRITE事件。如果没有成功,则直接返回。
  • block为True, 则是阻塞模式。第一次写数据之前不会监听OP_WRITE就绪事件。如果没有写成功,则会在NioSelectorPool提供的selector注册OP_WRITE事件。并循环调用Selector.select(long)方法,超时等待OP_WRITE就绪事件。如果OP_WRITE事件已经就绪,并且接下来写数据成功,则会继续写数据。write方法整体会根据writeTimeout设置进行超时控制。如超时,则会抛出SocketTimeoutException异常。

另外如果是共享Selector(NioSelectorShared=true)并且阻塞模式(block=true),则会使用NioBlockingSelector类实现读写数据。该类与NioSelectorPool使用Java NIO的策略是类似的,但实现略有不同,本文就不详细分析了。

图2:事件注册在读写时候发生的变化

下面是NioSelectorPool的read方法,实现从网络IO中读取数据。该方法有5个参数:

  • buf 保存从网络IO中读取到的数据;
  • socket NioChannel对象,其中封装了SocketChannel;
  • selector 为block模式使用的Selector对象,在实际调用的时候,会将NioSelectorPool类提供的selector对象传进去;
  • readTimout 读超时时间;
  • block 是否是阻塞模式,上面已经说明阻塞和非阻塞模式的区别。

代码6:NioSelectorPool的read()方法

public int read(ByteBuffer buf, NioChannel socket, Selector selector, long readTimeout, boolean block) throws IOException {
    //如果是共享Selector和阻塞模式,则使用NioBlockingSelector实现数据读取
    if ( SHARED && block ) {
        return blockingSelector .read(buf,socket,readTimeout);
    }
    SelectionKey key = null;
    int read = 0;
    boolean timedout = false;
    //一开始我们认为是可以读的
    int keycount = 1; //assume we can read
    //开始时间
    long time = System.currentTimeMillis(); //start the timeout timer
    try {
       //当没有超时,则继续读数据
        while ( (!timedout) ) {
            int cnt = 0;
            if ( keycount > 0 ) { //only read if we were registered for a read
                cnt = socket.read(buf);
                if (cnt == -1) throw new EOFException();
                read += cnt;
                //如果读取到数据,则继续读
                if (cnt > 0) continue; //read some more
                //如果没有读取到数据,并且不是block模式,则直接break
                if (cnt==0 && (read>0 || (!block) ) ) break; //we are done reading
            }
            if ( selector != null ) {//perform a blocking read
                //在NioSelectionPool提供的selector上注册OP_READ事件
                if (key==null) key = socket.getIOChannel().register(selector, SelectionKey.OP_READ);
                else key.interestOps(SelectionKey.OP_READ);
                //调用Selector.select方法
                keycount = selector.select(readTimeout);
            }
            //计算是否超时
            if (readTimeout > 0 && (selector == null || keycount == 0) ) timedout = (System.currentTimeMillis()-time)>=readTimeout;
        } //while
          //如果超时,抛出SocketTimeoutException异常
        if ( timedout ) throw new SocketTimeoutException();
    } finally {
        //在返回前,取消SelectionKey, 并将所有的key从selector中删掉
        if (key != null) {
            key.cancel();
            if (selector != null) selector.selectNow();//removes the key from this selector
        }
    }
    return read;
}

下面是NioSelectorPool的写方法,实现向网络IO中写数据。该方法有5个参数:

  • buf 保存需要写入的数据;
  • socket NioChannel对象,其中封装了SocketChannel;
  • selector 为block模式使用的Selector对象,在实际调用的时候,会将NioSelectorPool类提供的selector对象传进去;
  • writeTimeout 写超时时间;
  • block 是否是阻塞模式,上面已经说明阻塞和非阻塞模式的区别;
  • lastWrite 最近写入数据的byte数量。

代码7:NioSelectorPool.write()方法

public int write(ByteBuffer buf, NioChannel socket, Selector selector,
                 long writeTimeout, boolean block,MutableInteger lastWrite) throws IOException {
    //如果是共享Selector和阻塞模式,则使用NioBlockingSelector实现写数据
    if ( SHARED && block ) {
        return blockingSelector.write(buf,socket,writeTimeout,lastWrite);
    }
    SelectionKey key = null;
    int written = 0;
    boolean timedout = false;
   //一开始我们认为是可以读的
   int keycount = 1; //assume we can write
   //记录开始时间
    long time = System.currentTimeMillis(); //start the timeout timer
    try {
        while ( (!timedout) && buf.hasRemaining() ) {
            int cnt = 0;
            if ( keycount > 0 ) { //only write if we were registered for a write
                cnt = socket.write(buf); //write the data
                if (lastWrite!=null) lastWrite.set(cnt);
                if (cnt == -1) throw new EOFException();
              
                written += cnt;
                 //如果写数据成功,重新记录超时开始时间,并继续读
                if (cnt > 0) {
                    time = System. currentTimeMillis(); //reset our timeout timer
                    continue; //we successfully wrote, try again without a selector
                }
                //如果写入数据为0,并且是非阻塞模式,则直接退出
                if (cnt==0 && (!block)) break; //don‘t block
            }
            if ( selector != null ) {
                //在NioSelectorPool的selector注册OP_WRITE事件
                if (key==null) key = socket.getIOChannel().register(selector, SelectionKey.OP_WRITE);
                else key.interestOps(SelectionKey.OP_WRITE);
                keycount = selector.select(writeTimeout);
            }
            //是否超时
            if (writeTimeout > 0 && (selector == null || keycount == 0) ) timedout = (System.currentTimeMillis()-time)>=writeTimeout;
        } //while
          //如果超时,则直接抛出SocketTimeoutException异常
        if ( timedout ) throw new SocketTimeoutException();
    } finally {
         //在返回前,取消SelectionKey, 并将所有的key从selector中删掉
        if (key != null) {
            key.cancel();
            if (selector != null) selector.selectNow();//removes the key from this selector
        }
    }
    return written;
}

4. 总结

Tomcat在使用Java NIO的时候,将ServerSocketChannel配置成阻塞模式,这样可以方便地对ServerSocketChannel编写程序。当accept方法获得一个SocketChannel,并没有立即从线程池中取出一个线程来处理这个SocketChannel,而是构建一个OP_REGISTER类型的PollerEvent,并放到Poller.events队列中。Poller线程会处理这个PollerEvent,发现是OP_REGISTER类型,会在Poller.selector上注册一个这个SocketChannel的OP_READ就绪事件。如图1所示。

因为Java NIO的wakeup特性,使用wakeupCount信号量控制Selector.wakeup()方法,非阻塞方法Selector.selectNow()和阻塞方法Selector.select()的调用。我们在编写Java NIO程序时候也可以参考这种方式。

在SocketChannel上读的时候,分成非阻塞模式和阻塞模式。

  • 非阻塞模式,如果读不到数据,则直接返回了;如果读到数据则继续读。
  • 阻塞模式。如果第一次读取不到数据,会在NioSelectorPool提供的Selector对象上注册OP_READ就绪事件,并循环调用Selector.select(long)方法,超时等待OP_READ就绪事件。如果OP_READ事件已经就绪,并且接下来读到数据,则会继续读。read()方法整体会根据readTimeout设置进行超时控制。若超时,则会抛出SocketTimeoutException异常。

在SocketChannel上写的时候也分成非阻塞模式和阻塞模式。

  • 非阻塞模式,写数据之前不会监听OP_WRITE事件。如果没有成功,则直接返回。
  • 阻塞模式。第一次写数据之前不会监听OP_WRITE就绪事件。如果没有写成功,则会在NioSelectorPool提供的selector注册OP_WRITE事件。并循环调用Selector.select(long)方法,超时等待OP_WRITE就绪事件。如果OP_WRITE事件已经就绪,并且接下来写数据成功,则会继续写数据。write方法整体会根据writeTimeout设置进行超时控制。如超时,则会抛出SocketTimeoutException异常。

在写数据的时候,开始没有监听OP_WRITE就绪事件,直接调用write()方法。这是一个乐观设计,估计网络大部分情况都是正常的,不会拥塞。如果第一次写没有成功,则说明网络可能拥塞,那么再等待OP_WRITE就绪事件。

阻塞模式的读写方法没有在原有的Poller.selector上注册就绪事件,而是使用NioSelectorPool类提供的Selector对象注册就绪事件。这样的设计可以将各个Channel的就绪事件分散注册到不同的Selector对象中,避免大量Channel集中注册就绪事件到一个Selector对象,影响性能。

5. 参考资料

1)Tomcat6.0.18源码

2)Ron Hitchens的Java NIO

本博客系博主原创,转载请附上原博客地址:http://blog.csdn.net/jeff_fangji/article/details/43909677

时间: 2024-07-31 14:31:19

通过Tomcat的Http11NioProtocol源码学习Java NIO设计的相关文章

Java源码学习 -- java.lang.StringBuilder,java.lang.StringBuffer,java.lang.AbstractStringBuilder

一直以来,都是看到网上说“ StringBuilder是线程不安全的,但运行效率高:StringBuffer 是线程安全的,但运行效率低”,然后默默记住:一个是线程安全.一个线程不安全,但对内在原因并不了解.这两天终于下定决心看了下源代码,才深刻理解为啥一个线程安全.一个非线程安全. 一名话总结:java.lang.StringBuilder 与 java.lang.StringBuffer 同是继承于 java.lang.AbstractStringBuilder,具体在功能实现大多在 Abs

tomcat源码学习(2)&#160;&#160;关于apache&#160;digest

好久不写博文,罪过罪过.因为最近公司比较忙加上琐事有点多,所以隔了好久才来更新博文. apache digest本来是struts2框架中来加载xml文件并实例化对象的一个jar包,后来使用的越来越多. 我们都知道tomcat的conf文件夹下有一个server.xml配置文件,我们经常会其中的来进行配置以来运行一个java web项目,也经常修改中的port属性以来实现修改tomcat监听的端口.其实每个标签基本上都对应着一个对象,那tomcat是如何将这些对象实例化到java 虚拟机的运行内

Java多线程之JUC包:ReentrantReadWriteLock源码学习笔记

若有不正之处请多多谅解,并欢迎批评指正. 请尊重作者劳动成果,转载请标明原文链接: http://www.cnblogs.com/go2sea/p/5634701.html ReentrantLock提供了标准的互斥操作,但在应用中,我们对一个资源的访问有两种方式:读和写,读操作一般不会影响数据的一致性问题.但如果我们使用ReentrantLock,则在需要在读操作的时候也独占锁,这会导致并发效率大大降低.JUC包提供了读写锁ReentrantReadWriteLock,使得读写锁分离,在上述情

Java多线程之JUC包:Semaphore源码学习笔记

若有不正之处请多多谅解,并欢迎批评指正. 请尊重作者劳动成果,转载请标明原文链接: http://www.cnblogs.com/go2sea/p/5625536.html Semaphore是JUC包提供的一个共享锁,一般称之为信号量. Semaphore通过自定义的同步器维护了一个或多个共享资源,线程通过调用acquire获取共享资源,通过调用release释放. 源代码: /* * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to lic

JDK1.8源码学习之 HashMap.java

///JDK1.8源码学习之HashMap.java package java.util; import java.io.IOException; import java.io.InvalidObjectException; import java.io.Serializable; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.util.function.BiConsu

Java集合源码学习笔记(二)ArrayList分析

Java集合源码学习笔记(二)ArrayList分析 >>关于ArrayList ArrayList直接继承AbstractList,实现了List. RandomAccess.Cloneable.Serializable接口,为什么叫"ArrayList",因为ArrayList内部是用一个数组存储元素值,相当于一个可变大小的数组,也就是动态数组. (1)继承和实现继承了AbstractList,实现了List:ArrayList是一个数组队列,提供了相关的添加.删除.修

Java并发包源码学习之AQS框架(一)概述

AQS其实就是java.util.concurrent.locks.AbstractQueuedSynchronizer这个类. 阅读Java的并发包源码你会发现这个类是整个java.util.concurrent的核心之一,也可以说是阅读整个并发包源码的一个突破口. 比如读ReentrantLock的源码你会发现其核心是它的一个内部类Sync: 整个包中很多类的结构都是如此,比如Semaphore,CountDownLatch都有一个内部类Sync,而所有的Sync都是继承自AbstractQ

Java集合专题总结(1):HashMap 和 HashTable 源码学习和面试总结

2017年的秋招彻底结束了,感觉Java上面的最常见的集合相关的问题就是hash--系列和一些常用并发集合和队列,堆等结合算法一起考察,不完全统计,本人经历:先后百度.唯品会.58同城.新浪微博.趣分期.美团点评等都在1.2--面的时候被问过无数次,都问吐了&_&,其他公司笔试的时候,但凡有Java的题,都有集合相关考点,尤其hash表--现在总结下. Java集合概述 HashMap介绍 HashMap源码学习 关于HashMap的几个经典问题 HashTable介绍和源码学习 Hash

Java Collection源码学习

Java集合类的顶层是Collection<E>接口, Collection接口是最基本的容器接口,继承至Iterable接口(主要通过其进行产生迭代器逐一的进行元素访问).其中的元素允许重复,可以无序. JDK没有提供直接实现Collection接口的实现类,它提供更具体的子接口如List.Set等. 继承自它的子接口包括BeanContext, BeanContextServices, BlockingDeque<E>, BlockingQueue<E>, Dequ