ZeroMQ(java)中对IO的封装(StreamEngine)

哎,各种各样杂七杂八的事情。。。好久没有看代码了,其实要搞明白一个与IO相关的框架,最好的办法就是把它的I/0的读写两个过程搞清楚。。。例如在netty中,如果能将eventLoop的运行原理搞清楚,然后摸清楚整个I/O读写两个过程,那么也就差不太多了。。。。

这次来看看ZeroMQ(java)中如何来处理I/O的,先来看看一个类型的定义,IOObject类型,这个类型应该扮演的是工具类的形象,前面看过在ZeroMQ中所谓的IO线程的定义,那么IOObject就是用于直接与IO线程交互的,或者说的更直接的一点就是它是与IO线程里的poller对象交互的。。。

那么先来看看IOObject的类图吧:

这张图应该将IOObject与IOThread以及Poller之间的关系表现的很清楚了吧。。。。IOObject实现了IPollEvents接口,那么也就代表它可以响应IO事件。。。不过其实它并不直接实现这些IO事件,而是将其委托给内部的一个IPollEvents对象。。只不过是做了一层代理而已。。。

好了,接下来来看看IOObject的代码吧,先来看看它的属性申明:

[java] view plaincopy

  1. private Poller poller;   //poller对象
  2. private IPollEvents handler;   //用于执行事件回调的handler

这个poller就是从IO线程里面获取过来的,handler就是刚刚提到的事件回调的处理对象。。。IOObject不过是对其进行了一层包装而已。。。

那么接下来来看看重要的方法定义:

[java] view plaincopy

  1. //在将一个IO对象加入到一个IO线程的时候,要注意确定当前IO对象之前没有加入到任何IO线程或者已经从别的IO线程上面退下来了
  2. //将当前这个IO对象加入到IO线程上面去,说白了主要是获取这个IO线程的poller对象
  3. public void plug(IOThread io_thread_) {
  4. assert (io_thread_ != null);
  5. assert (poller == null);
  6. poller = io_thread_.get_poller ();      //获取这个线程的poller对象
  7. }

这个方法用于将当前这个IO对象加入到一个IO线程,其实主要的是要获取这个IO线程的Poller对象。。好了,接下来再来看看如何注册channel以及事件吧:

[java] view plaincopy

  1. //在poller里面移除channel
  2. public final void rm_fd(SelectableChannel handle) {
  3. poller.rm_fd(handle);
  4. }
  5. //给这个channel注册读取的事件
  6. public final void set_pollin (SelectableChannel handle_) {
  7. poller.set_pollin (handle_);
  8. }
  9. //在这个channel上面注册写事件
  10. public final void set_pollout (SelectableChannel handle_) {
  11. poller.set_pollout (handle_);
  12. }
  13. //注册链接事件
  14. public final void set_pollconnect(SelectableChannel handle) {
  15. poller.set_pollconnect(handle);
  16. }
  17. //注册accept事件
  18. public final void set_pollaccept(SelectableChannel handle) {
  19. poller.set_pollaccept(handle);
  20. }
  21. //取消读取事件的注册
  22. public final void reset_pollin(SelectableChannel handle) {
  23. poller.reset_pollin (handle);
  24. }
  25. //取消写事件的注册
  26. public final void reset_pollout(SelectableChannel handle) {
  27. poller.reset_pollout (handle);
  28. }

这部分代码应该很简单吧,而且应该对IOObject的用处比较的清楚了,然后至于说IOObject对象如何响应in_event什么的,前面已经说过了,其实是委托给了handler对象来处理。。。好啦,IOObject的分析就算差不多了。。接下来来看看StreamEngine类型的实现吧,还是先来看看它初略的类图吧:

其实觉得看一个类的类图,基本上就能看出这个类的很多情况,好了,不说闲话了,来看看它的属性的定义吧:

