基于JDK7 NIO2的高性能web服务器实践之二(转)

前一篇博客,我简单提了下怎么为NIO2增加TransmitFile支持,文件传送吞吐量是一个性能关注点,此外,并发连接数也是重要的关注点。

不过JDK7中又一次做了简单的实现,不支持同时投递多个AcceptEx请求,只支持一次一个,返回后再投递。这样,客户端连接的接受速度必然大打折扣。不知道为什么sun会做这样的实现,WSASend()/WSAReceive()一次只允许一个还是可以理解,毕竟简化了编程,不用考虑封包乱序问题。
也降低了内存耗尽的风险。AcceptEx却没有这样的理由了。

于是再一次为了性能,我增加了同时投递多个的支持。

另外,在JDK7的默认实现中,AcceptEx返回后,为了设置远程和本地InetSocketAddress也采用了效率很低的方法。4次通过JNI调用getsockname,2次为了取sockaddr,2次为了取port. 这些操作本人采用GetAcceptExSockaddrs一次完成,进一步提高效率。

先看Java部分的代码,框架跟JDK7的一样,细节处理不一样:

/**
 * 
 */
package sun.nio.ch;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.AcceptPendingException;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.CompletionHandler;
import java.nio.channels.NotYetBoundException;
import java.nio.channels.ShutdownChannelGroupException;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import sun.misc.Unsafe;

/**
 * This class enable multiple ‘AcceptEx‘ post on the completion port, hence improve the concurrent connection number.
 * @author Yvon
 *
 */
