(三)Mina源码解析之IoFilter

本篇文章主要剖析Mina中的过滤器是如何实现的

首先还是引入一个简单的完整的server端的例子,代码如下

public class Server {
	public static void main(String[] args) {
		IoAcceptor acceptor = new NioSocketAcceptor();
		acceptor.getSessionConfig().setReadBufferSize(2048);
		acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 100000);
		acceptor.getFilterChain().addFirst("logFilter", new LoggingFilter());//①加入系统自带的日志过滤器
		acceptor.setHandler(new MyHandler());
		try {
			acceptor.bind(new InetSocketAddress(9123));
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}

在代码1处加入了一个系统自带的过滤器,先不用管系统自带的日志过滤器LoggingFilter内部是如何实现的,等了解完Mina的整个Filter的脉络后自然就知道如何实现了,在代码1处通过acceptor拿到FilterChain后(DefaultIoFilterChainBuilder对象是在创建NioSocketAcceptor对象时创建的,如图1所示)

图1

DefaultIoFilterChainBuilder类提供了四种加入过滤器的方法,分别是addFirst(),addLast(),addBefore(),addAfter(),这四个方法里面都调用了一个register()的方法,register方法是这样实现的,就是向一个List里按一定的顺序加入过滤器(先不要管过滤器是如何实现的,后文会详细介绍),也就是说在服务端启动后,客户端连接服务端之前,mina只是把我们添加的过滤器放到了一个list里,我们之前一直说的可是过滤器链,现在明显是一个静态的list不是一个链,那过滤器到底是在什么时候"链"起来的呢,答案是在客户端和服务端建立连接的时候

   private final List<Entry> entries;
   private void register(int index, Entry e) {
        if (contains(e.getName())) {
            throw new IllegalArgumentException("Other filter is using the same name: " + e.getName());
        }

        entries.add(index, e);
    }

让我们看看客户端和服务端连接的时候mina服务端做了什么,mina是在Processor类中处理新的连接和连接传输的消息的(在Mina源码解析(二)中有讲到),每次有新连接进来都会调用addNow方法,让我们看看addNow方法都做了些什么事情

    private boolean addNow(S session) {
        boolean registered = false;

        try {
            init(session);
            registered = true;

            // Build the filter chain of this session.
            IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();
            chainBuilder.buildFilterChain(session.getFilterChain());<span style="color:#33CC00;"></span>//①

            // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
            // in AbstractIoFilterChain.fireSessionOpened().
            // Propagate the SESSION_CREATED event up to the chain
            IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners();
            listeners.fireSessionCreated(session);
        } catch (Exception e) {
            ExceptionMonitor.getInstance().exceptionCaught(e);

            try {
                destroy(session);
            } catch (Exception e1) {
                ExceptionMonitor.getInstance().exceptionCaught(e1);
            } finally {
                registered = false;
            }
        }

        return registered;
    }

在代码1处将存放过滤器的静态的list构造成了一个过滤器链,来看看是如何构建的,首先通过session.getFilterChain()方法拿到一个fiterChain对象,注意是filterChain不是前文提到的DefaultIoFilterChainBuilder,这个filterChain对象是创建session对象时候创建的一个对象,如图2所示

图2

让我们看看创建DefaultIoFilterChain对象的时候做了哪些事情,新建DefaultIoFilterChain的代码如下:

    public DefaultIoFilterChain(AbstractIoSession session) {
        if (session == null) {
            throw new IllegalArgumentException("session");
        }

        this.session = session;
        head = new EntryImpl(null, null, "head", new HeadFilter());//①
        tail = new EntryImpl(head, null, "tail", new TailFilter());//②
        head.nextEntry = tail;
    }

图3

在创建DefaultIoFilterChain的时候为head属性和tail属性进行初始化,学习过数据结构的同学肯定对这两个东西非常熟悉,没错,这就是链表中的头节点和尾节点,EntryImpl这个类就是被设计用来在java中实现链表的,如图3所示EntryImpl包含5个属性,其中prevEntry指向前一个过滤器,nextEntry指向后一个过滤器,name是该过滤器的名称,filter中用于实现过滤的功能(具体实现方法后面会介绍),nextFilter用来调用下一个过滤器,上述①②代码执行过后会生成一个有两个节点的链表,如图4所示

图4

执行完addNow方法的代码①后,过滤器就被链接起来了,如图5所示

图5

过滤器链建立起来后消息是如何由head节点经过过滤器链到达我们进行业务逻辑处理的IoHandler中的呢,要知道这个问题,我们先来看看mina中跟过滤器链有关的类有哪些,在mina中跟过滤器有关的类主要分为3类,第一类是IoFilterChainBuilder接口和它的实现类DefaultIoFilterChainBuilder。这两个类的功能是当IoSession被创建时用来按照预先规定的顺序建立过滤器链的(如果你忘了这两个类可以回上文找找看看那些地方用到了这两个类),IoFilterChainBuilder只提供了一个接口方法就是buildFilterChain()建立过滤器链的方法。实现类DefaultIoFilterChainBuilder中一些其他的方法都是为这个方法提供额外的支持的

第二类是IoFilterChain和DefaultIoFilterChain。这两个类提供了构建过滤器链的功能还有当有events到来的时候将events经过过滤器链转发到IoHandler(怎么转发的后文会说明),每一个IoSessiond都有自己的过滤器链。有的同学可能不明白了构建功能不是上面说的DefaultIoFilterChainBuilder负责的吗怎么又变成DefaultIoFilterChain负责了,其实构建过滤器链的时候DefaultIoFilterChainBuilder调用了DefaultIoFilterChain的构建过滤器链的具体实现,其实这是实现是设计模式的一种:命令模式,不懂也没关系,不会影响你的理解,命令模式在这里我们就不做讨论了

第三类是IoFilter接口和它具体的实现,mina已经为我们提供了若干种过滤器的实现,下面列出了比较常用的一部分实现,IoFilterAdapter实现了IoFilter接口为我们提供了默认的实现,这样我们自己创建过滤器的时候就可以直接继承IoFIlterAdapter接口,这样我们就可以只实现我们感兴趣的方法,这样做可以使我们的代码更加简洁

我们看看HeadFilter是如何实现的,HeadFilter并没有实现sessionCreated、sessionOpened、messageReceived等等一系列方法,所以当消息过来时我们会调用HeadFilter的父类IoFilterAdapter的实现,IoFilterAdapter的实现就是直接调用下一个过滤器链的sessionCreated、sessionOpened、messageReceived等等方法

private class HeadFilter extends IoFilterAdapter {
        @SuppressWarnings("unchecked")
        @Override
        public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {

            AbstractIoSession s = (AbstractIoSession) session;

            // Maintain counters.
            if (writeRequest.getMessage() instanceof IoBuffer) {
                IoBuffer buffer = (IoBuffer) writeRequest.getMessage();
                // I/O processor implementation will call buffer.reset()
                // it after the write operation is finished, because
                // the buffer will be specified with messageSent event.
                buffer.mark();
                int remaining = buffer.remaining();

                if (remaining > 0) {
                    s.increaseScheduledWriteBytes(remaining);
                }
            } else {
                s.increaseScheduledWriteMessages();
            }

            WriteRequestQueue writeRequestQueue = s.getWriteRequestQueue();

            if (!s.isWriteSuspended()) {
                if (writeRequestQueue.isEmpty(session)) {
                    // We can write directly the message
                    s.getProcessor().write(s, writeRequest);
                } else {
                    s.getWriteRequestQueue().offer(s, writeRequest);
                    s.getProcessor().flush(s);
                }
            } else {
                s.getWriteRequestQueue().offer(s, writeRequest);
            }
        }