[java] view plaincopy

  1. private static final int GREETING_SIZE = 12;   //问候msg的大小,12个字节  (10字节的头,1字节的版本,1字节的socket类型)
  2. //  True iff we are registered with an I/O poller.
  3. private boolean io_enabled;   //如果是true的话,表示当前已经注册到了poller上面去
  4. private SocketChannel handle;   //真正底层用于通信的socketChannel
  5. private ByteBuffer inbuf;  //接收数据的buf
  6. private int insize;   //记录接收的数据的大小
  7. private DecoderBase decoder;  //decoder
  8. private Transfer outbuf;   //outbuf
  9. private int outsize;   //outbuf的大小
  10. private EncoderBase encoder;  //encoder
  11. //  When true, we are still trying to determine whether
  12. //  the peer is using versioned protocol, and if so, which
  13. //  version.  When false, normal message flow has started.
  14. private boolean handshaking;  //是否是在握手中,当值为false的时候代表握手已经完成了
  15. //  The receive buffer holding the greeting message
  16. //  that we are receiving from the peer.
  17. private final ByteBuffer greeting;  //用于接收问候msg的buf
  18. //  The send buffer holding the greeting message
  19. //  that we are sending to the peer.
  20. private final ByteBuffer greeting_output_buffer;  //用于发送问候msg的buf
  21. private SessionBase session;    //所属的session
  22. private Options options;  //选项配置
  23. // String representation of endpoint
  24. private String endpoint;   //这里一般是地址信息
  25. private boolean plugged;   //是否已经加入了
  26. private boolean terminating;  //是否已经停止了
  27. // Socket
  28. private SocketBase socket;  //所属的socket
  29. private IOObject io_object;    //拥有的IO对象

这里面有很多重要的属性,例如handler是SocketChannel类型的,可以知道它才是实际上底层用于通信的,然后又inbuf以及outbuf,这两个东西是干嘛用的应该一眼就看出来了吧,然后还有encoder和decoder,呵呵,可以猜到,读取到的数据先要经过decoder的处理才提交给上层,发送出去的数据也会通过encoder处理成二进制再发送出去。。。然后还有一个io_objcet对象。。。

接下来来看看构造方法吧:

[java] view plaincopy

  1. //构造函数,第一个参数是底层的channel,
  2. public StreamEngine (SocketChannel fd_, final Options options_, final String endpoint_)
  3. {
  4. handle = fd_;
  5. inbuf = null;
  6. insize = 0;
  7. io_enabled = false;
  8. outbuf = null;
  9. outsize = 0;
  10. handshaking = true;  //初始化为ture,表示还没有完成握手
  11. session = null;
  12. options = options_;
  13. plugged = false;
  14. terminating = false;
  15. endpoint = endpoint_;
  16. socket = null;
  17. greeting = ByteBuffer.allocate (GREETING_SIZE);  //创建用于接收问候msg的buf
  18. greeting_output_buffer = ByteBuffer.allocate (GREETING_SIZE);   //创建用于发送握手信息的buf
  19. encoder = null;
  20. decoder = null;
  21. try {
  22. Utils.unblock_socket (handle);  //将底层的channel设置为非阻塞的
  23. if (options.sndbuf != 0) {  //设置底层的socket的发送缓冲大小
  24. handle.socket().setSendBufferSize((int)options.sndbuf);
  25. }
  26. if (options.rcvbuf != 0) {  //设置底层的socket的接收缓冲大小
  27. handle.socket().setReceiveBufferSize((int)options.rcvbuf);
  28. }
  29. } catch (IOException e) {
  30. throw new ZError.IOException(e);
  31. }
  32. }

这个比较有意思的就是将channel设置为了非阻塞的模式,然后设置了底层socket的发送以及接受缓冲的大小。。其余的就没啥意思了。。。

