NIO框架之MINA源码解析(三):底层通信与责任链模式应用

本文主要介绍下在mina中责任链模式的应用以及mina对于数据读写的处理。

在mina中,对数据的读操作是在processor类里面触发的,收到新消息后就触发读数据链去处理新消息直到自己的业务逻辑代码(IoHandler)。

在mina中,数据的写(write)和发(send)差别相对较大,mina中的写消息最终的结果只是把要写的消息经过写数据链处理后的最终结果放在了一个缓存中,并把当前session标记为可发。

数据的发送就是传统中我们所说的发消息,就是把写消息最终处理的结果发送到客户端,待发送完成后,就调用发数据链进行操作,最后再调用IoHandlerde 发方法。

1、触发点

1.1、读数据

读操作是在Processor中的触发的,Processor是AbstractPollingIoProcessor的内部私有类。

Processor中有一个死循环,循环调用Selector的select方法,若有新消息,则进行process()。

详见Processor代码。

1.2、写数据

写操作很简单,是调用session的write方法,进行写数据的,写数据的最终结果保存在一个缓存队列里面,等待发送,并把当前session放入flushSession队列里面。

1.3、 发数据

发数据其实和读数据是差不多的,都在Processor中的触发的,在process()完新消息后,会调用flush()方法,把flushSession队列里面的session取出来,并把缓存的消息发送到客户端。

详见Processor代码。

2、责任链模式

在mina中,责任链模式相关的类都定义在了org.apache.mina.core.filterchain包中,看类图:

IoFilter接口实现代表一个具体的执行单元,是写自定义IoFilter的接口,IoFilter有一个适配器IoFilterAdapter,里面什么都没有干。

NextFilter接口看上去和IoFilter接口差不多,但NextFilter接口代表的是“下一个filter”,这里的下是抽象的,因为在mina的各种链中,处理顺序有的是从头到尾,有的是从尾到头,而这里的下就代表了熟悉中的下一个filter。

Entry接口是链接中的具体对象,其封装了IoFilter接口和NextFilter接口,代表一个实体放在链队列里面。

DefaultIoFilterChainBuilder是mina提供的一个便于用户构造IoFilterChain的方法。

2.1、读数据-责任链

2.2、写数据-责任链

2.3、发数据-责任链

2.4 责任链引擎实现

这里以读数据-责任链为例,说下在mina中具体是怎么实现这个责任链的执行。

engin代码

[java] view plain copy

print?

  1. //class DefaultIoFilterChain
  2. //触发点,传入消息对象
  3. public void fireMessageReceived(Object message) {
  4. if (message instanceof IoBuffer) {
  5. session.increaseReadBytes(((IoBuffer) message).remaining(), System.currentTimeMillis());
  6. }
  7. //从头开始
  8. Entry head = this.head;
  9. callNextMessageReceived(head, session, message);
  10. }
  11. //这个地方是个递归调用入口,目的就是执行entry里面的iofilter,并把当前entry的下一个nextfilter传入
  12. private void callNextMessageReceived(Entry entry, IoSession session, Object message) {
  13. try {
  14. IoFilter filter = entry.getFilter();
  15. NextFilter nextFilter = entry.getNextFilter();
  16. filter.messageReceived(nextFilter, session, message);
  17. } catch (Throwable e) {
  18. fireExceptionCaught(e);
  19. }
  20. }

iofilter代码,这里面拿一个简单的LoggingFilter来说明

[java] view plain copy

print?

  1. //class LoggingFilter
  2. public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
  3. log(messageReceivedLevel, "RECEIVED: {}", message);
  4. //调用nextfilter的receive方法,执行下一个filter
  5. //也可以不调用,直接返回
  6. nextFilter.messageReceived(session, message);
  7. //当然也可以在这里加一些处理后的业务逻辑。
  8. }

nextfilter代码

[java] view plain copy

print?

    1. private class EntryImpl implements Entry {
    2. private EntryImpl prevEntry;
    3. private EntryImpl nextEntry;
    4. private final String name;
    5. private IoFilter filter;
    6. private final NextFilter nextFilter;
    7. private EntryImpl(EntryImpl prevEntry, EntryImpl nextEntry, String name, IoFilter filter) {
    8. if (filter == null) {
    9. throw new IllegalArgumentException("filter");
    10. }
    11. if (name == null) {
    12. throw new IllegalArgumentException("name");
    13. }
    14. this.prevEntry = prevEntry;
    15. this.nextEntry = nextEntry;
    16. this.name = name;
    17. this.filter = filter;
    18. this.nextFilter = new NextFilter() {
    19. public void messageReceived(IoSession session, Object message) {
    20. Entry nextEntry = EntryImpl.this.nextEntry;
    21. //把nextEntry指向相应的entry,继续进入循环体,注意这里的EntryImpl.this.nextEntry,即从头到尾
    22. callNextMessageReceived(nextEntry, session, message);
    23. }
    24. public void messageSent(IoSession session, WriteRequest writeRequest) {
    25. Entry nextEntry = EntryImpl.this.nextEntry;
    26. callNextMessageSent(nextEntry, session, writeRequest);
    27. }
    28. public void filterWrite(IoSession session, WriteRequest writeRequest) {
    29. Entry nextEntry = EntryImpl.this.prevEntry;
    30. //把nextEntry指向相应的entry,继续进入循环体,注意这里的EntryImpl.this.prevEntry,即从尾到头
    31. callPreviousFilterWrite(nextEntry, session, writeRequest);
    32. }
    33. };
    34. }
    35. public String getName() {
    36. return name;
    37. }
    38. public IoFilter getFilter() {
    39. return filter;
    40. }
    41. }