public class WindowsMultiAcceptSupport {

WindowsAsynchronousServerSocketChannelImpl schannel;

private static final Unsafe unsafe = Unsafe.getUnsafe();

// 2 * (sizeof(SOCKET_ADDRESS) + 16)
    private static final int ONE_DATA_BUFFER_SIZE = 88;

private long handle;
    private Iocp iocp;

// typically there will be zero, or one I/O operations pending. In rare
    // cases there may be more. These rare cases arise when a sequence of accept
    // operations complete immediately and handled by the initiating thread.
    // The corresponding OVERLAPPED cannot be reused/released until the completion
    // event has been posted.
    private PendingIoCache ioCache;

private Queue<Long> dataBuffers;
    // the data buffer to receive the local/remote socket address
    //        private final long dataBuffer;

private AtomicInteger pendingAccept;
    private int maxPending;

Method updateAcceptContextM;
    Method acceptM;

WindowsMultiAcceptSupport() {
        //dummy for JNI code.
    }

public void close() throws IOException {

schannel.close();

for (int i = 0; i < maxPending + 1; i++)//assert there is maxPending+1 buffer in the queue
        {
            long addr = dataBuffers.poll();
            // release  resources
            unsafe.freeMemory(addr);
        }

}

/**
     * 
     */
    public WindowsMultiAcceptSupport(AsynchronousServerSocketChannel ch, int maxPost) {
        if (maxPost <= 0 || maxPost > 1024)
            throw new IllegalStateException("maxPost can‘t less than 1 and greater than 1024");
        this.schannel = (WindowsAsynchronousServerSocketChannelImpl) ch;
        maxPending = maxPost;
        dataBuffers = new ConcurrentLinkedQueue<Long>();
        for (int i = 0; i < maxPending + 1; i++) {
            dataBuffers.add(unsafe.allocateMemory(ONE_DATA_BUFFER_SIZE));
        }

pendingAccept = new AtomicInteger(0);
        try {
            Field f = WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("handle");
            f.setAccessible(true);
            handle = f.getLong(schannel);

f = WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("iocp");
            f.setAccessible(true);
            iocp = (Iocp) f.get(schannel);

f = WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("ioCache");
            f.setAccessible(true);
            ioCache = (PendingIoCache) f.get(schannel);

f = WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("accepting");
            f.setAccessible(true);
            AtomicBoolean accepting = (AtomicBoolean) f.get(schannel);

accepting.set(true);//disable accepting by origin channel.

} catch (Exception e) {
            e.printStackTrace();
        }

}

@SuppressWarnings("unchecked")
    public final <A> void accept(A attachment,
        CompletionHandler<AsynchronousSocketChannel, ? super A> handler) {
        if (handler == null)
            throw new NullPointerException("‘handler‘ is null");
        implAccept(attachment, (CompletionHandler<AsynchronousSocketChannel, Object>) handler);
    }

/**
     * Task to initiate accept operation and to handle result.
     */
    private class AcceptTask implements Runnable, Iocp.ResultHandler {

private final WindowsAsynchronousSocketChannelImpl channel;
        private final AccessControlContext acc;
        private final PendingFuture<AsynchronousSocketChannel, Object> result;
        private final long dataBuffer;

AcceptTask(WindowsAsynchronousSocketChannelImpl channel, AccessControlContext acc,
            long dataBuffer, PendingFuture<AsynchronousSocketChannel, Object> result) {
            this.channel = channel;
            this.acc = acc;
            this.result = result;
            this.dataBuffer = dataBuffer;
        }

void enableAccept() {
            pendingAccept.decrementAndGet();
            dataBuffers.add(dataBuffer);
        }

void closeChildChannel() {
            try {
                channel.close();
            } catch (IOException ignore) {
            }
        }

// caller must have acquired read lock for the listener and child channel.
        void finishAccept() throws IOException {
            /**
             * JDK7 use 4 calls to getsockname  to setup
             * local& remote address, this is very inefficient.
             * 
             * I change this to use GetAcceptExSockaddrs
             */

InetAddress[] socks = new InetAddress[2];
            int[] ports = new int[2];
            updateAcceptContext(handle, channel.handle(), socks, ports, dataBuffer);
            InetSocketAddress local = new InetSocketAddress(socks[0], ports[0]);
            final InetSocketAddress remote = new InetSocketAddress(socks[1], ports[1]);
            channel.setConnected(local, remote);

// permission check (in context of initiating thread)
            if (acc != null) {
                AccessController.doPrivileged(new PrivilegedAction<Void>() {

public Void run() {
                        SecurityManager sm = System.getSecurityManager();
                        sm.checkAccept(remote.getAddress().getHostAddress(), remote.getPort());

return null;
                    }
                }, acc);
            }
        }

/**
         * Initiates the accept operation.
         */
        @Override
        public void run() {
            long overlapped = 0L;

try {
                // begin usage of listener socket
                schannel.begin();
                try {
                    // begin usage of child socket (as it is registered with
                    // completion port and so may be closed in the event that
                    // the group is forcefully closed).
                    channel.begin();

synchronized (result) {
                        overlapped = ioCache.add(result);

int n = accept0(handle, channel.handle(), overlapped, dataBuffer);//Be careful for the buffer address
                        if (n == IOStatus.UNAVAILABLE) {
                            return;
                        }

// connection accepted immediately
                        finishAccept();

// allow another accept before the result is set
                        enableAccept();
                        result.setResult(channel);
                    }
                } finally {
                    // end usage on child socket
                    channel.end();
                }
            } catch (Throwable x) {
                // failed to initiate accept so release resources
                if (overlapped != 0L)
                    ioCache.remove(overlapped);
                closeChildChannel();
                if (x instanceof ClosedChannelException)
                    x = new AsynchronousCloseException();
                if (!(x instanceof IOException) && !(x instanceof SecurityException))
                    x = new IOException(x);
                enableAccept();
                result.setFailure(x);
            } finally {
                // end of usage of listener socket
                schannel.end();
            }

// accept completed immediately but may not have executed on
            // initiating thread in which case the operation may have been
            // cancelled.
            if (result.isCancelled()) {
                closeChildChannel();
            }

// invoke completion handler
            Invoker.invokeIndirectly(result);
        }

/**
         * Executed when the I/O has completed
         */
        @Override
        public void completed(int bytesTransferred, boolean canInvokeDirect) {
            try {
                // connection accept after group has shutdown
                if (iocp.isShutdown()) {
                    throw new IOException(new ShutdownChannelGroupException());
                }

// finish the accept
                try {
                    schannel.begin();
                    try {
                        channel.begin();
                        finishAccept();
                    } finally {
                        channel.end();
                    }
                } finally {
                    schannel.end();
                }

// allow another accept before the result is set
                enableAccept();
                result.setResult(channel);
            } catch (Throwable x) {
                enableAccept();
                closeChildChannel();
                if (x instanceof ClosedChannelException)
                    x = new AsynchronousCloseException();
                if (!(x instanceof IOException) && !(x instanceof SecurityException))
                    x = new IOException(x);
                result.setFailure(x);
            }

// if an async cancel has already cancelled the operation then
            // close the new channel so as to free resources
            if (result.isCancelled()) {
                closeChildChannel();
            }

// invoke handler (but not directly)
            Invoker.invokeIndirectly(result);
        }

@Override
        public void failed(int error, IOException x) {
            enableAccept();
            closeChildChannel();

// release waiters
            if (schannel.isOpen()) {
                result.setFailure(x);
            } else {
                result.setFailure(new AsynchronousCloseException());
            }
            Invoker.invokeIndirectly(result);
        }
    }

Future<AsynchronousSocketChannel> implAccept(Object attachment,
        final CompletionHandler<AsynchronousSocketChannel, Object> handler) {
        if (!schannel.isOpen()) {
            Throwable exc = new ClosedChannelException();
            if (handler == null)
                return CompletedFuture.withFailure(exc);
            Invoker.invokeIndirectly(schannel, handler, attachment, null, exc);
            return null;
        }
        if (schannel.isAcceptKilled())
            throw new RuntimeException("Accept not allowed due to cancellation");

// ensure channel is bound to local address
        if (schannel.localAddress == null)
            throw new NotYetBoundException();

// create the socket that will be accepted. The creation of the socket
        // is enclosed by a begin/end for the listener socket to ensure that
        // we check that the listener is open and also to prevent the I/O
        // port from being closed as the new socket is registered.
        WindowsAsynchronousSocketChannelImpl ch = null;
        IOException ioe = null;
        try {
            schannel.begin();
            ch = new WindowsAsynchronousSocketChannelImpl(iocp, false);
        } catch (IOException x) {
            ioe = x;
        } finally {
            schannel.end();
        }
        if (ioe != null) {
            if (handler == null)
                return CompletedFuture.withFailure(ioe);
            Invoker.invokeIndirectly(this.schannel, handler, attachment, null, ioe);
            return null;
        }

// need calling context when there is security manager as
        // permission check may be done in a different thread without
        // any application call frames on the stack
        AccessControlContext acc =
            (System.getSecurityManager() == null) ? null : AccessController.getContext();

PendingFuture<AsynchronousSocketChannel, Object> result =
            new PendingFuture<AsynchronousSocketChannel, Object>(schannel, handler, attachment);

// check and set flag to prevent concurrent accepting
        if (pendingAccept.get() >= maxPending)
            throw new AcceptPendingException();
        pendingAccept.incrementAndGet();
        AcceptTask task = new AcceptTask(ch, acc, dataBuffers.poll(), result);
        result.setContext(task);

// initiate I/O
        if (Iocp.supportsThreadAgnosticIo()) {
            task.run();
        } else {
            Invoker.invokeOnThreadInThreadPool(this.schannel, task);
        }
        return result;
    }

//    //reimplements for performance
    static native void updateAcceptContext(long listenSocket, long acceptSocket,
        InetAddress[] addresses, int[] ports, long dataBuffer) throws IOException;

static native int accept0(long handle, long handle2, long overlapped, long dataBuffer);

}