[java] view plaincopy

  1. //将当前engine加入到IO线程以及session,其实这里最主要的事情是将channel注册到poller上面去
  2. public void plug (IOThread io_thread_,
  3. SessionBase session_)  {
  4. assert (!plugged);
  5. plugged = true;  //标志位
  6. //  Connect to session object.
  7. assert (session == null);
  8. assert (session_ != null);
  9. session = session_;    //当前所属的session
  10. socket = session.get_soket ();  //获取所属的scoekt,这个是ZMQ的socket
  11. io_object = new IOObject(null);  //创建IO对象,
  12. io_object.set_handler(this);  //设置IO对象的事件回调
  13. //  Connect to I/O threads poller object.
  14. io_object.plug (io_thread_);  // 将IO对象搞到这个IO线程上面去,其实最主要的就是获取这个IO线程的poller对象
  15. io_object.add_fd (handle);   //将底层的channel加入
  16. io_enabled = true; //表示已经加入了
  17. //  Send the ‘length‘ and ‘flags‘ fields of the identity message.
  18. //  The ‘length‘ field is encoded in the long format.
  19. //设置发送的问候msg的信息
  20. greeting_output_buffer.put ((byte) 0xff);
  21. greeting_output_buffer.putLong (options.identity_size + 1);
  22. greeting_output_buffer.put ((byte) 0x7f);
  23. io_object.set_pollin (handle);  //注册当前channel的读事件
  24. //  When there‘s a raw custom encoder, we don‘t send 10 bytes frame
  25. boolean custom = false;
  26. try {
  27. custom = options.encoder != null && options.encoder.getDeclaredField ("RAW_ENCODER") != null;
  28. } catch (SecurityException e) {
  29. } catch (NoSuchFieldException e) {
  30. }
  31. if (!custom) {
  32. outsize = greeting_output_buffer.position ();
  33. outbuf = new Transfer.ByteBufferTransfer ((ByteBuffer) greeting_output_buffer.flip ());  //设置需要发送的buf,将问候信息发送出去
  34. io_object.set_pollout (handle);
  35. }
  36. //  Flush all the data that may have been already received downstream.
  37. in_event ();  //看是否有数据读取了
  38. }

这个方法用于将当前IO对象注册到IO线程上面去,并且还要管理session,可以看到这里主要是利用IOObject对象,用于在poller对象上面注册channel,以及读写事件。。。另外还有对握手信息的处理。。。好了,握手这部分的内容,因为现在还没有看,不知道具体的流程是啥样的,就先放一下。。。再来看两个重要的方法定义吧:

[java] view plaincopy

  1. //当底层的chanel有数据可以读取的时候的回调方法
  2. public void in_event ()  {
  3. if (handshaking)
  4. if (!handshake ())
  5. return;
  6. assert (decoder != null);
  7. boolean disconnection = false;
  8. //  If there‘s no data to process in the buffer...
  9. if (insize == 0) {  //如果inbuf里面没有数据需要处理
  10. //  Retrieve the buffer and read as much data as possible.
  11. //  Note that buffer can be arbitrarily large. However, we assume
  12. //  the underlying TCP layer has fixed buffer size and thus the
  13. //  number of bytes read will be always limited.
  14. inbuf = decoder.get_buffer ();  //从解码器里面获取buf,用于写入读取的数据,因为在已经设置了底层socket的TCP接收缓冲区的大小
  15. insize = read (inbuf);  //用于将发送过来的数据写到buf中去,并记录大小
  16. inbuf.flip();  //这里准备从buf里面读取数据了
  17. //  Check whether the peer has closed the connection.
  18. if (insize == -1) {  //如果是-1的话,表示底层的socket连接已经出现了问题
  19. insize = 0;
  20. disconnection = true;
  21. }
  22. }
  23. //  Push the data to the decoder.
  24. int processed = decoder.process_buffer (inbuf, insize);  //解析这些读取到的数据
  25. if (processed == -1) {
  26. disconnection = true;
  27. } else {
  28. //  Stop polling for input if we got stuck.
  29. if (processed < insize)  //如果处理的数据居然还没有读到的数据多,那么取消读取事件的注册
  30. io_object.reset_pollin (handle);
  31. //  Adjust the buffer.
  32. insize -= processed;  //还剩下没有处理的数据的大小
  33. }
  34. //  Flush all messages the decoder may have produced.
  35. session.flush ();  //将decoder解析出来的数据交给session
  36. //  An input error has occurred. If the last decoded message
  37. //  has already been accepted, we terminate the engine immediately.
  38. //  Otherwise, we stop waiting for socket events and postpone
  39. //  the termination until after the message is accepted.
  40. if (disconnection) {   //表示已经断开了连接,那么需要处理一下
  41. if (decoder.stalled ()) {
  42. io_object.rm_fd (handle);
  43. io_enabled = false;
  44. } else
  45. error ();
  46. }
  47. }
  48. //表示可以写数据了
  49. public void out_event ()   {
  50. //  If write buffer is empty, try to read new data from the encoder.
  51. if (outsize == 0) {  //需要写的数据量为0
  52. //  Even when we stop polling as soon as there is no
  53. //  data to send, the poller may invoke out_event one
  54. //  more time due to ‘speculative write‘ optimisation.
  55. if (encoder == null) {
  56. assert (handshaking);
  57. return;
  58. }
  59. outbuf = encoder.get_data (null);  //从encoder里面获取数据
  60. outsize = outbuf.remaining();
  61. //  If there is no data to send, stop polling for output.
  62. if (outbuf.remaining() == 0) {   //如果确实没有数据要写,那么取消写事件的注册
  63. io_object.reset_pollout (handle);
  64. // when we use custom encoder, we might want to close
  65. if (encoder.is_error()) {
  66. error();
  67. }
  68. return;
  69. }
  70. }
  71. //  If there are any data to write in write buffer, write as much as
  72. //  possible to the socket. Note that amount of data to write can be
  73. //  arbitratily large. However, we assume that underlying TCP layer has
  74. //  limited transmission buffer and thus the actual number of bytes
  75. //  written should be reasonably modest.
  76. int nbytes = write (outbuf);  //写数据
  77. //  IO error has occurred. We stop waiting for output events.
  78. //  The engine is not terminated until we detect input error;
  79. //  this is necessary to prevent losing incomming messages.
  80. if (nbytes == -1) {  //如果-1,那么表示底层用到的socket其实已经出现了问题
  81. io_object.reset_pollout (handle);  //取消写事件的注册
  82. if (terminating)
  83. terminate ();
  84. return;
  85. }
  86. outsize -= nbytes;  //这里更新需要写的数据的数量
  87. //  If we are still handshaking and there are no data
  88. //  to send, stop polling for output.
  89. if (handshaking)
  90. if (outsize == 0)
  91. io_object.reset_pollout (handle);
  92. // when we use custom encoder, we might want to close after sending a response
  93. if (outsize == 0) {
  94. if (encoder != null && encoder.is_error ()) {
  95. error();
  96. return;
  97. }
  98. if (terminating)
  99. terminate ();
  100. }
  101. }

