Jetty9 源码初解(2)——IO之Connection

一、概述

查看Jetty-io包,清单如下:

接口类:
ByteBufferPool
ClientConnectionFactory
Connection
Connection.Listener
Connection.UpgradeFrom
Connection.UpgradeTo
EndPoint
ManagedSelector.SelectableEndPoint
NetworkTrafficListener
实体类:
AbstractConnection
AbstractEndPoint
ArrayByteBufferPool
ArrayByteBufferPool.Bucket
ByteArrayEndPoint
ByteBufferPool.Lease
ChannelEndPoint
ClientConnectionFactory.Helper
Connection.Listener.Adapter
FillInterest
IdleTimeout
LeakTrackingByteBufferPool
ManagedSelector
MappedByteBufferPool
MappedByteBufferPool.Tagged
NegotiatingClientConnection
NegotiatingClientConnectionFactory
NetworkTrafficListener.Adapter
NetworkTrafficSelectChannelEndPoint
SelectChannelEndPoint
SelectorManager
WriteFlusher
WriterOutputStream
异常类:
EofException
RuntimeIOException

从名字看几个主要的类可能为:Connection、ByteBufferPool、SelectorManager、EndPoint,因为其他类应该是从中延伸出来的。

二、类分析

首先看Connection接口:

public interface Connection extends Closeable
{
    public void addListener(Listener listener);

    public void onOpen();

    /**
     * <p>Callback method invoked when this {@link Connection} is closed.</p>
     * <p>Creators of the connection implementation are responsible for calling this method.</p>
     */
    public void onClose();

    /**
     * @return the {@link EndPoint} associated with this {@link Connection}
     */
    public EndPoint getEndPoint();
    
    /**
     * <p>Performs a logical close of this connection.</p>
     * <p>For simple connections, this may just mean to delegate the close to the associated
     * {@link EndPoint} but, for example, SSL connections should write the SSL close message
     * before closing the associated {@link EndPoint}.</p>
     */
    @Override
    public void close();

    public int getMessagesIn();
    public int getMessagesOut();
    public long getBytesIn();
    public long getBytesOut();
    public long getCreatedTimeStamp();
    
    public interface UpgradeFrom extends Connection
    {
        /* ------------------------------------------------------------ */
        /** Take the input buffer from the connection on upgrade.
         * <p>This method is used to take any unconsumed input from
         * a connection during an upgrade.
         * @return A buffer of unconsumed input. The caller must return the buffer
         * to the bufferpool when consumed and this connection must not.
         */
        ByteBuffer onUpgradeFrom();
    }
    
    public interface UpgradeTo extends Connection
    {
        /**
         * <p>Callback method invoked when this {@link Connection} is upgraded.</p>
         * <p>This must be called before {@link #onOpen()}.</p>
         * @param prefilled An optional buffer that can contain prefilled data. Typically this
         * results from an upgrade of one protocol to the other where the old connection has buffered
         * data destined for the new connection.  The new connection must take ownership of the buffer
         * and is responsible for returning it to the buffer pool
         */
        void onUpgradeTo(ByteBuffer prefilled);
    }
    
    
    /* ------------------------------------------------------------ */
    /** 
     * <p>A Listener for connection events.</p>
     * <p>Listeners can be added to a {@link Connection} to get open and close events.
     * The AbstractConnectionFactory implements a pattern where objects implement
     * this interface that have been added via {@link Container#addBean(Object)} to
     * the Connector or ConnectionFactory are added as listeners to all new connections
     * </p>
     */
    public interface Listener
    {
        public void onOpened(Connection connection);

        public void onClosed(Connection connection);

        public static class Adapter implements Listener
        {
            @Override
            public void onOpened(Connection connection)
            {
            }

            @Override
            public void onClosed(Connection connection)
            {
            }
        }
    }
}

Connection接口主要用来添加监听,并定义监听接口Listener。

再看一个实现了Connection接口的抽象类AbstractConnection:

public abstract class AbstractConnection implements Connection
{
    private static final Logger LOG = Log.getLogger(AbstractConnection.class);

    private final List<Listener> listeners = new CopyOnWriteArrayList<>();
    private final long _created=System.currentTimeMillis();
    private final EndPoint _endPoint;
    private final Executor _executor;
    private final Callback _readCallback;
    private int _inputBufferSize=2048;

