ZeroMQ(java)中组件间数据传输(Pipe的实现)

在ZeroMQ(java)中,整个IO的处理流程都是分层来进行的,当然处于最下端的肯定是前面介绍过的poller以及StreamEngin了。。。。涉及到上层的话就还有session,以及socket,先用一张图来大概的描述一下整个层次关系吧。。

整个分层的结构大概就是这样吧,其中poller与StreamEngin是怎么交互的,这个就不说饿了吧,然后Session这个怎么与session之间交互呢,这个以后再说吧,其实在streamEngin里面有自己的session引用。。反正这里没啥意思。。主要就在与Session怎么与自己所属的Socket进行交互,当从最底层接收到数据之后,session如何交给上层的socket,让其来处理。。。这里就涉及到了Pipe,也就是session与自己所属的socket之间是通过pipe来进行数据传递的。。。

那么在具体的分析session与socket之前就来看看这个Pipe是怎么工作的吧,先来大概的看看它的类图:

这里可以看到Pipe继承自ZObject类型,那么可以知道Pipe可以发送,接受以及执行命令,同时也就意味着Pipe也需要由自己关联的IO线程才行,或者说有关联的mailbox。。。不过这个也不是强制的,以后再分析Socket的pipe的时候,就会发现它的pipe关联到socket自己的mailbox,但是socket的mailbox没有注册到任何的poller上面去,也就是它并没有在任何IO线程里执行,最后其实是在用户代码的线程中运行的。。。。好了。好像闲话说的比较多了。。用一张图来刻画一下Pipe是怎么运行的吧:

其实通过这张图形就已经将Pipe的运行原理基本描述出来了,pipe的两端都分别关联了两个YPipe(可以将其理解为队列)对象,例如左边将其中一个YPipe当做写端,那么在另外一边就将其看成是读端。。。

这里的YPipe对象可以将其理解为队列,至于说具体的实现,底层确实是队列,只不过是自己实现的,而且实现的还挺繁琐的,就不细说了,不过这里有向吐槽的地方,明明concurrent库中有无锁的队列ConcurrentLinkedList,在并发环境下有很好的性能,干嘛不在这个基础上进行扩展。。。。

这里另外还要看看在ZeroMQ中,也定义的有Pipe类型自己的事件回调,其定义如下:

[java] view plaincopy

  1. public interface IPipeEvents {
  2. void read_activated(Pipe pipe);  //有数据可以读取
  3. void write_activated(Pipe pipe);  //当前pipe有数据写
  4. void hiccuped(Pipe pipe);   //对面的pipe替换掉了读端,也就是当前需要替换写段的时候的回调
  5. void terminated(Pipe pipe);  //当前pipe停止的回调
  6. }

具体每个方法是干嘛用的注释应该说的很清楚了。。那么接下来来看看Pipe的两端是怎么进行交互的吧,首先看如何发送数据到pipe的另外一端:

[java] view plaincopy

  1. //从写端写数据局,发送给pipe的另外一端
  2. public boolean write (Msg msg_)  {
  3. if (!check_write ())
  4. return false;
  5. boolean more = msg_.has_more();
  6. outpipe.write (msg_, more);
  7. if (!more)
  8. msgs_written++;   //已经读取的msg的计数
  9. return true;
  10. }

其实这里直接就是在写端,将数据写到队列里面去就好了,那么如何通知对面当前有数据发送过来了呢,来看另外一个方法:

[java] view plaincopy

  1. //其实这里主要是给对面的pipe发送activate_read命令,表示它可以读了
  2. public void flush () {
  3. //  The peer does not exist anymore at this point.
  4. if (state == State.terminating)
  5. return;
  6. if (outpipe != null && !outpipe.flush ()) {
  7. send_activate_read (peer);  //向对面发送可以读取的命令
  8. }
  9. }

这个,如果看了ZObject就应该很清楚了吧,直接给命令的另外一端发送activate_read类型的命令,那么这个命令最终将会被pipe的另外一端所关联的mailbox收到,从而对面的Pipe将会在其IO线程中执行命令,对于这个命令,进行的操作是process_activate_read方法,那么来看看Pipe中这个方法的的定义吧:

[java] view plaincopy

  1. //收到命令,表示底层的pipe有数据可以读取了,这里主要是要调用事件回调,通知上层的代码,pipe有数据可以读取了
  2. protected void process_activate_read () {
  3. if (!in_active && (state == State.active || state == State.pending)) {
  4. in_active = true;
  5. sink.read_activated (this);  //调用事件回调
  6. }
  7. }

这里其实就是调用当前的pipe的事件回调,来处理当前的pipe对象,其实也就是通知上层的代码,当前pipe有数据可以读了,让其进行处理。。。。

好了,那么到这里整个Pipe的运行原理就算比较的清楚了。。。

不过自己不太明白,在java中这种数据的传递明明很简单就可以实现,干嘛要搞的这么复杂。。。不过这里也有一个好处,就是将每一个对象的方法的执行都封闭在了自己的IO线程内部。。。也算是一种线程封闭原则的实现吧。。。其余的好处,好像没啥好处,而且真的觉得略繁琐。。。。

