MINA原理详解

  • 通过SocketConnector同服务器端建立连接
  • 链接建立之后I/O的读写交给了I/O Processor线程,I/O Processor是多线程的
  • 通过I/O Processor读取的数据经过IoFilterChain里所有配置的IoFilter,IoFilter进行消息的过滤,格式的转换,在这个层面可以制定一些自定义的协议
  • 最后IoFilter将数据交给Handler进行业务处理,完成了整个读取的过程
  • 写入过程也是类似,只是刚好倒过来,通过IoSession.write写出数据,然后Handler进行写入的业务处理,处理完成后交给IoFilterChain,进行消息过滤和协议的转换,最后通过I/O Processor将数据写出到socket通道
  • IoFilterChain作为消息过滤链

    1. 读取的时候是从低级协议到高级协议的过程,一般来说从byte字节逐渐转换成业务对象的过程
    2. 写入的时候一般是从业务对象到字节byte的过程
      IoSession贯穿整个通信过程的始终

    整个过程可以用一个图来表现

    消息箭头都是有NioProcessor-N线程发起调用,默认情况下也在NioProcessor-N线程中执行

    Connector : 作为连接客户端,SocketConector用来和服务器端建立连接,连接成功,创建IoProcessor Thread(不能超过指定的processorCount),Thread由指定的线程池进行管理,IoProcessor 利用NIO框架对IO进行处理,同时创建IoSession。连接的建立是通过Nio的SocketChannel进行。

    NioSocketConnector connector = new NioSocketConnector(processorCount);
    ConnectFuture future = connector.connect(new InetSocketAddress(HOSTNAME, PORT));建立一个I/O通道 

    Acceptor :作为服务器端的连接接受者,SocketAcceptor用来监听端口,同客户端建立连接,连接建立之后的I/O操作全部交给IoProcessor进行处理

    IoAcceptor acceptor = new NioSocketAcceptor();
    acceptor.bind( new InetSocketAddress(PORT) ); 

    Protocol : 利用IoFilter,对消息进行解码和编码,如以下代码通过 MyProtocolEncoder 将java对象转成byte串,通过MyProtocalDecoder 将byte串恢复成java对象

    connector.getFilterChain().addLast("codec";,  new  ProtocolCodecFilter( new  MyProtocalFactory()));
    ......
    public   class  MyProtocalFactory  implements  ProtocolCodecFactory {
         ProtocolEncoderAdapter encoder = new  MyProtocolEncoder();
         ProtocolDecoder decoder = new  MyProtocalDecoder() ;
         public  ProtocolDecoder getDecoder(IoSession session)  throws  Exception {
            return  decoder;
         }
         public  ProtocolEncoder getEncoder(IoSession session)  throws  Exception {
            return  encoder;
         }
    }
    ......
    public   class  MyProtocalDecoder  extends  ProtocolDecoderAdapter  {  
    
     public   void  decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
         throws  Exception {
            int   id  = in.getInt();
            int   len = in.getInt();
            byte []  dst =  new   byte [len];  
    
            in.get(dst);  
    
            String name = new  String(dst,"GBK");  
    
            Item item = new  Item();
            item.setId(id);
            item.setName(name);
            out.write(item);
     }
    }
    ......
    public   class  MyProtocolEncoder  extends  ProtocolEncoderAdapter {  
    
     public   void  encode(IoSession session, Object message,
         ProtocolEncoderOutput out) throws  Exception {
        Item item = (Item)message;
        int  byteLen =  8  + item.getName().getBytes("GBK").length ;
        IoBuffer buf = IoBuffer.allocate(byteLen);
        buf.putInt(item.getId());
        buf.putInt(item.getName().getBytes("GBK").length);
        buf.put(item.getName().getBytes("GBK";));
        buf.flip();
        out.write(buf);  
    
     }
    }  

    handler : 具体处理事件,事件包括:sessionCreated、sessionOpened、sessionClosed、sessionIdle、exceptionCaught、messageReceived、messageSent。
    connector.setHandler(new MyHandler());MyHandler继承IoHandlerAdapter类或者实现IoHandler接口.事件最终由IoProcessor线程发动调用。

    Processor : I/O处理器、允许多线程读写,开发过程中只需要指定线程数量,Processor通过Nio框架进行I/O的续写操作,Processor包含了Nio的Selector的引用。这点也正是mina的优势,如果直接用Nio编写,则需要自己编写代码来实现类似Processor的功能。正因为 I/O Processor是异步处理读写的,所以我们有时候需要识别同一个任务的消息,比如一个任务包括发送消息,接收消息,反馈消息,那么我们需要在制定消息格式的时候,消息头里能包含一个能识别是同一个任务的id。
    I/O Porcessor线程数的设置 :如果是SocketConnector,则可以在构造方法中指定,如:new SocketConnector(processorCount, Executors.newCachedThreadPool());如果是SocketAcceptor,也是一样的:SocketAcceptor acceptor = new SocketAcceptor(ProcessorCount, Executors.newCachedThreadPool());
    processorCount为最大Porcessor线程数,这个值可以通过性能测试进行调优,默认值是cpu核数量+1(Runtime.getRuntime().availableProcessors() + 1)。
    比较奇怪的是,每个IoProcessor在创建的时候会本地自己和自己建立一个连接?

    IoSession : IoSession是用来保持IoService的上下文,一个IoService在建立Connect之后建立一个IoSession(一个连接一个session),IoSession的生命周期从Connection建立到断开为止
    IoSession做两件事情:
    1.通过IoSession可以获取IoService的所有相关配置对象(持有对IoService,Processor池,SocketChannel,SessionConfig和IoService.IoHandler的引用)
    2.通过IoSession.write 是数据写出的入口

    关于线程
    ThreadModel 1.x版本的mina还有线程模式选项在2.x之后就没有了
    1.x版本指定线程模式
    SocketConnectorConfig cfg = new SocketConnectorConfig();
    cfg.setThreadModel(ThreadModel.MANUAL);

    MINA有3种worker线程
    Acceptor、Connector、I/O processor 线程
    Acceptor Thread: 一般作为服务器端链接的接收线程,实现了接口IoService,线程的数量就是创建SocketAcceptor 的数量
    Connector Thread :一般作为客户端的请求建立链接线程,实现了接口IoService,维持了一个和服务器端Acceptor的一个链接,线程数量就是创建SocketConnector 的数量
    Mina的SocketAcceptor和SocketConnector均是继承了BaseIoService,是对IoService的两种不同的实现
    I/O processor Thread :作为I/O真正处理的线程,存在于服务器端和客户端,用来处理I/O的读写操作,线程的数量是可以配置的,默认最大数量是CPU个数+1

    服务器端:在创建SocketAcceptor的时候指定ProcessorCount

    SocketAcceptor acceptor = new SocketAcceptor(Runtime.getRuntime().availableProcessors() + 1, Executors.newCachedThreadPool()); 

    客户端:在创建SocketConnector 的时候指定ProcessorCount

    SocketConnector connector = new SocketConnector(Runtime.getRuntime().availableProcessors() + 1, Executors.newCachedThreadPool());

    I/O Processor Thread,是依附于IoService,类似上面的例子SocketConnector connector = new SocketConnector(Runtime.getRuntime().availableProcessors() + 1, Executors.newCachedThreadPool());是指SocketConnector这个线程允许CPU+1个I/O Processor Thread
    NioProcessor虽然是多线程,但是对与一个连接的时候业务处理只会使用一个线程进行处理(Processor线程对于一个客户端连接只使用一个线程NioProcessor-n)如果handler的业务比较耗时,会导致NioProcessor线程堵塞 ,在2个客户端同时连接上来的时候会创建第2个(前提是第1个NioProcessor正在忙),创建的最大数量由Acceptor构造方法的时候指定。如果:一个客户端连接同服务器端有很多通信,并且I/O的开销不大,但是Handler处理的业务时间比较长,那么需要采用独立的线程模式,在 FilterChain的最后增加一个ExecutorFitler :

    acceptor.getFilterChain().addLast("threadPool", new ExecutorFilter(Executors.newCachedThreadPool())); 

    这样可以保证processor和handler的线程是分开的,否则:客户端发送3个消息,而服务器对于每个消息要处理10s左右,那么这3个消息是被串行处理,在处理第一个消息的时候,后面的消息将被堵塞,同样反过来客户端也有同样的问题。

    客户端Porcessor堵塞测试情况:

    1. 以下代码在建立连接后连续发送了5个消息(item)
      ConnectFuture future = connector.connect( new InetSocketAddress(HOSTNAME, PORT));
      future.awaitUninterruptibly();
      session = future.getSession();
      Item item = new Item();
      item.setId(12345 );
      item.setName("hi");
      session.write(item);
      session.write(item);
      session.write(item);
      session.write(item);
      session.write(item);
    2. 在handle的messageSent方法进行了延时处理,延时3秒
      public   void  messageSent(IoSession session, Object message)  throws  Exception {
        Thread.sleep(3000 );
        System.out.println(message);  

      }

    3. 测试结果
      5个消息是串行发送,都由同一个IoPorcessor线程处理
      session.write(item);
      session.write(item);
      session.write(item);
      session.write(item);
      session.write(item);  

      服务器端每隔3秒收到一个消息。因为调用是由IoProcessor触发,而一个connector只会使用一个IoProcessor线程

    4. 增加ExecutorFilter,ExecutorFilter保证在处理handler的时候是独立线程
      connector.getFilterChain().addLast("threadPool", new ExecutorFilter(Executors.newCachedThreadPool()));
    5. 测试结果
      4个session.wirte变成了并行处理,服务器端同时收到了5条消息

    关注公众号获取springcloud dubbo 视频

    原文地址:http://blog.51cto.com/13538361/2118177

    时间: 2024-08-05 01:39:19

    MINA原理详解的相关文章

    图像处理中的数学原理详解17——卷积定理及其证明

    欢迎关注我的博客专栏"图像处理中的数学原理详解" 全文目录请见 图像处理中的数学原理详解(总纲) http://blog.csdn.net/baimafujinji/article/details/48467225 图像处理中的数学原理详解(已发布的部分链接整理) http://blog.csdn.net/baimafujinji/article/details/48751037 1.4.5   卷积定理及其证明 卷积定理是傅立叶变换满足的一个重要性质.卷积定理指出,函数卷积的傅立叶变

    Java虚拟机工作原理详解

    原文地址:http://blog.csdn.net/bingduanlbd/article/details/8363734 一.类加载器 首先来看一下java程序的执行过程. 从这个框图很容易大体上了解java程序工作原理.首先,你写好java代码,保存到硬盘当中.然后你在命令行中输入 [java] view plaincopy javac YourClassName.java 此时,你的java代码就被编译成字节码(.class).如果你是在Eclipse IDE或者其他开发工具中,你保存代码

    kickstart安装系统原理详解

    前言 作为中小公司的运维,经常会遇到一些机械式的重复工作,例如:有时公司同时上线几十甚至上百台服务器,而且需要我们在短时间内完成系统安装. 常规的办法有什么? 光盘安装系统===>一个服务器DVD内置光驱百千块,百台服务器都配光驱就浪费了,因为一台服务器也就开始装系统能用的上,以后用的机会屈指可数.用USB外置光驱,插来插去也醉了. U盘安装系统===>还是同样的问题,要一台一台服务器插U盘. 网络安装系统(ftp,http,nfs) ===>这个方法不错,只要服务器能联网就可以装系统了

    Storm概念、原理详解及其应用(一)BaseStorm

    本文借鉴官文,添加了一些解释和看法,其中有些理解,写的比较粗糙,有问题的地方希望大家指出.写这篇文章,是想把一些官文和资料中基础.重点拿出来,能总结出便于大家理解的话语.与大多数"wordcount"代码不同的是,并不会有如何运行第一storm代码等内容,只有在运行完代码后,发现需要明白:"知其然,并知其所以然". Storm是什么?为什么要用Storm?为什么不用Spark? 第一个问题,以下概念足以解释: Storm是基于数据流的实时处理系统,提供了大吞吐量的实

    SVM -支持向量机原理详解与实践之四

    SVM -支持向量机原理详解与实践之四 SVM原理分析 SMO算法分析 SMO即Sequential minmal optimization, 是最快的二次规划的优化算法,特使对线性SVM和稀疏数据性能更优.在正式介绍SMO算法之前,首先要了解坐标上升法. 坐标上升法(Coordinate ascent) 坐标上升法(Coordinate Ascent)简单点说就是它每次通过更新函数中的一维,通过多次的迭代以达到优化函数的目的. 坐标上升法原理讲解 为了更加通用的表示算法的求解过程,我们将算法表

    SVM-支持向量机原理详解与实践之一

    目录(?)[+] 前言 SVM机器学习与深度学习 人工智能领域 机器学习与深度学习 SVM简介 SVM原理分析 快速理解SVM原理 线性可分和线性不可分 函数间隔和几何间隔 超平面分析与几何间隔详解 二次最优化 SVM-支持向量机原理详解与实践 前言 去年由于工作项目的需要实际运用到了SVM和ANN算法,也就是支持向量机和人工神经网络算法,主要是实现项目中的实时采集图片(工业高速摄像头采集)的图像识别的这一部分功能,虽然几经波折,但是还好最终还算顺利完成了项目的任务,忙碌一年,趁着放假有时间好好

    JSPatch实现原理详解<二>

    本文转载至 http://blog.cnbang.net/tech/2855/ 距离上次写的<JSPatch实现原理详解>有一个月的时间,在这段时间里 JSPatch 在不断地完善和改进,代码已经有很多变化,有一些修改值得写一下,作为上一篇的补充. Special Struct 先说下 _objc_msgForward,在上一篇提到为了让替换的方法走 forwardInvocation,把它指向一个不存在的 IMP: class_getMethodImplementation(cls, @se

    Linux下FFMPEG--H264--编码&&解码的C实现与相关原理详解

    FFMPEG是很强大的一套视频音频处理库,不过,强大的功能一般免不了复杂的实现,或者更加现实地说,"麻烦"的部署和使用流程 关于"FFMPEG怎么部署"这事就放在另一篇文章啦,下面入正题.. 编码encoder模块和解码decoder模块都有init初始化方法和资源free方法 init初始化方法主要是进行ffmpeg所必需的编解码器的初始化和部分功能方法的参数配置,而free资源释放方法则是相应地进行必要的回收 Encoder模块的实现和细节分析 #include

    KVC/KVO原理详解及编程指南

    http://blog.csdn.net/wzzvictory/article/details/9674431 2.KVC/KVO实现原理 键值编码和键值观察是根据isa-swizzling技术来实现的,主要依据runtime的强大动态能力.下面的这段话是引自网上的一篇文章: http://blog.csdn.net/kesalin/article/details/8194240 当某个类的对象第一次被观察时,系统就会在运行期动态地创建该类的一个派生类,在这个派生类中重写基类中任何被观察属性的