时间: 2024-10-19 23:46:55

NIO框架之MINA源码解析(三):底层通信与责任链模式应用的相关文章

NIO框架之MINA源码解析(五):NIO超级陷阱和使用同步IO与MINA通信

1.NIO超级陷阱 之所以说NIO超级陷阱,就是因为我在本系列开头的那句话,因为使用缺陷导致客户业务系统瘫痪.当然,我对这个问题进行了很深的追踪,包括对MINA源码的深入了解,但其实之所以会出现这个问题,它的根不是MINA的原因,而是JDK底层的问题. JDK底层在实现nio时,为了能够唤醒等待在io上的线程,在windows平台使用了两个端口建立连接发消息实现.看如下代码: [java] view plain copy print? public class NIOTest { @Test p

NIO框架之MINA源码解析(一):背景

?? "你们的agent占了好多系统的端口,把我们的很多业务系统都给整死了,给我们造成了很大的损失,要求你们的相关领导下周过来道歉"   --   来自我们的一个客户. 怎么可能呢,我们都不相信,我们的agent只占一个端口啊! 事实胜过雄辩,经过查证,确实是由于我们的agent占了好多系统的端口,我看了一下日志,基本把系统可用的端口占完了! 为什么呢?MINA框架私自开的! 由于我们的agent端使用了NIO通信框架MINA,但并没有使用好,造成了这一几乎毁灭行的灾难. 还是先看代码

NIO框架之MINA源码解析(二):mina核心引擎

MINA的底层还是利用了jdk提供了nio功能,mina只是对nio进行封装,包括MINA用的线程池都是jdk直接提供的. MINA的server端主要有accept.processor.session三部分组成的.其中accept主要负责在指定的端口监听,若有新连接则建立一个新的session:processor则负责处理session对应的发送数据和接收数据并调用上层处理:session则缓存当前连接数据. MINA采用了线程懒启动的技术,即最少启动线程,在MINA server启动的时候,

NIO框架之MINA源码解析(四):粘包与断包处理及编码与解码

1.粘包与段包 粘包:指TCP协议中,发送方发送的若干包数据到接收方接收时粘成一包,从接收缓冲区看,后一包数据的头紧接着前一包数据的尾.造成的可能原因: 发送端需要等缓冲区满才发送出去,造成粘包 接收方不及时接收缓冲区的包,造成多个包接收 断包:也就是数据不全,比如包太大,就把包分解成多个小包,多次发送,导致每次接收数据都不全. 2.消息传输的格式 消息长度+消息头+消息体  即前N个字节用于存储消息的长度,用于判断当前消息什么时候结束. 消息头+消息体    即固定长度的消息,前几个字节为消息

NIO框架之MINA源码解析(转)

http://blog.csdn.net/column/details/nio-mina-source.html http://blog.csdn.net/chaofanwei/article/details/38848085 http://blog.csdn.net/chaofanwei/article/details/38871115 http://blog.csdn.net/chaofanwei/article/details/38920963 http://blog.csdn.net/c

(三)Mina源码解析之IoFilter

本篇文章主要剖析Mina中的过滤器是如何实现的 首先还是引入一个简单的完整的server端的例子,代码如下 public class Server { public static void main(String[] args) { IoAcceptor acceptor = new NioSocketAcceptor(); acceptor.getSessionConfig().setReadBufferSize(2048); acceptor.getSessionConfig().setId

Volley框架使用及源码解析

1. Volley特点 (1) 特别适合数据量小,通信频繁的网络操作. (2) 扩展性强.Volley 中大多是基于接口的设计,可根据需要自行定制. (3) 一定程度符合 Http 规范,包括返回 ResponseCode(2xx.3xx.4xx.5xx)的处理,请求头的处理, 缓存机制的支持等.并支持重试及优先级定义. (4) 提供简便的图片加载工具 GitHub地址:https//github.com/mcxiaoke/android-volley 2. 概念介绍 Request:表示一个请

volley源码解析(三)--Volley核心之RequestQueue类

上一篇文章给大家说明了Request<T>的内部结构,对于这个类而言,volley让我们关注的主要请求获得响应以后,怎么根据自己的需要解析响应,然后在主线程中回调监听器的方法,至于是怎么获得响应的,线程又是怎么开启的,都跟Request无关. 前面已经提到,Request会放在队列里面等待线程的提取,RequestQueue类作为volley的核心类,可以说是连接请求与响应的桥梁.另外,RequestQueue类作为一个集合,还给我们统一管理请求带来更大的方便,这样的思想是很值得我们学习的.

rpc框架yar之源码解析- 协议和传输

Yar 协议头和传输源码分析 这篇博客主要学习rpc框架yar的协议头和传输的实现, 能力有限,有些语句没有看懂,所以猜测了一部分. yar_header_t的实现 _yar_header的定义主要在yar_protocol.h和yar_protocol.c里面,下面介绍这两个文件里的源码.借用网上的一幅图片,请求体包括 yar_header + packager_name + yar_request_t 这三个部分,返回类似.下面主要介绍yar_header的部分. ? yar_protoco