JDK7 NIO2 实践: 增加 TransmitFile支持(转)

JDK7的NIO2特性或许是我最期待的,我一直想基于它写一个高性能的Java Http Server.现在这个想法终于可以实施了。
本人基于目前最新的JDK7 b76开发了一个HTTP Server性能确实不错。
在windows平台上NIO2采用AccpetEx来异步接受连接,并且读写全都关联到IOCP完成端口。不仅如此,为了方便开发者使用,连IOCP工作线程都封装好了,你只要提供线程池就OK。

但是要注意,IOCP工作线程的线程池必须是 Fix的,因为你发出的读写请求都关联到相应的线程上,如果线程死了,那读写完成情况是不知道的。

作为一个Http Server,传送文件是必不可少的功能,那一般文件的传送都是要把程序里的buffer拷贝到内核的buffer,由内核发送出去的。windows平台上为这种情况提供了很好的解决方案,使用TransmitFile接口

BOOL TransmitFile(
    SOCKET hSocket,
    HANDLE hFile,
    DWORD nNumberOfBytesToWrite,
    DWORD nNumberOfBytesPerSend,
    LPOVERLAPPED lpOverlapped,
    LPTRANSMIT_FILE_BUFFERS lpTransmitBuffers,
    DWORD dwFlags
);

你只要把文件句柄发送给内核就行了,内核帮你搞定其余的,真正做到Zero-Copy.
但是很不幸,NIO2里AsynchronousSocketChannel没有提供这样的支持。而为HTTP Server的性能考量,本人只好自己增加这个支持。

要无缝支持,这个必须得表现的跟 Read /Write一样,有完成的通知,通知传送多少数据,等等。

仔细读完sun的IOCP实现以后发现这部分工作他们封装得很好,基本只要往他们的框架里加东西就好了。
为了能访问他们的框架代码,我定义自己的TransmitFile支持类在sun.nio.ch包里,以获得最大的权限。

package sun.nio.ch;

import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.CompletionHandler;
import java.nio.channels.NotYetConnectedException;
import java.nio.channels.WritePendingException;
import java.util.concurrent.Future;

/**
 * @author Yvon
 * 
 */
public class WindowsTransmitFileSupport {
   
   //Sun‘s NIO2 channel  implementation class 
    private WindowsAsynchronousSocketChannelImpl channel;
   