        @SuppressWarnings("unchecked")
        @Override
        public void filterClose(NextFilter nextFilter, IoSession session) throws Exception {
            ((AbstractIoSession) session).getProcessor().remove(session);
        }
    }

再让我们看看TailFilter是如何实现的,因为TailFilter已经是尾节点了,所以TailFilter的实现并没有再调用下一个过滤器,而是调用了IoHandler中的方法,这样消息就传到了IoHandler中,现在我们可以在IoHandler中处理我们的业务逻辑了

private static class TailFilter extends IoFilterAdapter {
        @Override
        public void sessionCreated(NextFilter nextFilter, IoSession session) throws Exception {
            try {
                session.getHandler().sessionCreated(session);
            } finally {
                // Notify the related future.
                ConnectFuture future = (ConnectFuture) session.removeAttribute(SESSION_CREATED_FUTURE);

                if (future != null) {
                    future.setSession(session);
                }
            }
        }

        @Override
        public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception {
            session.getHandler().sessionOpened(session);
        }

        @Override
        public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
            AbstractIoSession s = (AbstractIoSession) session;

            try {
                s.getHandler().sessionClosed(session);
            } finally {
                try {
                    s.getWriteRequestQueue().dispose(session);
                } finally {
                    try {
                        s.getAttributeMap().dispose(session);
                    } finally {
                        try {
                            // Remove all filters.
                            session.getFilterChain().clear();
                        } finally {
                            if (s.getConfig().isUseReadOperation()) {
                                s.offerClosedReadFuture();
                            }
                        }
                    }
                }
            }
        }

        @Override
        public void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) throws Exception {
            session.getHandler().sessionIdle(session, status);
        }

        @Override
        public void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) throws Exception {
            AbstractIoSession s = (AbstractIoSession) session;

            try {
                s.getHandler().exceptionCaught(s, cause);
            } finally {
                if (s.getConfig().isUseReadOperation()) {
                    s.offerFailedReadFuture(cause);
                }
            }
        }

        @Override
        public void inputClosed(NextFilter nextFilter, IoSession session) throws Exception {
            session.getHandler().inputClosed(session);
        }

        @Override
        public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
            AbstractIoSession s = (AbstractIoSession) session;

            if (!(message instanceof IoBuffer)) {
                s.increaseReadMessages(System.currentTimeMillis());
            } else if (!((IoBuffer) message).hasRemaining()) {
                s.increaseReadMessages(System.currentTimeMillis());
            }

            // Update the statistics
            if (session.getService() instanceof AbstractIoService) {
                ((AbstractIoService) session.getService()).getStatistics().updateThroughput(System.currentTimeMillis());
            }

            // Propagate the message
            try {
                session.getHandler().messageReceived(s, message);
            } finally {
                if (s.getConfig().isUseReadOperation()) {
                    s.offerReadFuture(message);
                }
            }
        }

        @Override
        public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
            ((AbstractIoSession) session).increaseWrittenMessages(writeRequest, System.currentTimeMillis());

            // Update the statistics
            if (session.getService() instanceof AbstractIoService) {
                ((AbstractIoService) session.getService()).getStatistics().updateThroughput(System.currentTimeMillis());
            }

            // Propagate the message
            session.getHandler().messageSent(session, writeRequest.getMessage());
        }

        @Override
        public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
            nextFilter.filterWrite(session, writeRequest);
        }

        @Override
        public void filterClose(NextFilter nextFilter, IoSession session) throws Exception {
            nextFilter.filterClose(session);
        }
    }
时间: 2024-09-30 10:30:41

(三)Mina源码解析之IoFilter的相关文章

NIO框架之MINA源码解析(五):NIO超级陷阱和使用同步IO与MINA通信

