ZeroMQ(java)中的数据流SessionBase与SocketBase

前面的文章中已经比较的清楚了ZeroMQ(java)中如何在底层处理IO,

通过StreamEngine对象来维护SelectableChannel对象以及IO的事件回调,然后通过Poller对象来维护Selector对象,然后用IOObject对象来具体的管理SelectableChannel对象在Poller上面的注册,以及事件回调,他们之间的关系可以用下面的图形来简单的描述一下:

对于接收到的数据,首先由StreamEngine进行处理,其实它会调用内部的decoder将字节数据转化为Msg对象,然后再交给上层的对象,

其实这里的上层对象也就是Session对象,每一个StreamEngine对象都有一个自己的Session对象,然后Session对象收到下面传上来的数据之后,再会通过Pipe,将数据发送到其更上层的Socket对象,

然后接下来的数据处理就交由用户的代码来处理了。。。

对于刚刚提到的对象之间的层次,用下面的图形来描述吧:

这张图应该还算刻画的比较直接了吧,底层数据通信部分负责从channel接收数据和发送二进制的数据,然后又Decoder以及Encoder来负责字节数据与Msg之间的转化。。。。

我们在ZMQ中会看到很多种类的Socket,例如Req,Dealer,Router啥的,他们都继承自SocketBase类型,每一种类型都有自己的Session,都继承自SessionBase类型。。。

好了,那么接下来先来看看SessionBase类型吧,每一个StreamEngine对象都有一个Session对象与之关联,他们是一对一的关系,先来看看它的一些重要的属性定义吧:

[java] view plaincopy

  1. private boolean connect;  //是否需要连接,如果是fasle的话,那么表示是listener创建的连接
  2. private Pipe pipe;   //与socket进行通信的pipe
  3. private final Set<Pipe> terminating_pipes;
  4. //如果是true的话,表示还有message在pipe里面没有执行
  5. private boolean incomplete_in;
  6. //如果是true的话,表示停止发送数据到网络中
  7. private boolean pending;
  8. private IEngine engine;  //绑定到这个session上面的底层的通信
  9. private SocketBase socket;   //当前session所属的socket
  10. private IOThread io_thread;    //当前session关联的IO线程,底层的io将会加入到这个IO线程
  11. private static int linger_timer_id = 0x20;
  12. //  True is linger timer is running.
  13. private boolean has_linger_timer;    //有超时事件的注册?
  14. private boolean identity_sent;    //如果是true的话,表示标志已经发送了
  15. private boolean identity_received;  //表示标志已经接收了
  16. private final Address addr;   //连接的地址
  17. private IOObject io_object;   //关联的IO对象,IEngine里的io对象

具体这些属性的是干嘛的,上面的注释基本上都已经给出来的吧,这里比较重要的属性是:

pipe,它用于与上面的Socket进行通信,当下层有数据被解析出来以后,会通过pipe将msg发送给上层的socket,具体pipe的过程,前面的文章已经说过了。。。

IEngine,底层数据通信的StreamEngine对象的引用,这个重要性就不说了吧,。。。

IOThread对象,这个是当前Session对象将会依赖的IO线程,也就是发给session的命令都会被这个IO线程的mailbox接收到,从而在这个线程中执行命令,嗯。。。重要吧。。。

另外还有一些标志位什么的。。。

好了,这里就不细说,来看看一些重要的方法吧:

[java] view plaincopy

  1. //与pipe关联上,这里其实主要是为了将当前pipe的事件回调设置为当前对象
  2. public void attach_pipe(Pipe pipe_) {
  3. assert (!is_terminating ());
  4. assert (pipe == null);
  5. assert (pipe_ != null);
  6. pipe = pipe_;  //保存当前的pipe
  7. pipe.set_event_sink (this);  //将当前的pipe的事件回调社设置为当前
  8. }

用于关联pipe对象,这里可以看到将pipe的事件回到设置成了当前session对象。。那么来看看这些事件回调方法是怎么处理的吧:

[java] view plaincopy

  1. //当有数据传给当前的pipe的时候,其实也就是pipe的对面socket那边发送数据给这里了,然后发送pipe可读的命令,那么表示有数据需要通过底层的engine发送出去了
  2. public void read_activated(Pipe pipe_)  {
  3. // Skip activating if we‘re detaching this pipe
  4. if (pipe != pipe_) {
  5. assert (terminating_pipes.contains (pipe_));
  6. return;
  7. }
  8. if (engine != null)
  9. engine.activate_out ();  //激活底层engine的channel的写事件
  10. else
  11. pipe.check_read ();
  12. }
  13. //表示可以发送数据到pipe了,那么表示需要到底层的engine接收数据了
  14. public void write_activated (Pipe pipe_)  {
  15. // Skip activating if we‘re detaching this pipe
  16. if (pipe != pipe_) {
  17. assert (terminating_pipes.contains (pipe_));
  18. return;
  19. }
  20. if (engine != null)
  21. engine.activate_in ();  //激活底层的engin的channel上面的读取事件,也就是通知底层的engin应该从网络接收数据了
  22. }