    //nio2 framework core data structure
    PendingIoCache ioCache;

//some field retrieve from sun channel implementation class 
    private Object writeLock;
    private Field writingF;
    private Field writeShutdownF;
    private Field writeKilledF; // f

WindowsTransmitFileSupport()
    {
        //dummy one for JNI code
    }

/**
     * 
     */
    public WindowsTransmitFileSupport(
            AsynchronousSocketChannel
             channel) {

this.channel = (WindowsAsynchronousSocketChannelImpl)channel;
        try {
        // Initialize the fields
            Field f = WindowsAsynchronousSocketChannelImpl.class
                    .getDeclaredField("ioCache");
            f.setAccessible(true);
            ioCache = (PendingIoCache) f.get(channel);
            f = AsynchronousSocketChannelImpl.class
                    .getDeclaredField("writeLock");
            f.setAccessible(true);
            writeLock = f.get(channel);
            writingF = AsynchronousSocketChannelImpl.class
                    .getDeclaredField("writing");
            writingF.setAccessible(true);

writeShutdownF = AsynchronousSocketChannelImpl.class
                    .getDeclaredField("writeShutdown");
            writeShutdownF.setAccessible(true);

writeKilledF = AsynchronousSocketChannelImpl.class
                    .getDeclaredField("writeKilled");
            writeKilledF.setAccessible(true);

} catch (NoSuchFieldException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (SecurityException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IllegalArgumentException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

/**
     * Implements the task to initiate a write and the handler to consume the
     * result when the send file completes.
     */
    private class SendFileTask<V, A> implements Runnable, Iocp.ResultHandler {
        private final PendingFuture<V, A> result;
        private final long file;//file is windows file HANDLE

SendFileTask(long file, PendingFuture<V, A> result) {
            this.result = result;
            this.file = file;
        }

@Override
        // @SuppressWarnings("unchecked")
        public void run() {
            long overlapped = 0L;
            boolean pending = false;
            boolean shutdown = false;

try {
                channel.begin();

// get an OVERLAPPED structure (from the cache or allocate)
                overlapped = ioCache.add(result);
                int n = transmitFile0(channel.handle, file, overlapped);
                if (n == IOStatus.UNAVAILABLE) {
                    // I/O is pending
                    pending = true;
                    return;
                }
                if (n == IOStatus.EOF) {
                    // special case for shutdown output
                    shutdown = true;
                    throw new ClosedChannelException();
                }
                // write completed immediately
                throw new InternalError("Write completed immediately");
            } catch (Throwable x) {
                // write failed. Enable writing before releasing waiters.
                channel.enableWriting();
                if (!shutdown && (x instanceof ClosedChannelException))
                    x = new AsynchronousCloseException();
                if (!(x instanceof IOException))
                    x = new IOException(x);
                result.setFailure(x);
            } finally {
                // release resources if I/O not pending
                if (!pending) {
                    if (overlapped != 0L)
                        ioCache.remove(overlapped);
                
                }
                channel.end();
            }

// invoke completion handler
            Invoker.invoke(result);
        }

/**
         * Executed when the I/O has completed
         */
        @Override
        @SuppressWarnings("unchecked")
        public void completed(int bytesTransferred, boolean canInvokeDirect) {

// release waiters if not already released by timeout
            synchronized (result) {
                if (result.isDone())
                    return;
                channel.enableWriting();

result.setResult((V) Integer.valueOf(bytesTransferred));

}
            if (canInvokeDirect) {
                Invoker.invokeUnchecked(result);
            } else {
                Invoker.invoke(result);
            }
        }

@Override
        public void failed(int error, IOException x) {
            // return direct buffer to cache if substituted

// release waiters if not already released by timeout
            if (!channel.isOpen())
                x = new AsynchronousCloseException();

synchronized (result) {
                if (result.isDone())
                    return;
                channel.enableWriting();
                result.setFailure(x);
            }
            Invoker.invoke(result);
        }

}

public <V extends Number, A> Future<V> sendFile(long file, A att,
            CompletionHandler<V, ? super A> handler) {

boolean closed = false;
        if (channel.isOpen()) {
            if (channel.remoteAddress == null)
                throw new NotYetConnectedException();

// check and update state
            synchronized (writeLock) {
                try{
                if (writeKilledF.getBoolean(channel))
                    throw new IllegalStateException(
                            "Writing not allowed due to timeout or cancellation");
                if (writingF.getBoolean(channel))
                    throw new WritePendingException();
                if (writeShutdownF.getBoolean(channel)) {
                    closed = true;
                } else {
                    writingF.setBoolean(channel, true);
                }
                }catch(Exception e)
                {
                    IllegalStateException ise=new IllegalStateException(" catch exception when write");
                    ise.initCause(e);
                    throw ise;
                }
            }
        } else {
            closed = true;
        }

// channel is closed or shutdown for write
        if (closed) {
            Throwable e = new ClosedChannelException();
            if (handler == null)
                return CompletedFuture.withFailure(e);
            Invoker.invoke(channel, handler, att, null, e);
            return null;
        }

return implSendFile(file,att,handler);
    }

<V extends Number, A> Future<V> implSendFile(long file, A attachment,
            CompletionHandler<V, ? super A> handler) {
        // setup task
        PendingFuture<V, A> result = new PendingFuture<V, A>(channel, handler,
                attachment);
        SendFileTask<V,A> sendTask=new SendFileTask<V,A>(file,result);
        result.setContext(sendTask);
        // initiate I/O (can only be done from thread in thread pool)
        // initiate I/O
        if (Iocp.supportsThreadAgnosticIo()) {
            sendTask.run();
        } else {
            Invoker.invokeOnThreadInThreadPool(channel, sendTask);
        }
        return result;
    }
    
    private native int transmitFile0(long handle, long file,
            long overlapped);
    
}

这个操作跟默认实现的里的write操作是很像的,只是最后调用的本地方法不一样。。

接下来,我们怎么使用呢,这个类是定义在sun的包里的,直接用的话,会报IllegalAccessError,因为我们的类加载器跟初始化加载器是不一样的。
解决办法一个是通过启动参数-Xbootclasspath,让我们的包被初始加载器加载。我个人不喜欢这种办法,所以就采用JNI来定义我们的windows TransmitFile支持类。

这样我们的工作算是完成了,注意,发送文件的时候传得是文件句柄,这样做的好处是你可以更好的控制,一般是在发送前,打开文件句柄,完成后在回调通知方法里关闭文件句柄。

有兴趣的同学可以看看我的HTTP server项目:
http://code.google.com/p/jabhttpd/

目前基本功能实现得差不多,做了些简单的测试,性能比较满意。这个服务器不打算支持servlet api,基本是专门给做基于长连接模式通信的定做的。

JDK7 NIO2 实践: 增加 TransmitFile支持(转)

时间: 2024-10-15 03:10:13

JDK7 NIO2 实践: 增加 TransmitFile支持(转)的相关文章

基于JDK7 NIO2的高性能web服务器实践之二(转)

前一篇博客,我简单提了下怎么为NIO2增加TransmitFile支持,文件传送吞吐量是一个性能关注点,此外,并发连接数也是重要的关注点. 不过JDK7中又一次做了简单的实现,不支持同时投递多个AcceptEx请求,只支持一次一个,返回后再投递.这样,客户端连接的接受速度必然大打折扣.不知道为什么sun会做这样的实现,WSASend()/WSAReceive()一次只允许一个还是可以理解,毕竟简化了编程,不用考虑封包乱序问题.也降低了内存耗尽的风险.AcceptEx却没有这样的理由了. 于是再一

Python爬虫之路——简单网页抓图升级版(增加多线程支持)

转载自我的博客:http://www.mylonly.com/archives/1418.html 经过两个晚上的奋斗,将上一篇文章介绍的爬虫稍微改进了下(Python爬虫之路--简单网页抓图),主要是将获取图片链接任务和下载图片任务用线程分开来处理了,而且这次的爬虫不仅仅可以爬第一页的图片链接的,整个http://desk.zol.com.cn/meinv/下面的图片都会被爬到,而且提供了多种分辨率图片的文件下载,具体设置方法代码注释里面有介绍. 这次的代码仍然有点不足,Ctrl-C无法终止程

为IIS增加PHP支持

环境: win2008x64 + PHP5.3 为IIS增加PHP支持

TeleMCU视频会议系统增加字幕支持

本文原创自 http://blog.csdn.net/voipmaker  转载注明出处. 最新版本TeleMCU增加了字幕支持,与会者可以看到其他人的名字,做法是在与会者的视频上overlay 文本字幕, 下图是三个客户端参与的7分屏视频会议画面,一个PC客户端,一个WebRTC客户端通过Chrome浏览器参与视频会议 ,一个iphone手机,每个画面左下角有参会者的标识. TeleMCU视频会议系统增加字幕支持

为Phonegap Android平台增加websocket支持,使默认成为socket.io首选通

为Phonegap Android平台增加websocket支持,使默认成为socket.io首选通道选择 广而告之 使用socket.io作为跨浏览器平台的实时推送首选,经测试在各个主流浏览器上测试都确实具有良好的下实时表现.这里为推广socketio-netty服务器端实现哈,做次广告,同时预热一下: socketio-netty : 又一款socket.io服务器端实现,兼容0.9-1.0版本~ 示范目的 我们要构建一个在市面上常见浏览器上都可以正常运行的集体聊天应用,保证在IE6+,Fi

为OLED屏增加GUI支持4:文本框控件

本文博客链接:http://blog.csdn.net/jdh99,作者:jdh,转载请注明. 环境: 主机:WIN10 开发环境:MDK5.13 MCU:STM32F103 说明: 本文定义了文本框控件.在gui中增加了字库支持后,就可以用文本框来显示字符. 源代码: gui_widget_text.h /** * Copyright (c), 2015-2025 * @file gui_widget_text.h * @brief 文本控件头文件 * @author jdh * @date

nginx 增加 spdy 支持并测试

安装spdy nginx从1.5开始是支持spdy格式的. http://nginx.org/en/docs/http/ngx_http_spdy_module.html#example 最新的nginx当然也支持,是spdy3. 编译的时候直接增加参数: –with-http_spdy_module ,即可. spdy配置 如果安装成功了spdy,直接修改nginx配置即可 server { listen 443 ssl spdy; ssl_certificate server.crt; ss

TensorFlow(2):给TensorFlow Image 打补丁增加 TuShare 支持

1,关于TuShare TuShare 是一个python的lib 库非常好用. 并且是适合国内的股票市场的,可以直接下载国内的股票数据. 非常的方便. 同事 TensorFlow 已经支持了 Numpy.直接在这个上面增加TuShare类库就好了. 而且docker 的有点就出来了.一层一层的叠加了. 2,增加镜像 和上次一样重新增加一个启动脚本: http://blog.csdn.net/freewebsys/article/details/70237003 vi run_jupyter.s

改造Velocity模板引擎让$[!]{}输出默认进行html转义,并增加$#{}语法支持不转义输出

一直以来在项目中使用Apache Velocity模板引擎作为视图层输出,为了解决XSS漏洞,需要对输出到页面的内容进行HTML转义,我一般采用2种方式实现: 使用过滤器 Filter,在其中进行 HttpServletRequestWrapper 的 getParameter( )等方法重载,在底层进行HTML转义,然后页面直接输出: 这种方式很轻松很直接,业务代码不需要修改就完成了所有的转义工作:但是也带来了问题:修改了用户的原始输入数据,如果需要用到用户的原始输入数据,又得反转义回去,很麻