这两个方法是用于相应IO事件的,前面提到的IOObject将IO事件其实委托给了内部的handler来处理,其实这个handler对象就是SteamEngine对象,也就是底层的channel有数据可以读写的时候,将会用上面的两个方法来处理。这里就可以看到读写事件最原始的处理流程了,而且也看到了encoder以及decoder的用处。。。这里代码应该还算是比较的简单,由于这部分还涉及到与上层的session对象之间的交互,这个还要等到以后来分析。。。

好了,那么到这里ZeroMQ中IO的处理流程也就算是有了基本的了解了。。。。

时间: 2024-11-08 01:37:00

ZeroMQ(java)中对IO的封装(StreamEngine)的相关文章

ZeroMQ(java)中的数据流SessionBase与SocketBase

前面的文章中已经比较的清楚了ZeroMQ(java)中如何在底层处理IO, 通过StreamEngine对象来维护SelectableChannel对象以及IO的事件回调,然后通过Poller对象来维护Selector对象,然后用IOObject对象来具体的管理SelectableChannel对象在Poller上面的注册,以及事件回调,他们之间的关系可以用下面的图形来简单的描述一下: 对于接收到的数据,首先由StreamEngine进行处理,其实它会调用内部的decoder将字节数据转化为Ms

java中的io系统详解

java中的io系统详解 分类: JAVA开发应用 笔记(读书.心得)2009-03-04 11:26 46118人阅读 评论(37) 收藏 举报 javaiostreamconstructorstringbyte 相关读书笔记.心得文章列表 Java 流在处理上分为字符流和字节流.字符流处理的单元为 2 个字节的 Unicode 字符,分别操作字符.字符数组或字符串,而字节流处理单元为 1 个字节,操作字节和字节数组. Java 内用 Unicode 编码存储字符,字符流处理类负责将外部的其他

深入理解Java中的IO