对应的CPP代码如下:

/*
 * Class:     sun_nio_ch_WindowsMultiAcceptSupport
 * Method:    updateAcceptContext
 * Signature: (JJ[Ljava/net/InetAddress;[IJ)V
 */
JNIEXPORT void JNICALL Java_sun_nio_ch_WindowsMultiAcceptSupport_updateAcceptContext
(JNIEnv *env , jclass clazz, jlong listenSocket, jlong acceptSocket, jobjectArray sockArray,jintArray portArray,jlong buf)
{
    SOCKET s1 = (SOCKET)jlong_to_ptr(listenSocket);
    SOCKET s2 = (SOCKET)jlong_to_ptr(acceptSocket);
    PVOID outputBuffer = (PVOID)jlong_to_ptr(buf);
    INT iLocalAddrLen=0;
    INT iRemoteAddrLen=0;
    SOCKETADDRESS* lpLocalAddr;
    SOCKETADDRESS* lpRemoteAddr;
    jobject localAddr;
    jobject remoteAddr;
    jint ports[2]={0};

setsockopt(s2, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char *)&s1, sizeof(s1));

(lpGetAcceptExSockaddrs)(outputBuffer,
        0,
        sizeof(SOCKETADDRESS)+16,
        sizeof(SOCKETADDRESS)+16,
        (LPSOCKADDR*)&lpLocalAddr,
        &iLocalAddrLen,
        (LPSOCKADDR*)&lpRemoteAddr,
        &iRemoteAddrLen);

