慎入,有点乱,只是学习记录,disruptor_2.10.4
1、Disruptor对象有一个EventProcessorRepository对象
2、EventProcessorRepository里储存的是EventProcessorInfo和EventHandler(既创建Disruptor对象时设置的EventHandler)的映射关系,
及EventProcessorInfo和EventProcessor(实际为BatchEventProcessor)的映射关系
2.1 首先看看BatchEventProcessor是什么东西
BatchEventProcessor继承自EventProcessor,是一个消费者的执行体,说白了就是一个线程,可以看到它的run方法。
它主要有三个成员RingBuffer、SequenceBarrier和EventHandler,这里还需要看看这个SequenceBarrier是什么东西。
barrier在Disruptor的createEventProcessors()中创建的
SequenceBarrier barrier = ringBuffer.newBarrier(Util.getSequencesFor(barrierEventProcessors));
可以在RingBuffer(Sequencer)中看到barrier实际是ProcessingSequenceBarrier对象
new ProcessingSequenceBarrier(waitStrategy, cursor, sequencesToTrack);
先看看它储存了一些什么信息:
waitStrategy:消费者的等待策略
cursor:这个保存的是RingBuffer 环中当前的序号
sequencesToTrack:Sequence[] dependentSequences先理解为依赖的序列号,具体干嘛的还要待看
2.2 现在开看看生产者和消费者是如何运作的
现从Disruptor.start()开始
找到EventProcessorRepository中最后面的EventProcessor
EventProcessor[] gatingProcessors = eventProcessorRepository.getLastEventProcessorsInChain();
从EventProcessor中找到最后面的Sequence,然后设置ringBuffer最后面消费者对应的Sequence
ringBuffer.setGatingSequences(Util.getSequencesFor(gatingProcessors));
接着执行executor,运行EventProcessor(线程)
executor.execute(eventProcessorInfo.getEventProcessor());
2.3 现在看看EventProcessor(消费者线程)都干了些什么
首先它要获取可用的Sequence是哪个
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
走到BlockingWaitStrategy.waitFor()
现得到ringbuffer中当前消费者所在的availableSequence = cursor.get()位置,比较请求的位置和当前的位置,如果请求的位置比当前的位置大,说明消费者已经没有可消费的事件了。执行processorNotifyCondition.await();等待
2.4 看看生产者又是怎么处理的
生产者先要获取下一个sequence是多少,调用ringBuffer.next();
走到MultithreadedClaimStrategy.incrementAndGet(gatingSequences);
这个gatingSequences就是生产者的依赖Sequence,他就是2.2中ringBuffer最后面消费者对应的Sequences,从中找出最小的并保存,先取出最后生产者的sequence并加1,如果sequence+1大于最小消费者sequence。则挂起线程LockSupport.parkNanos(1L);
否则返回可生产的sequence。
3 关于Sequence
可以看到RingBuffer(既Sequencer)有一个Sequence cursor成员
同时AbstractMultithreadedClaimStrategy有一个Sequence claimSequence成员
BatchEventProcessor里也由一个Sequence
这两个Sequence有什么关联了sequence
首先claimSequence保存的是请求的sequence,里面保存的是当前被请求的最大的序号。
生产者先拿到claimSequence中保存的最大序号的下一个序号。
接着,拿到从请求的序号执行ringbuffer.publish()。
3.1生产者分配数据
claimStrategy.serialisePublishing(sequence, cursor,batchSize);
先计算资源(环)中预期的最大序号expectedSequence
然后依次更新ringbuffer的cursor,保存需要请求的所有序号中最大的(如果有多个sequence请求)
3.2 通知消费者
waitStrategy.signalAllWhenBlocking();
现在唯一的疑问就是消费者是如何知道生产者生产到哪个位置了的?
看到BatchEventProcessor.run()中的如下代码,通过SequenceBarrier对象获取当前生产者的位置
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
原来是根据这个sequenceBarrier中的Sequence cursorSequence;中的值来判断当前生产者的位置的。
Sequence cursorSequence这个是在sequenceBarrier也就是ProcessingSequenceBarrier构造时,传入的ringbuffer的Sequence cursor。
sequenceBarrier是如何从cursorSequence获取的当前生产者的位置的了?
联想到另外一个问题,看到BatchEventProcessor是没有WaitStrategy成员对象。那WaitStrategy在消费者这头是在哪里用到的了。
BatchEventProcessor调用ProcessingSequenceBarrier.waitFor()
ProcessingSequenceBarrier又调用waitStrategy.waitFor(),现在看看BlockingWaitStrategy.waitFor()都做了些什么?
如何消费者的请求序号大于ringbuffer的当前序号,则执行processorNotifyCondition.await();挂起线程
否则返回请求的序号,给消费者使用