这里先是pipe可以读的时候的事件回调方法,这个处理很简单吧,直接激活底层StreamEngine的channle在poller上注册写事件,那么当底层channel可以写数据的时候,就会从当前session的pipe里去读取数据,然后发送出去。。

第二个方法是当pipe可以写的时候,这个其实就是直接注册channel的读取事件,那么当channel就会去接收数据,最后这些他们都会通过pipe发送给上层的socket对象。。。到此上面那张图的整个运行情况应该都很清楚了吧。。。

那么Session的最为关键的地方也就差不多了。。。还有一些细节,以后有需要的话再介绍吧。。。

好了接下来来看看SocketBase这个类型吧,前面已经说到了ZMQ中所有的Socket类型多继承自这个类型,可见他的重要性。。。先来看看它的一些重要的属性定义吧:

[java] view plaincopy

  1. private final Map<String, Own> endpoints;  //这里保存所有打开的endpoint,这里key是连接地址,value其实是Session对象
  2. private final Map<String, Pipe> inprocs;   //IPC通信方法
  3. //  Used to check whether the object is a socket.
  4. private int tag;   //标志位,用于判断当前对象是否是socket类型的
  5. private boolean ctx_terminated;    //如果是true,表示关联的context已经停止了
  6. private boolean destroyed;  //如果是true的话,表示当前socket对象已经被命令销毁了
  7. private final Mailbox mailbox;   //邮箱,用于接收别的地方发送过来的命令
  8. private final List<Pipe> pipes;   //当前所有关联的pipe,这些pipe是用于与当前socket的所有session进行数据通信的
  9. private Poller poller;  //poller对象
  10. private SelectableChannel handle;   //这个是mailbox的handler
  11. private long last_tsc;   //上一次执行命令的时间
  12. private int ticks;    //上次执行完命令之后,收到的message
  13. private boolean rcvmore;  //接着还有message要接收
  14. private SocketBase monitor_socket;  //用于监控的socket
  15. private int monitor_events;
  16. protected ValueReference<Integer> errno;

这里比较重要的有,:

enpoints,用于保存所有的session与其连接地址的,

pipes,保存所有与底层的session关联的pipe,这里Socket与Sesssion之间的关系是一对多的...

mailbox,socket也有自己的mailbox,不用依附于IOThread对象,不过这里有个坑,Socket类型的对象有自己的mailbox,不用依附于IO线程,并不意味着它就有自己的线程,因为它直接依赖于用户线程,依赖于用户代码...也就是说mailbox里面的命令的执行都是在用户线程中搞定的...呵呵,刚开始这个地方还纠结了很久...

来看看它的pipe的事件回调吧:

[java] view plaincopy

  1. //当pipe可以读的时候需要执行的回调方法,表示底层的StreamEngin有数据发送到这里了
  2. public void read_activated (Pipe pipe_)  {
  3. xread_activated(pipe_);  //调用子类的方法来具体处理这些数据
  4. }
  5. //当pipe可以写的时候执行的回调
  6. public void write_activated (Pipe pipe_) {
  7. xwrite_activated (pipe_);
  8. }

其实这里没有太多的内容,因为都是在具体的子类中完成的,不过到这里整个ZeroMQ(java)中数据是怎么进行流动的就算已经很清楚了...当然,有一些细节性的东西还没有列出来..

好了,到现在为止,ZeroMQ(java)中就还剩下具体的socket类型的运行以及编码方式两个大的地方没有分析了...

时间: 2024-10-13 02:01:51

ZeroMQ(java)中的数据流SessionBase与SocketBase的相关文章

ZeroMQ(java)中组件间数据传输(Pipe的实现)

在ZeroMQ(java)中,整个IO的处理流程都是分层来进行的,当然处于最下端的肯定是前面介绍过的poller以及StreamEngin了....涉及到上层的话就还有session,以及socket,先用一张图来大概的描述一下整个层次关系吧.. 整个分层的结构大概就是这样吧,其中poller与StreamEngin是怎么交互的,这个就不说饿了吧,然后Session这个怎么与session之间交互呢,这个以后再说吧,其实在streamEngin里面有自己的session引用..反正这里没啥意思.

ZeroMQ(java)中监控Socket