1.NIO超级陷阱 之所以说NIO超级陷阱,就是因为我在本系列开头的那句话,因为使用缺陷导致客户业务系统瘫痪.当然,我对这个问题进行了很深的追踪,包括对MINA源码的深入了解,但其实之所以会出现这个问题,它的根不是MINA的原因,而是JDK底层的问题. JDK底层在实现nio时,为了能够唤醒等待在io上的线程,在windows平台使用了两个端口建立连接发消息实现.看如下代码: [java] view plain copy print? public class NIOTest { @Test p

NIO框架之MINA源码解析(一):背景

?? "你们的agent占了好多系统的端口,把我们的很多业务系统都给整死了,给我们造成了很大的损失,要求你们的相关领导下周过来道歉"   --   来自我们的一个客户. 怎么可能呢,我们都不相信,我们的agent只占一个端口啊! 事实胜过雄辩,经过查证,确实是由于我们的agent占了好多系统的端口,我看了一下日志,基本把系统可用的端口占完了! 为什么呢?MINA框架私自开的! 由于我们的agent端使用了NIO通信框架MINA,但并没有使用好,造成了这一几乎毁灭行的灾难. 还是先看代码

Handler机制(三)----Looper源码解析

一.Looper Looper对象,顾名思义,直译过来就是循环的意思,从MessageQueue中不断取出message. Class used to run a message loop for a thread. Threads by default do not have a message loop associated with them; to create one, call prepare() in the thread that is to run the loop, and

[Android]Toolbar使用详解(三)——源码解析

更多关于Toolbar的使用请移步Toolbar使用详解系列 从Toolbar的使用一步步解析Toolbar源码 大体架构 API 0.设置导航图标 mToolbar.setNavigationIcon(R.drawable.ic_actionbar_flow); 源码如下 public void setNavigationIcon(int resId) { this.setNavigationIcon(this.mTintManager.getDrawable(resId)); } setNa

NIO框架之MINA源码解析(三):底层通信与责任链模式应用

本文主要介绍下在mina中责任链模式的应用以及mina对于数据读写的处理. 在mina中,对数据的读操作是在processor类里面触发的,收到新消息后就触发读数据链去处理新消息直到自己的业务逻辑代码(IoHandler). 在mina中,数据的写(write)和发(send)差别相对较大,mina中的写消息最终的结果只是把要写的消息经过写数据链处理后的最终结果放在了一个缓存中,并把当前session标记为可发. 数据的发送就是传统中我们所说的发消息,就是把写消息最终处理的结果发送到客户端,待发

NIO框架之MINA源码解析(二):mina核心引擎

MINA的底层还是利用了jdk提供了nio功能,mina只是对nio进行封装,包括MINA用的线程池都是jdk直接提供的. MINA的server端主要有accept.processor.session三部分组成的.其中accept主要负责在指定的端口监听,若有新连接则建立一个新的session:processor则负责处理session对应的发送数据和接收数据并调用上层处理:session则缓存当前连接数据. MINA采用了线程懒启动的技术,即最少启动线程,在MINA server启动的时候,

NIO框架之MINA源码解析(四):粘包与断包处理及编码与解码

1.粘包与段包 粘包:指TCP协议中,发送方发送的若干包数据到接收方接收时粘成一包,从接收缓冲区看,后一包数据的头紧接着前一包数据的尾.造成的可能原因: 发送端需要等缓冲区满才发送出去,造成粘包 接收方不及时接收缓冲区的包,造成多个包接收 断包:也就是数据不全,比如包太大,就把包分解成多个小包,多次发送,导致每次接收数据都不全. 2.消息传输的格式 消息长度+消息头+消息体  即前N个字节用于存储消息的长度,用于判断当前消息什么时候结束. 消息头+消息体    即固定长度的消息,前几个字节为消息

三.jQuery源码解析之jQuery的框架图

今天有幸被召回母校给即将毕业的学弟学妹们讲我这两年的工作史,看了下母校没啥特别的变化,就是寝室都安了空调,学妹们都非常漂亮而已..好了不扯蛋了,说下今天的主题吧.这些天我在深度定制语法高亮功能的同时发现了博客园提供的一些有意思的函数,甚至有几个博客园都没用到,我也不知道怎么才能触发那些功能..打开这个js就可以看到很多好用的东西了,虽然写的不怎么样,但是至少有这些功能. ps: 推荐安装一个代码格式化的插件,否则一坨看着蛋疼.比如第一个就是 log,方便调试. http://www.qdmm.c

NIO框架之MINA源码解析(转)

http://blog.csdn.net/column/details/nio-mina-source.html http://blog.csdn.net/chaofanwei/article/details/38848085 http://blog.csdn.net/chaofanwei/article/details/38871115 http://blog.csdn.net/chaofanwei/article/details/38920963 http://blog.csdn.net/c