深入理解Java中的IO 引言:     对程序语言的设计者来说,创建一个好的输入/输出(I/O)系统是一项艰难的任务 < Thinking in Java >   本文的目录视图如下: Java IO概要 a.Java IO中常用的类 b.Java流类的类结构图 1.流的概念和作用 2.Java IO所采用的模型  : 3.IO流的分类 4.Java IO流对象 1.输入字节流InputStream 2.输出字节流OutputStream 3.字符输入流Reader 4.字符输出流Write

java中的IO基础总结

java中的I/O类库设计可谓是比较丰富的,在我们平时的编程中也经常接触到,往往大部分的系统都有对IO操作的一些封装代码,平时要用到往往翻翻api或者找个写好的方法复制就搞定,由此带来的是对java本身提供的这些方法不熟悉,平时不好好梳理下,对java的io包下面这些常用类也就比较凌乱了.所以这里通过api文档和java.io下面的源码去整理下. 1.表示字节输入输出流的所有类的超类(InputStream/OutputStream) 构造方法:InputStream() 创建一个输入的stre

JAVA中各种IO的关系及说明

JAVA中的IO以前看着太混乱了,现在梳理一下 1.IO流分为两大类,一个是以stream结尾的,叫做字节流,顾名思义,按照byte为单位进行传输:另一种是以reader和writer结尾的叫做字符流,它貌似是封装了stream结尾的 IO流类,而产生的另一种功能类似,但是传输介质不再是byte,而是字符,也就是说,根据传说字符的不同,比如UTF-8,GBK等,它的传输单位也不是固定的. 2.输入输出,输入指的是从文件向内存中进行读入,输出指的是,内存中的内容写出到文件中 3.常见的Buffer

Java中的IO流(五)

上一篇<Java中的IO流(四)>记录了一下Properties类,此类不属于IO流,它属于集合框架.接下来说一下IO流中的其它流 一,打印流PrintStream PrintStream为其他输出流添加了功能,使它们能够方便地打印各种数据值表示形式.并且此注永远不会抛出IOException. 此流的构造函数大致分三类 1,接收File文件类型的 2,接收OutputStream类型的 3,接收文件名形式的 下演示一下此流的两个方法 1 private static void functio

Java中的IO流

Java中的IO流是实现输入/输出的基础. 按照流的方向不同:分为输入流和输出流. 按照处理数据单位的不同:分为字节流(8位)和字符流(16位). 按照功能不同:分为节点流和处理流 所有面向字节的流类都继承于InputStream类(输入流) 或OutputStream类(输出流),这两个类是抽象类,我们可以利用它的子类来完成不同的功能. InputStream.OutputStream都是抽象类 InputStream抽象了应用程序读取数据的方式 OutputStream抽象类应用程序写出数据

JAVA 中的IO流

Java中的IO流是用来处理设备与设备之前的数据传输,在java中以流的形式传输.流分为两类:字节流和字符流. 字节流:InputStream,OutPutSteam.(计算机内的数据都是以字节存储的,字节流可以操作任意数据) 字符流:Reader,Writer.(字符流只能操作字符,但是在实际应用中字符流比较方便) 从操作来看又可以分为:输入流和输出流. 在进行IO流操作的时候分为以下几步:1.导入IO流的包,2.进行IO流的异常处理,3.关闭IO流释放资源. 字节流 ————————————

Java中的继承、封装、多态的理解

Java中的继承.封装.多态 继承的理解: 1.继承是面向对象的三大特征之一,也是实现代码复用的重要手段.Java的继承具有单继承的特点,每个子类只有一个直接父类. 2.Java的继承通过extends关键字来实现,实现继承的类被称为子类,被继承的类称为父类(有的也称其为基类.超类),父类和子类的关系,是一种一般和特殊的关系.就像是水果和苹果的关系,苹果继承了水果,苹果是水果的子类,水果是苹果的父类,则苹果是一种特殊的水果. 3.Java使用extends作为继承的关键字,extends关键字在

Java中的继承、封装、多态

继承 所谓封装,就是将对象的成员变量和成员函数包装和隐藏起来,对外界通过特定的接口来访问. public class User { private String name; public User (String name) { this.name = name; } public String getName () { return this.name; } public void sayName () { System.out.println(this.getName()); } publi