localAddr=lpNET_SockaddrToInetAddress(env,(struct sockaddr *)lpLocalAddr,(int *)ports);
    remoteAddr=lpNET_SockaddrToInetAddress(env,(struct sockaddr *)lpRemoteAddr,(int *)(ports+1));

env->SetObjectArrayElement(sockArray,0,localAddr);
    env->SetObjectArrayElement(sockArray,1,remoteAddr);
    env->SetIntArrayRegion(portArray,0,2,ports);

}

/*
 * Class:     sun_nio_ch_WindowsMultiAcceptSupport
 * Method:    accept0
 * Signature: (JJJJ)I
 */
jint JNICALL Java_sun_nio_ch_WindowsMultiAcceptSupport_accept0
  (JNIEnv *env, jclass clazz, jlong listenSocket, jlong acceptSocket, jlong ov, jlong buf)
{

BOOL res;
    SOCKET s1 = (SOCKET)jlong_to_ptr(listenSocket);
    SOCKET s2 = (SOCKET)jlong_to_ptr(acceptSocket);
    PVOID outputBuffer = (PVOID)jlong_to_ptr(buf);

DWORD nread = 0;
    OVERLAPPED* lpOverlapped = (OVERLAPPED*)jlong_to_ptr(ov);
    ZeroMemory((PVOID)lpOverlapped, sizeof(OVERLAPPED));

//why use SOCKETADDRESS?
    //because client may use IPv6 to connect to server.
    res = (lpAcceptEx)(s1,
        s2,
        outputBuffer,
        0,
        sizeof(SOCKETADDRESS)+16,
        sizeof(SOCKETADDRESS)+16,
        &nread,
        lpOverlapped);

if (res == 0) {
        int error = WSAGetLastError();
        
        if (error == ERROR_IO_PENDING) {
            
            return NIO2_IOS_UNAVAILABLE;
        }
    
    
        return NIO2_THROWN;
    }

return 0;

}

这里用到的lpNET_SockaddrToInetAddress是JDK7中NET.DLL暴露的方法,从DLL里加载。相应代码如下:

*
 * Class:     com_yovn_jabhttpd_utilities_SunPackageFixer
 * Method:    initFds
 * Signature: ()V
 */