    protected AbstractConnection(EndPoint endp, Executor executor)
    {
        if (executor == null)
            throw new IllegalArgumentException("Executor must not be null!");
        _endPoint = endp;
        _executor = executor;
        _readCallback = new ReadCallback();
    }

    @Override
    public void addListener(Listener listener)
    {
        listeners.add(listener);
    }

    public int getInputBufferSize()
    {
        return _inputBufferSize;
    }

    public void setInputBufferSize(int inputBufferSize)
    {
        _inputBufferSize = inputBufferSize;
    }

    protected Executor getExecutor()
    {
        return _executor;
    }

    @Deprecated
    public boolean isDispatchIO()
    {
        return false;
    }

    protected void failedCallback(final Callback callback, final Throwable x)
    {
        if (callback.isNonBlocking())
        {
            try
            {
                callback.failed(x);
            }
            catch (Exception e)
            {
                LOG.warn(e);
            }
        }
        else
        {
            try
            {
                getExecutor().execute(new Runnable()
                {
                    @Override
                    public void run()
                    {
                        try
                        {
                            callback.failed(x);
                        }
                        catch (Exception e)
                        {
                            LOG.warn(e);
                        }
                    }
                });
            }
            catch(RejectedExecutionException e)
            {
                LOG.debug(e);
                callback.failed(x);
            }
        }
    }

    /**
     * <p>Utility method to be called to register read interest.</p>
     * <p>After a call to this method, {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
     * will be called back as appropriate.</p>
     * @see #onFillable()
     */
    public void fillInterested()
    {
        if (LOG.isDebugEnabled())
            LOG.debug("fillInterested {}",this);
        getEndPoint().fillInterested(_readCallback);
    }

    public boolean isFillInterested()
    {
        return getEndPoint().isFillInterested();
    }

    /**
     * <p>Callback method invoked when the endpoint is ready to be read.</p>
     * @see #fillInterested()
     */
    public abstract void onFillable();

    /**
     * <p>Callback method invoked when the endpoint failed to be ready to be read.</p>
     * @param cause the exception that caused the failure
     */
    protected void onFillInterestedFailed(Throwable cause)
    {
        if (LOG.isDebugEnabled())
            LOG.debug("{} onFillInterestedFailed {}", this, cause);
        if (_endPoint.isOpen())
        {
            boolean close = true;
            if (cause instanceof TimeoutException)
                close = onReadTimeout();
            if (close)
            {
                if (_endPoint.isOutputShutdown())
                    _endPoint.close();
                else
                {
                    _endPoint.shutdownOutput();
                    fillInterested();
                }
            }
        }
    }

    /**
     * <p>Callback method invoked when the endpoint failed to be ready to be read after a timeout</p>
     * @return true to signal that the endpoint must be closed, false to keep the endpoint open
     */
    protected boolean onReadTimeout()
    {
        return true;
    }

    @Override
    public void onOpen()
    {
        if (LOG.isDebugEnabled())
            LOG.debug("onOpen {}", this);

        for (Listener listener : listeners)
            listener.onOpened(this);
    }

    @Override
    public void onClose()
    {
        if (LOG.isDebugEnabled())
            LOG.debug("onClose {}",this);

        for (Listener listener : listeners)
            listener.onClosed(this);
    }

    @Override
    public EndPoint getEndPoint()
    {
        return _endPoint;
    }

    @Override
    public void close()
    {
        getEndPoint().close();
    }

    @Override
    public int getMessagesIn()
    {
        return -1;
    }

    @Override
    public int getMessagesOut()
    {
        return -1;
    }

    @Override
    public long getBytesIn()
    {
        return -1;
    }

    @Override
    public long getBytesOut()
    {
        return -1;
    }

    @Override
    public long getCreatedTimeStamp()
    {
        return _created;
    }

    @Override
    public String toString()
    {
        return String.format("%[email protected]%x[%s]",
                getClass().getSimpleName(),
                hashCode(),
                _endPoint);
    }

    private class ReadCallback implements Callback
    {
        @Override
        public void succeeded()
        {
            onFillable();
        }

        @Override
        public void failed(final Throwable x)
        {
            onFillInterestedFailed(x);
        }

        @Override
        public String toString()
        {
            return String.format("[email protected]%x{%s}", AbstractConnection.this.hashCode(),AbstractConnection.this);
        }
    }
}
时间: 2024-11-10 11:51:07

Jetty9 源码初解(2)——IO之Connection的相关文章