基本上ZeroMQ(java)中基本的代码都算是过了一遍了吧,不过觉得它在日志这一块貌似基本没有做什么工作,也就是我们通过日志来知道ZeroMQ都发生了什么事情.. 而且由于ZeroMQ中将连接的建立和重连接都进行了隔离,用户不需要做什么事情来维护连接,当然这样做的好处是使程序员的编码工作变少了,但是当然也有不好的地方,那就是用户失去了对ZeroMQ整个运行阶段的控制.. 例如,当我们主动去连接一个远程地址,或者连接中断之后,没有日志告诉我们都这些事情发生了...当时对于一个消息通信系统来说,这

Java中I/O流之数据流

Java 中的数据流: 对于某问题:将一个 long 类型的数据写到文件中,有办法吗?    转字符串 → 通过 getbytes() 写进去,费劲,而且在此过程中 long 类型的数需要不断地转换. 现在,Java 中的数据流能够很好的解决这个问题(不需要转换,直接写进去) 1. DataInputStream 与 DataOutputStream 分别继承自 InputStream.OutputStream, 它属于处理流,需要分别套接在 InputStream.OutputStream 类

深入理解Java中的流---结合Hadoop进行详解

在JavaSe的基础课程当中,可以说流是一个非常重要的概念,并且在Hadoop中得到了广泛的应用,本篇博客将围绕流进行深入的详解. (一)JavaSe中流的相关概念 1.流的定义 ①在Java当中,若一个类专门用于数据传输,则这个类称为流 ②流就是程序和设备之间嫁接以来的一根用于数据传输的管道,这个设备可以是本地硬盘,可以是内存条,也可以是网络所关联的另外一台计算机等等,其中不同管道上有不同的按钮,按下不同的按钮相当于调用不同的方法,这根带按钮的用于数据传输的管道就是流,即流就是一根管道 ③流一

java中TCP总结

先看一张图,画的很挫,将就看. TCP 客户端与服务端通信时,是服务端会拿到客户端的socket进行通信. TCP就相当于以前的座机,有一个听筒和一个话筒,A用话筒说话,B用听筒听. 下面讲讲java中TCP的使用以及步骤. TCP客户端步骤: 1.建立Socket服务,并确定IP和Port 2.通过socket服务获取输入流或输出流. 3.通输入或输出流操作数据. TCP服务端的步骤: 1.用ServerSocket来监听Port 2.用accept获取客户端的Socket 3.通客户端的So

Java中常见的IO流及其使用

Java中IO流分成两大类,一种是输入流,所有的输入流都直接或间接继承自InputStream抽象类,输入流作为数据的来源,我们可以通过输入流的read方法读取字节数据:另一种是输出流,所有的输出流都直接或间接继承自OutputStream抽象类,输出流接收数据,可以通过write方法写入字节数据.在Java的IO流类中,大部分的输入流和输出流都是成对存在的,即如果存在XXXInputStream,那么就存在XXXOutputStream,反之亦然.(SequenceInputStream和St

三、JAVA中的IO流,输出流

JAVA中java.io.*;提供数据的流式输入与输出的相关类,IO流是对于输入与输出的抽象,JAVA把这些不同来源和目标的数据都统一抽象为数据流.将输入与输出的数据统一为流的好处是程序不必关心要读取得是文件,还是网络中的数据,而是统一当作IO流来处理. IO流又分为字符流与字节流,主要区别 字节流 按字节处理(一次处理一个字节) 使用类OutputStream 字符流 按字符处理(一次处理一个字符即两个字节) 使用类Writer 其实字节流与字符流处理十分相似(即每次处理的单位不同),好多书上

【转】输入/输出流 - 深入理解Java中的流 (Stream)

基于流的数据读写,太抽象了,什么叫基于流,什么是流?Hadoop是Java语言写的,所以想理解好Hadoop的Streaming Data Access,还得从Java流机制入手.流机制也是JAVA及C++中的一个重要的机制,通过流使我们能够自由地操作包括文件,内存,IO设备等等中的数据. 首先,流是什么? 流是个抽象的概念,是对输入输出设备的抽象,Java程序中,对于数据的输入/输出操作都是以“流”的方式进行.设备可以是文件,网络,内存等. 流具有方向性,至于是输入流还是输出流则是一个相对的概

java中的io系统详解

java中的io系统详解 分类: JAVA开发应用 笔记(读书.心得)2009-03-04 11:26 46118人阅读 评论(37) 收藏 举报 javaiostreamconstructorstringbyte 相关读书笔记.心得文章列表 Java 流在处理上分为字符流和字节流.字符流处理的单元为 2 个字节的 Unicode 字符,分别操作字符.字符数组或字符串,而字节流处理单元为 1 个字节,操作字节和字节数组. Java 内用 Unicode 编码存储字符,字符流处理类负责将外部的其他