Disruptor学习杂记

慎入,有点乱,只是学习记录,disruptor_2.10.4

1、Disruptor对象有一个EventProcessorRepository对象

2、EventProcessorRepository里储存的是EventProcessorInfo和EventHandler(既创建Disruptor对象时设置的EventHandler)的映射关系,

及EventProcessorInfo和EventProcessor(实际为BatchEventProcessor)的映射关系

2.1 首先看看BatchEventProcessor是什么东西

BatchEventProcessor继承自EventProcessor,是一个消费者的执行体,说白了就是一个线程,可以看到它的run方法。

它主要有三个成员RingBuffer、SequenceBarrier和EventHandler,这里还需要看看这个SequenceBarrier是什么东西。

barrier在Disruptor的createEventProcessors()中创建的

SequenceBarrier barrier = ringBuffer.newBarrier(Util.getSequencesFor(barrierEventProcessors));

可以在RingBuffer(Sequencer)中看到barrier实际是ProcessingSequenceBarrier对象

new ProcessingSequenceBarrier(waitStrategy, cursor, sequencesToTrack);

先看看它储存了一些什么信息:

waitStrategy:消费者的等待策略

cursor:这个保存的是RingBuffer 环中当前的序号

sequencesToTrack:Sequence[] dependentSequences先理解为依赖的序列号,具体干嘛的还要待看

2.2 现在开看看生产者和消费者是如何运作的

现从Disruptor.start()开始

找到EventProcessorRepository中最后面的EventProcessor

EventProcessor[] gatingProcessors = eventProcessorRepository.getLastEventProcessorsInChain();

从EventProcessor中找到最后面的Sequence,然后设置ringBuffer最后面消费者对应的Sequence

ringBuffer.setGatingSequences(Util.getSequencesFor(gatingProcessors));

接着执行executor,运行EventProcessor(线程)

executor.execute(eventProcessorInfo.getEventProcessor());

2.3 现在看看EventProcessor(消费者线程)都干了些什么

首先它要获取可用的Sequence是哪个

final long availableSequence = sequenceBarrier.waitFor(nextSequence);

走到BlockingWaitStrategy.waitFor()

现得到ringbuffer中当前消费者所在的availableSequence = cursor.get()位置,比较请求的位置和当前的位置,如果请求的位置比当前的位置大,说明消费者已经没有可消费的事件了。执行processorNotifyCondition.await();等待

2.4 看看生产者又是怎么处理的

生产者先要获取下一个sequence是多少,调用ringBuffer.next();

走到MultithreadedClaimStrategy.incrementAndGet(gatingSequences);

这个gatingSequences就是生产者的依赖Sequence,他就是2.2中ringBuffer最后面消费者对应的Sequences,从中找出最小的并保存,先取出最后生产者的sequence并加1,如果sequence+1大于最小消费者sequence。则挂起线程LockSupport.parkNanos(1L);

否则返回可生产的sequence。

3 关于Sequence

可以看到RingBuffer(既Sequencer)有一个Sequence cursor成员

同时AbstractMultithreadedClaimStrategy有一个Sequence claimSequence成员

BatchEventProcessor里也由一个Sequence

这两个Sequence有什么关联了sequence

首先claimSequence保存的是请求的sequence,里面保存的是当前被请求的最大的序号。

生产者先拿到claimSequence中保存的最大序号的下一个序号。

接着,拿到从请求的序号执行ringbuffer.publish()。

3.1生产者分配数据

claimStrategy.serialisePublishing(sequence, cursor,batchSize);

先计算资源(环)中预期的最大序号expectedSequence

然后依次更新ringbuffer的cursor,保存需要请求的所有序号中最大的(如果有多个sequence请求)

3.2 通知消费者

waitStrategy.signalAllWhenBlocking();

现在唯一的疑问就是消费者是如何知道生产者生产到哪个位置了的?

看到BatchEventProcessor.run()中的如下代码,通过SequenceBarrier对象获取当前生产者的位置

final long availableSequence = sequenceBarrier.waitFor(nextSequence);

原来是根据这个sequenceBarrier中的Sequence cursorSequence;中的值来判断当前生产者的位置的。

Sequence cursorSequence这个是在sequenceBarrier也就是ProcessingSequenceBarrier构造时,传入的ringbuffer的Sequence cursor。

sequenceBarrier是如何从cursorSequence获取的当前生产者的位置的了?

联想到另外一个问题,看到BatchEventProcessor是没有WaitStrategy成员对象。那WaitStrategy在消费者这头是在哪里用到的了。

BatchEventProcessor调用ProcessingSequenceBarrier.waitFor()

ProcessingSequenceBarrier又调用waitStrategy.waitFor(),现在看看BlockingWaitStrategy.waitFor()都做了些什么?

如何消费者的请求序号大于ringbuffer的当前序号,则执行processorNotifyCondition.await();挂起线程