JNIEXPORT void JNICALL Java_com_yovn_jabhttpd_utilities_SunPackageFixer_initFds
  (JNIEnv *env, jclass clazz)
{

GUID GuidAcceptEx = WSAID_ACCEPTEX;
    GUID GuidTransmitFile = WSAID_TRANSMITFILE;
    GUID GuidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;
    SOCKET s;
    int rv;
    DWORD dwBytes;
    HMODULE hModule;

s = socket(AF_INET, SOCK_STREAM, 0);
    if (s == INVALID_SOCKET) {
        JNU_ThrowByName(env,"java/io/IOException", "socket failed");
        return;
    }
    rv = WSAIoctl(s,
        SIO_GET_EXTENSION_FUNCTION_POINTER,
        (LPVOID)&GuidAcceptEx,
        sizeof(GuidAcceptEx),
        &lpAcceptEx,
        sizeof(lpAcceptEx),
        &dwBytes,
        NULL,
        NULL);
    if (rv != 0)
    {
        JNU_ThrowByName(env, "java/io/IOException","WSAIoctl failed on get AcceptEx ");
        goto _ret;
    }
    rv = WSAIoctl(s,
        SIO_GET_EXTENSION_FUNCTION_POINTER,
        (LPVOID)&GuidTransmitFile,
        sizeof(GuidTransmitFile),
        &lpTransmitFile,
        sizeof(lpTransmitFile),
        &dwBytes,
        NULL,
        NULL);
    if (rv != 0)
    {
        JNU_ThrowByName(env, "java/io/IOException","WSAIoctl failed on get TransmitFile");
        goto _ret;
    }
    rv = WSAIoctl(s,
        SIO_GET_EXTENSION_FUNCTION_POINTER,
        (LPVOID)&GuidGetAcceptExSockAddrs,
        sizeof(GuidGetAcceptExSockAddrs),
        &lpGetAcceptExSockaddrs,
        sizeof(lpGetAcceptExSockaddrs),
        &dwBytes,
        NULL,
        NULL);
    if (rv != 0)
    {
        JNU_ThrowByName(env, "java/io/IOException","WSAIoctl failed on get GetAcceptExSockaddrs");
        goto _ret;
    }

hModule=LoadLibrary("net.dll");
    if(hModule==NULL)
    {
        JNU_ThrowByName(env, "java/io/IOException","can‘t load java net.dll");
        goto _ret;
    }

lpNET_SockaddrToInetAddress=(NET_SockaddrToInetAddress_t)GetProcAddress(hModule,"[email protected]");

if(lpNET_SockaddrToInetAddress==NULL)
    {
        JNU_ThrowByName(env, "java/io/IOException","can‘t resolve _NET_SockaddrToInetAddress function ");
        
        
    }

_ret:
    closesocket(s);
    return;

}

细心的同学可能会发现,在创建socket之前没有初始化WinSock库,因为在这段代码前,我初始化了一个InetSocketAddress对象,这样JVM会加载NET.DLL并初始化WinSock库了。

OK,现在,你可以在支持类上同时发起多个AcceptEx请求了。

PS:基于这个我简单测试了下我的服务器,同时开5000个线程,每个下载3M多点的文件,一分钟内能够全部正确完成。
服务器正在开发中,有兴趣的请加入:http://code.google.com/p/jabhttpd

基于JDK7 NIO2的高性能web服务器实践之二(转)

时间: 2024-08-06 20:32:08

基于JDK7 NIO2的高性能web服务器实践之二(转)的相关文章

高性能web服务器nginx(二)之常用功能举例

一.配置使用nginx 1.提供测试页 [[email protected] ~]# mkdir /www/a.com/htdoc [[email protected] ~]# cat /www/a.com/htdoc/index.html  <h1>www.a.com</h1> [[email protected] ~]# chown -R nginx.nginx /www/a.com/htdoc/ 2.备份配置文件并简要更改配置文件 [[email protected] ~]#

高性能web服务器nginx(一)之基本概念

说明本篇文章大部分参考此人的博文:http://freeloda.blog.51cto.com/2033581/1285722,建议若想继续深入学习nginx时最好先看下此人所写的文章,总结的很详细,然后在找相关的书籍和查阅官方文档学习. 一.NGINX介绍 1 简介 传统上基于进程或线程模型架构的web服务通过每进程或每线程处理并发连接请求,这势必会在网络和I/O操作时产生阻塞,其另一个必然结果则是对内存或CPU的利用率低下.生成一个新的进程/线程需要事先备好其运行时环境,这包括为其分配堆内存

高性能Web服务器Nginx