时间: 2024-11-10 21:47:36

ZeroMQ(java)中组件间数据传输(Pipe的实现)的相关文章

ZeroMQ(java)中的数据流SessionBase与SocketBase

前面的文章中已经比较的清楚了ZeroMQ(java)中如何在底层处理IO, 通过StreamEngine对象来维护SelectableChannel对象以及IO的事件回调,然后通过Poller对象来维护Selector对象,然后用IOObject对象来具体的管理SelectableChannel对象在Poller上面的注册,以及事件回调,他们之间的关系可以用下面的图形来简单的描述一下: 对于接收到的数据,首先由StreamEngine进行处理,其实它会调用内部的decoder将字节数据转化为Ms

ZeroMQ(java)中监控Socket

基本上ZeroMQ(java)中基本的代码都算是过了一遍了吧,不过觉得它在日志这一块貌似基本没有做什么工作,也就是我们通过日志来知道ZeroMQ都发生了什么事情.. 而且由于ZeroMQ中将连接的建立和重连接都进行了隔离,用户不需要做什么事情来维护连接,当然这样做的好处是使程序员的编码工作变少了,但是当然也有不好的地方,那就是用户失去了对ZeroMQ整个运行阶段的控制.. 例如,当我们主动去连接一个远程地址,或者连接中断之后,没有日志告诉我们都这些事情发生了...当时对于一个消息通信系统来说,这

React 中组件间通信的几种方式

在使用 React 的过程中,不可避免的需要组件间进行消息传递(通信),组件间通信大体有下面几种情况: 父组件向子组件通信 子组件向父组件通信 非嵌套组件间通信 跨级组件之间通信 1.父组件向子组件通信父组件通过向子组件传递 props,子组件得到 props 后进行相应的处理.演示代码:父组件 parent.js: import React,{ Component } from "react"; export default class App extends Component{

vue中组件间的传参

1.父传子 父组件准备一个数据,通过自定义属性给子组件赋值,进行传递 在子组件中通过 props 属性来接收参数 <body> <div id="app"> <son passdata="msg"></son> </div> </body> <script> Vue.component('son', { template: '<div>父组件的数据为:{{ passdat

vue中组件间的通信,父传子,子传父

参考文章 :https://www.cnblogs.com/yszblog/p/10135969.html 1 父传子 子组件Vue 父组件 注册子组件 子组件在props中创建一个属性,用以接收父组件传过来的值 父组件中注册子组件 在子组件标签中添加子组件props中创建的属性 把需要传给子组件的值赋给该属性 2 通过$emit  子组件向父组件传值 2.1 子组件Vue 2.2 父组件 2.3 父组件监听子组件发射的事件名 子组件中需要以某种方式例如点击事件的方法来触发一个自定义事件 将需要

vue中组件间的通信

1.props:父组件的数据传递给子组件(数据在子组件中) (1)在子组件中申明props,props的类型一般为数组类型 window.HomeList ={ template, props:['empList'] } (2)在父组件中,给子组件所在的标签绑定属性 <home-list :empList="empList"></home-list> 父组件的数据如下: data(){ return { hobbies:['吃饭','睡觉','打豆豆','看书'

java中子类与基类变量间的赋值

Java中子类与基类变量间的赋值 子类对象可以直接赋给基类变量. 基类对象要赋给子类对象变量,必须执行类型转换, 其语法是: 子类对象变量=(子类名)基类对象名; 也不能乱转换.如果类型转换失败Java会抛出以下这种异常: ClassCastException package yanzheng; class Mammal{} class Dog extends Mammal {} class Cat extends Mammal{} public class TestCast { public 

java中解决组件重叠的问题(例如鼠标移动组件时)

java中解决组件覆盖的问题!     有时候在移动组件的时候会出现两个组件覆盖的情况,但是你想让被覆盖的组件显示出来或者不被覆盖! 在设计GUI时已经可以定义组件的叠放次序了(按摆放组件的先后顺序). 真正麻烦的是响应哪个组件,这就要创建一个链表,把组件对象按顺序存起来,响应事件时扫描这个链表,按 链表中的先后顺序选择响应组件对象.     所以要想满足自己的需求,那么在添加组件的时候就要注意顺序就好了! 另外如果不想用上面的方法,那么你就采用JLayeredPane这个类,分层面板可以帮助你

【Android的从零单排开发日记】之入门篇(十二)——Android组件间的数据传输

组件我们有了,那么我们缺少一个组件之间传递信息的渠道.利用Intent做载体,这是一个王道的做法.还有呢,可以利用文件系统来做数据共享.也可以使用Application设置全局数据,利用组件来进行控制数据. 一.Intent数据传递 那么首先是简单的跳转.我们可以借助bundle这个容器来存放我们想要传递的数据. Intent intent = new Intent(); intent.setClass(activity1.this, activity2.class); //描述起点和目标 Bu