否则返回请求的序号,给消费者使用

时间: 2024-10-13 16:22:09

Disruptor学习杂记的相关文章

[Flask]学习杂记--模板

这个学习杂记主要不是分享经验,更多是记录下falsk的体验过程,以后做东西在深入研究,因为django之前用的时间比较长,所以很多概念都是一看而过,做个试验了解下flask的功能. flask中使用是jinja2 模板,和django自带的模板很类似,但又比django的模板强大,提供了更多有用的工具,基本使用上基本都是一致的. 写了一个小案例,把模板渲染的基本步骤和变量传递的基本使用练习了下. jinjia2 模板文档 http://jinja.pocoo.org/docs/dev/ flas

emacs学习杂记

1.下载emacs源码编译安装后,安装路径在:/usr/local/bin 也可在emacs源码下生产的Makefile查看安装的路径:prefix=/usr/local bindir=${exec_prefix}/bin 2.emacs学习:Emacs 编辑环境,第 1 部分: 学习 Emacs 的基础知识 替代sourceinsight 用emacs 让emacs强于sourceinsight 3.cscope在emacs的配置与使用

学习杂记之root密码的更改

1  在学习linux过程中总会出现忘记root密码的时候,这是要在开机启动的系统选项中按上下键.使得它的页面停留在你要更改密码的选项后按e,之后会进入一个文本,在他的最后第2行会有这样的字眼ro  rd.XXXX  这是你将光标移到到对应位置,将原有的ro  rd.  更改为rw  rd.break 然后按ctrl+r这时系统会进入一个假root,输入chroot sysroot 可以变成真的root 然后passwd  改密码即可,如果系统的selinux 是开的话 要touch /.aut

Linux学习杂记

最近一口气看完了韩顺平老师讲的Linux视频教程,自己也在学习的过程中做了些笔记,记载如下,希望帮助到一些喜欢研究Linux的同学,也算是在云端备份一下笔记吧,以免电脑出现不可控的因素而遗失自己的心血. 首先声明,这篇笔记不是按照韩老师讲的顺序来写的,可能有些凌乱,但我相信,你总能找到你想要的信息: Linux必看书籍: 鸟哥的私房菜 Linux编程从入门到精通 Linux内核安全剖析 命令: shutdown -h now  立刻关机 shutdown -r now   重启 reboot  

学习杂记1:c#,顺序泛型栈,泛型委托,Lambda,拓展方法

最近又重新回过头来学习C#,才发现之前学习的知识有多么的浅显,有些东西还是挺难懂的,但是比较有意思,例如协变,逆变,不变就挺有意思,但是也挺绕,下面是学习过程中写的一些代码,有点乱,就当日记记叙一下. using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace Genericity泛型 { //泛型委托 p

Python学习杂记_4_分支和循环

分支 和 循环 分支和循环这俩结构在各语言中都有着很重要的地位,当然我之前都没有学好,这里总结一下在Python学习中对这俩结构的认识. 分支结构 # 单分支 if 条件判断: 执行语句- # 双分支 if 条件判断: 执行语句- else: 执行语句- #多分支 if 条件判断: 执行语句- elif: 执行语句- else: 执行语句- 循环结构 1. while循环 ,需要定义循环变量来控制循环. i = 0初始化一个控制循环的变量 while 有关循环变量的条件判断: 执行循环语句 -

并发框架Disruptor学习入门

刚刚听说disruptor,大概理一下,只为方便自己理解,文末是一些自己认为比较好的博文,如果有需要的同学可以参考. 本文目标:快速了解Disruptor是什么,主要概念,怎么用 1.Disruptor简介 Disruptor是什么?有什么特点/优点? --Disruptor是一个用于在线程间通信的高效低延时的消息组件,它像个增强的队列. --它是一个高性能.低延迟.使用简单的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式实现,或者事件-监听模式的实现,直

go语言学习杂记

go语言在windows下下载安装 推荐在golang中国下载 http://www.golangtc.com/download 安装.... go环境变量与工作目录 根据约定GOPATH需要建立3个目录 bin(存放编译后生成的可执行文件) pkg(存放编译后生成的包文件 .a) src(存放项目源码 比如:.go .c .h .s等)使用go env查看环境 常用命令 go get:获取远程包(git或hg(google code)) go run :直接运行程序 go bulid :测试编

【Obj-C】学习杂记

之前大学学过C++,忘得差不多了,前阵子看了些<[中文版]C语言编程:一本全面的C语言入门教程(第3版)>,觉得可以直接开始看<Objective-C基础教程(第2版)>pdf版. 恩,#import就是#include啦,#import<Foundation/foundation.h>就包含默认所有的头文件,很方便,唯一的不会重复. Obj-C的Bool不同于C的,是一个八位带符号的字符类型,其实只看末位,是0就NO,是1就YES(有别于TRUE&FALSE)