Jetty9 源码初解(2)——IO之EndPoint

一.概述 EndPoint作为jetty-io的一个重要组成部分,是基于javaNIO的封装,用于底层网络的读写,一旦网络读写准备好,会调用相应的connection的handle方法. 二.类分析 EndPoint源码如下: /**  *  * 一个传输端点  *  * <h3>异步方法</h3>  */ public interface EndPoint extends Closeable {     /* -----------------------------------

Jetty9 源码初解(1)——Http

一.概述 个人是个实践型人员,所以打算看着jetty源码,从头开始组装Jetty. 首先从github.com里找到jetty-project项目,用git下载源码,本文以9.3.x为例. 首先Jetty作为一个web server,必然需要支持HTTP. 查看Jetty-http项目下http包下一共有下列几个类: 接口: HttpContent HttpFieldPreEncoder HttpParser.HttpHandler HttpParser.RequestHandler HttpP

Jetty9 源码初解(1)——HTTP前传

转自:http://blog.csdn.net/gueter/archive/2007/03/08/1524447.aspx Author :Jeffrey 引言 HTTP是一个属于应用层的面向对象的协议,由于其简捷.快速的方式,适用于分布式超媒体信息系统.它于1990年提出,经过几年的使用与发展,得到不断地完善和扩展.目前在WWW中使用的是HTTP/1.0的第六版,HTTP/1.1的规范化工作正在进行之中,而且HTTP-NG(Next Generation of HTTP)的建议已经提出.HT

libevent源码分析一--io事件响应

这篇文章将分析libevent如何组织io事件,如何捕捉事件的发生并进行相应的操作.这里不会详细分析event与event_base的细节,仅描述io事件如何存储与如何响应. 1.  select libevent的实现io事件的backend实际上使用的是io复用接口,如select, poll, epoll等,这里以最简单的select为例进行说明.首先简单介绍一下select接口: int select(int nfds, fd_set *readfds, fd_set *writefds

【原创】k8s源码分析-----kube-scheduler

本文转自本人空间:http://user.qzone.qq.com/29185807/blog/1459831332 源码为k8s v1.1.1稳定版本 一.主要流程 1.main入口 源码在k8s.io/kubernetes/plugin/cmd/kube-scheduler 这种封装是k8s里面一贯的封装风格,就不再多说了 源码在k8s.io/kubernetes/plugin/cmd/kube-scheduler/app 继续往下 真正的入口 下面有个ratelimiter 在factor

【原创】k8s源码分析-----kube-proxy(2)ProxyServer

本文QQ空间链接:http://user.qzone.qq.com/29185807/blog/1460685179 本文csdn博客链接:http://blog.csdn.net/screscent/article/details/51159168 k8s源码为v1.1.1稳定版本 1.ProxyServer的构建与主流程 源码在k8s.io\kubernetes\cmd\kube-proxy main函数入口 这个就不再多说了,这种结构已经见多了 继续进入源码k8s.io\kubernete

【原创】k8s源码分析-----kubectl(1)api.RESTMapper

本文QQ空间链接:http://user.qzone.qq.com/29185807/blog/1460961715 本文csdn博文链接:http://blog.csdn.net/screscent/article/details/51179485 源码为k8s v1.1.1稳定版本 api. RESTMapper是kube-apiserver和kubectl的基础,在讲解kube-apiserver的时候,我们就有简单的讲解api. RESTMapper,但并没有系统的讲解.那么这一章,我们

MYC编译器源码分析

前文.NET框架源码解读之MYC编译器讲了MyC编译器的架构,整个编译器是用C#语言写的,上图列出了MyC编译器编译一个C源文件的过程,编译主路径如下: 首先是入口Main函数用来解析命令行参数,读取源文件,并开始编译过程.Main函数在MyC.cs文件,而IO.cs文件主要保存读取源码文件的相关操作.下表是Main函数的源码(批注用注释的方式显示),IO.cs文件用单独的一个小节说明: public static void Main() { try { // 看源码注释,代码是99年写的,也就

epoll源码分析(基于linux-5.1.4)

API epoll提供给用户进程的接口有如下四个,本文基于linux-5.1.4源码详细分析每个API具体做了啥工作,通过UML时序图理清内核内部的函数调用关系. int epoll_create1(int size): 创建一个epfd句柄,size为0时等价于int epoll_create(0). int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event): 向epfd上添加/修改/删除fd. int epoll_w