高性能Web服务器Nginx介绍 Nginx是一款轻量级的Web 服务器/反向代理服务器及电子邮件(IMAP/POP3)代理服务器,并在一个BSD-like 协议下发行.其特点是占有内存少,并发能力强,事实上nginx的并发能力确实在同类型的网页服务器中表现较好,可以运行在UNIX.GUN/LINUX.BSD.MAC OS X以及Microsoft Windows等操作系统中,中国大陆使用nginx网站用户有:百度.京东.新浪.网易.腾讯.淘宝等. Nginx的功能 Nginx的模块从功能上分为

高性能Web服务器Nginx使用指南

Nginx是一个高性能的http服务器和反向代理服务器,是一个高度模块化的web服务器,和Apache的模块化不同,Nginx的模块不支持动态编译,Nginx要加入新的第三方模块的时候,必须先下载模块,然后重新编译Nginx,而Apache只需要将新加入的模块编译成so文件,然后配置文件指定是否加载即可,无需重新编译Apache.并且Nginx的rewrite模块会使用正则表示式进行匹配,因此需要pcre软件库的支持,另外ssl加密需要openssl-devel软件库的支持,gzip压缩传输需要

《Nginx高性能Web服务器》系列分享专栏

<Nginx高性能Web服务器>系列分享专栏 [作者:Poechant] Nginx是目前最流行的基于BSD-like协议.轻量级.高性能的HTTP服务器.反向代理服务器和电子邮件(SMTP/POP3/IMAP)服务器.CSDN的Nginx专栏引领大家Step by Step地一起领略当今最强大高性能的Web服务器. <Nginx高性能Web服务器>已整理成PDF文档,点击可直接下载至本地查阅https://www.webfalse.com/read/203778.html 文章

keepalived+nginx+tomcat搭建高性能web服务器集群

使用keepalived+nginx+tomcat搭建高性能web服务器集群,系统采用centos6.9,前端用nginx做反向代理实现负载均衡,同时结合keepalived对nginx实现高可用,后端使用两台tomcat做动态jsp解析,实现了动静分离. 搭建环境 准备四台服务器 vip: 192.168.75.130master: 192.168.75.131 (安装nginx做反向代理实现负载匀衡,结合keepalived实现高可用)backup: 192.168.75.132 (同上)w

CentOS 5.5下安装mysql5.1.57+php5.2.17(FastCGI)+nginx1.0.1高性能Web服务器 [转载]

CentOS 5.5下安装mysql5.1.57+php5.2.17(FastCGI)+nginx1.0.1高性能Web服务器 [转载] 2012年09月05日 ⁄ Linux技术 ⁄ 共 12362字 ⁄ 字号 小 中 大 ⁄ 暂无评论 ⁄ 阅读 85 views 次 由于生产环境都是freebsd平台,之前也写了一篇FreeBSD下安装 mysql5.1.56+php5.2.17(FastCGI)+nginx1.0.1高性能Web服务器,有童鞋想要帮忙写一篇关于centos下的安 装教程,其

nginx高性能web服务器详解(1)--安装nginx

1. 下载 本次使用nginx-0.1.2.3 版本,下载地址 http://nginx.org/en/download.html  新发布版本 http://nginx.org/download  历史版本 2.上传到linux服务器 sz -bey nginx-0.1.2.3.tar.gz 3.解压 3.1 建立目录  mkdir nginx_123 3.2 解压 tar -zxvf nginx-0.1.2.3.tar.gz ./nginx_123/ 4.配置编译环境 nginx源代码的编译

高性能Web服务器Nginx的配置与部署研究(13)应用模块之Memcached模块+Proxy_Cache双层缓存模式

通过<高性能Web服务器Nginx的配置与部署研究——(11)应用模块之Memcached模块的两大应用场景>一文,我们知道Nginx从Memcached读取数据的方式,如果命中,那么效率是相当高的.那么: 1. 如果不命中呢? 我们可以到相应的数据服务器上读取数据,然后将它缓存到Nginx服务器上,然后再将该数据返回给客户端.这样,对于该资源,只有穿透 Memcached的第一次请求是需要到数据服务器读取的,之后在缓存过期时间之内的所有请求,都是读取Nginx本地的.不过Nginx的 pro