本篇文章主要剖析Mina中的过滤器是如何实现的
首先还是引入一个简单的完整的server端的例子,代码如下
public class Server { public static void main(String[] args) { IoAcceptor acceptor = new NioSocketAcceptor(); acceptor.getSessionConfig().setReadBufferSize(2048); acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 100000); acceptor.getFilterChain().addFirst("logFilter", new LoggingFilter());//①加入系统自带的日志过滤器 acceptor.setHandler(new MyHandler()); try { acceptor.bind(new InetSocketAddress(9123)); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
在代码1处加入了一个系统自带的过滤器,先不用管系统自带的日志过滤器LoggingFilter内部是如何实现的,等了解完Mina的整个Filter的脉络后自然就知道如何实现了,在代码1处通过acceptor拿到FilterChain后(DefaultIoFilterChainBuilder对象是在创建NioSocketAcceptor对象时创建的,如图1所示)
图1
DefaultIoFilterChainBuilder类提供了四种加入过滤器的方法,分别是addFirst(),addLast(),addBefore(),addAfter(),这四个方法里面都调用了一个register()的方法,register方法是这样实现的,就是向一个List里按一定的顺序加入过滤器(先不要管过滤器是如何实现的,后文会详细介绍),也就是说在服务端启动后,客户端连接服务端之前,mina只是把我们添加的过滤器放到了一个list里,我们之前一直说的可是过滤器链,现在明显是一个静态的list不是一个链,那过滤器到底是在什么时候"链"起来的呢,答案是在客户端和服务端建立连接的时候
private final List<Entry> entries; private void register(int index, Entry e) { if (contains(e.getName())) { throw new IllegalArgumentException("Other filter is using the same name: " + e.getName()); } entries.add(index, e); }
让我们看看客户端和服务端连接的时候mina服务端做了什么,mina是在Processor类中处理新的连接和连接传输的消息的(在Mina源码解析(二)中有讲到),每次有新连接进来都会调用addNow方法,让我们看看addNow方法都做了些什么事情
private boolean addNow(S session) { boolean registered = false; try { init(session); registered = true; // Build the filter chain of this session. IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder(); chainBuilder.buildFilterChain(session.getFilterChain());<span style="color:#33CC00;"></span>//① // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here // in AbstractIoFilterChain.fireSessionOpened(). // Propagate the SESSION_CREATED event up to the chain IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners(); listeners.fireSessionCreated(session); } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); try { destroy(session); } catch (Exception e1) { ExceptionMonitor.getInstance().exceptionCaught(e1); } finally { registered = false; } } return registered; }
在代码1处将存放过滤器的静态的list构造成了一个过滤器链,来看看是如何构建的,首先通过session.getFilterChain()方法拿到一个fiterChain对象,注意是filterChain不是前文提到的DefaultIoFilterChainBuilder,这个filterChain对象是创建session对象时候创建的一个对象,如图2所示
图2
让我们看看创建DefaultIoFilterChain对象的时候做了哪些事情,新建DefaultIoFilterChain的代码如下:
public DefaultIoFilterChain(AbstractIoSession session) { if (session == null) { throw new IllegalArgumentException("session"); } this.session = session; head = new EntryImpl(null, null, "head", new HeadFilter());//① tail = new EntryImpl(head, null, "tail", new TailFilter());//② head.nextEntry = tail; }
图3
在创建DefaultIoFilterChain的时候为head属性和tail属性进行初始化,学习过数据结构的同学肯定对这两个东西非常熟悉,没错,这就是链表中的头节点和尾节点,EntryImpl这个类就是被设计用来在java中实现链表的,如图3所示EntryImpl包含5个属性,其中prevEntry指向前一个过滤器,nextEntry指向后一个过滤器,name是该过滤器的名称,filter中用于实现过滤的功能(具体实现方法后面会介绍),nextFilter用来调用下一个过滤器,上述①②代码执行过后会生成一个有两个节点的链表,如图4所示
图4
执行完addNow方法的代码①后,过滤器就被链接起来了,如图5所示
图5
过滤器链建立起来后消息是如何由head节点经过过滤器链到达我们进行业务逻辑处理的IoHandler中的呢,要知道这个问题,我们先来看看mina中跟过滤器链有关的类有哪些,在mina中跟过滤器有关的类主要分为3类,第一类是IoFilterChainBuilder接口和它的实现类DefaultIoFilterChainBuilder。这两个类的功能是当IoSession被创建时用来按照预先规定的顺序建立过滤器链的(如果你忘了这两个类可以回上文找找看看那些地方用到了这两个类),IoFilterChainBuilder只提供了一个接口方法就是buildFilterChain()建立过滤器链的方法。实现类DefaultIoFilterChainBuilder中一些其他的方法都是为这个方法提供额外的支持的
第二类是IoFilterChain和DefaultIoFilterChain。这两个类提供了构建过滤器链的功能还有当有events到来的时候将events经过过滤器链转发到IoHandler(怎么转发的后文会说明),每一个IoSessiond都有自己的过滤器链。有的同学可能不明白了构建功能不是上面说的DefaultIoFilterChainBuilder负责的吗怎么又变成DefaultIoFilterChain负责了,其实构建过滤器链的时候DefaultIoFilterChainBuilder调用了DefaultIoFilterChain的构建过滤器链的具体实现,其实这是实现是设计模式的一种:命令模式,不懂也没关系,不会影响你的理解,命令模式在这里我们就不做讨论了
第三类是IoFilter接口和它具体的实现,mina已经为我们提供了若干种过滤器的实现,下面列出了比较常用的一部分实现,IoFilterAdapter实现了IoFilter接口为我们提供了默认的实现,这样我们自己创建过滤器的时候就可以直接继承IoFIlterAdapter接口,这样我们就可以只实现我们感兴趣的方法,这样做可以使我们的代码更加简洁
我们看看HeadFilter是如何实现的,HeadFilter并没有实现sessionCreated、sessionOpened、messageReceived等等一系列方法,所以当消息过来时我们会调用HeadFilter的父类IoFilterAdapter的实现,IoFilterAdapter的实现就是直接调用下一个过滤器链的sessionCreated、sessionOpened、messageReceived等等方法
private class HeadFilter extends IoFilterAdapter { @SuppressWarnings("unchecked") @Override public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { AbstractIoSession s = (AbstractIoSession) session; // Maintain counters. if (writeRequest.getMessage() instanceof IoBuffer) { IoBuffer buffer = (IoBuffer) writeRequest.getMessage(); // I/O processor implementation will call buffer.reset() // it after the write operation is finished, because // the buffer will be specified with messageSent event. buffer.mark(); int remaining = buffer.remaining(); if (remaining > 0) { s.increaseScheduledWriteBytes(remaining); } } else { s.increaseScheduledWriteMessages(); } WriteRequestQueue writeRequestQueue = s.getWriteRequestQueue(); if (!s.isWriteSuspended()) { if (writeRequestQueue.isEmpty(session)) { // We can write directly the message s.getProcessor().write(s, writeRequest); } else { s.getWriteRequestQueue().offer(s, writeRequest); s.getProcessor().flush(s); } } else { s.getWriteRequestQueue().offer(s, writeRequest); } } @SuppressWarnings("unchecked") @Override public void filterClose(NextFilter nextFilter, IoSession session) throws Exception { ((AbstractIoSession) session).getProcessor().remove(session); } }
再让我们看看TailFilter是如何实现的,因为TailFilter已经是尾节点了,所以TailFilter的实现并没有再调用下一个过滤器,而是调用了IoHandler中的方法,这样消息就传到了IoHandler中,现在我们可以在IoHandler中处理我们的业务逻辑了
private static class TailFilter extends IoFilterAdapter { @Override public void sessionCreated(NextFilter nextFilter, IoSession session) throws Exception { try { session.getHandler().sessionCreated(session); } finally { // Notify the related future. ConnectFuture future = (ConnectFuture) session.removeAttribute(SESSION_CREATED_FUTURE); if (future != null) { future.setSession(session); } } } @Override public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception { session.getHandler().sessionOpened(session); } @Override public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception { AbstractIoSession s = (AbstractIoSession) session; try { s.getHandler().sessionClosed(session); } finally { try { s.getWriteRequestQueue().dispose(session); } finally { try { s.getAttributeMap().dispose(session); } finally { try { // Remove all filters. session.getFilterChain().clear(); } finally { if (s.getConfig().isUseReadOperation()) { s.offerClosedReadFuture(); } } } } } } @Override public void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) throws Exception { session.getHandler().sessionIdle(session, status); } @Override public void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) throws Exception { AbstractIoSession s = (AbstractIoSession) session; try { s.getHandler().exceptionCaught(s, cause); } finally { if (s.getConfig().isUseReadOperation()) { s.offerFailedReadFuture(cause); } } } @Override public void inputClosed(NextFilter nextFilter, IoSession session) throws Exception { session.getHandler().inputClosed(session); } @Override public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception { AbstractIoSession s = (AbstractIoSession) session; if (!(message instanceof IoBuffer)) { s.increaseReadMessages(System.currentTimeMillis()); } else if (!((IoBuffer) message).hasRemaining()) { s.increaseReadMessages(System.currentTimeMillis()); } // Update the statistics if (session.getService() instanceof AbstractIoService) { ((AbstractIoService) session.getService()).getStatistics().updateThroughput(System.currentTimeMillis()); } // Propagate the message try { session.getHandler().messageReceived(s, message); } finally { if (s.getConfig().isUseReadOperation()) { s.offerReadFuture(message); } } } @Override public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { ((AbstractIoSession) session).increaseWrittenMessages(writeRequest, System.currentTimeMillis()); // Update the statistics if (session.getService() instanceof AbstractIoService) { ((AbstractIoService) session.getService()).getStatistics().updateThroughput(System.currentTimeMillis()); } // Propagate the message session.getHandler().messageSent(session, writeRequest.getMessage()); } @Override public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { nextFilter.filterWrite(session, writeRequest); } @Override public void filterClose(NextFilter nextFilter, IoSession session) throws Exception { nextFilter.filterClose(session); } }