Disruptor3.0的实现细节

本文旨在介绍Disruptor3.0的实现细节,首先从整体上描述了Disruptor3.0的核心类图,Disruptor3.0 DSL(领域专用语言)的实现类图,并以Disruptor官方列举的几大特性作为行文思路,看看Disruptor3.0是如何实现这些特性的:内存预加载、消除‘伪共享’、序号栅栏和序号配合使用来消除锁和CAS、批处理效应的具体实现等。

核心类图

  • RingBuffer——Disruptor底层数据结构实现,核心类,是线程间交换数据的中转地;
  • Sequencer——序号管理器,负责消费者/生产者各自序号、序号栅栏的管理和协调;
  • Sequence——序号,声明一个序号,用于跟踪ringbuffer中任务的变化和消费者的消费情况;
  • SequenceBarrier——序号栅栏,管理和协调生产者的游标序号和各个消费者的序号,确保生产者不会覆盖消费者未来得及处理的消息,确保存在依赖的消费者之间能够按照正确的顺序处理;
  • EventProcessor——事件处理器,监听RingBuffer的事件,并消费可用事件,从RingBuffer读取的事件会交由实际的生产者实现类来消费;它会一直侦听下一个可用的序号,直到该序号对应的事件已经准备好。
  • EventHandler——业务处理器,是实际消费者的接口,完成具体的业务逻辑实现,第三方实现该接口;代表着消费者。
  • Producer——生产者接口,第三方线程充当该角色,producer向RingBuffer写入事件。

DSL类图

以下是Disruptor3.0 DSL(domain specific language 特定领域语言)的类图,可以大致知道第三方如何继承Disruptor3.0实现具体业务逻辑。

  • Disruptor——对外暴露的门面类,提供start(),stop(),消费者事件注册,生产者事件发布等api;
  • RingBuffer——对生产者提供下一序号获取、entry元素获取、entry数据更改等api;
  • EventHandler——消费者的接口定义,提供onEvent()方法,负责具体业务逻辑实现;
  • EventHandlerGroup——业务处理器分组,管理多个业务处理器的依赖关系,提供then()、before()、after()等api。

  以下给出代码demo阐述第三方如何简单继承Disruptor3.0:

    public static void main(String[] args) throws Exception
    {
        // The ThreadFactory for create producer thread.
        ThreadFactory producerFactory = new ProducerFactory();

        // The factory for the event
        LongEventFactory eventFactory = new LongEventFactory();

        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 8;

        // Construct the Disruptor,创建Disruptor组件
        Disruptor<LongEvent> disruptor = new Disruptor<>(
                eventFactory,
                bufferSize,
                producerFactory,
                ProducerType.SINGLE,
                new BlockingWaitStrategy()
            );

        // Connect the handler,绑定消费者事件,可以是多个
        disruptor.handleEventsWith(new LongEventHandler());
        disruptor.handleEventsWith(new LogEventHandler());

        // Start the Disruptor, starts all threads running,启动Disruptor,启动所有线程,主要是消费者对应的EventProcessor侦听线程,消费者事件处理器开始侦听RingBuffer中的消息
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        LongEventProducer producer = new LongEventProducer(ringBuffer);

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++)
        {
            bb.putLong(0, l);       //生产者向RingBuffer中写入消息
            producer.onData(bb);
            Thread.sleep(10);
        }
    }

关键时序图

下图展示了Disruptor3.0整个运行过程的时序图,包括:始化、启动、处理过程。

内存预分配

RingBuffer使用数组Object[] entries作为存储元素,如下图所示,初始化RingBuffer时,会将所有的entries的每个元素指定为特定的Event,这时候event中的detail属性是null;后面生产者向RingBuffer中写入消息时,RingBuffer不是直接将enties[7]指向其他的event对象,而是先获取event对象,然后更改event对象的detail属性;消费者在消费时,也是从RingBuffer中读取出event,然后取出其detail属性。可以看出,生产/消费过程中,RingBuffer的entities[7]元素并未发生任何变化,未产生临时对象,entities及其元素对象一直存活,知道RingBuffer消亡。故而可以最小化GC的频率,提升性能。

注:图中对象Entry写错,应当为Event。

以下是RingBuffer.java类中初始化enties数组的源码:

    private void fill(EventFactory<E> eventFactory)
    {
        for (int i = 0; i < bufferSize; i++)
        {
            entries[BUFFER_PAD + i] = eventFactory.newInstance(); //使用工厂方法初始化enties元素
        }
    }

消费者写入数据到entry中:

//消费者实现EventHandler接口public class LongEventHandler implements EventHandler<LongEvent>
{  //event为从RingBuffer entry中读取的事件内容,消费者从event中读取数据,并完成业务逻辑处理
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
    {
        System.out.println(Thread.currentThread().getName() + " say : process LONG Event: " + event);
    }
}

生产者从entry中读取数据:

public class LongEventProducer
{  //生产者持有RingBuffer实例,可以直接向RingBuffer实例中的entry写入数据
    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer)
    {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer bb)
    {
        long sequence = ringBuffer.next();  // Grab the next sequence
        try
        {       //从ringBuffer实例中获取entry
            LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
                                                        // for the sequence       //生产者将数据写入entry
            event.set(bb.getLong(0));  // Fill with data
        }
        finally
        {        //生产者向ringBuffer提交数据变更
            ringBuffer.publish(sequence);
        }
    }
}

可以看出:生产者未更改ringBuffer实例中entry对象,只是更改了entry中的数据,避免了过多创建临时entry对象带来的GC,进而降低了性能损耗。

消除‘伪共享’

如果两个不同的并发变量位于同一个缓存行,则在并发情况下,会互相影响到彼此的缓存有效性,进而影响到性能,这叫着‘伪共享’。为了避开‘伪共享’,Disruptor3.0在Sequence.java中使用多个long变量填充,从而确保一个序号独占一个缓存行。关于缓存行和‘伪共享’请参考:伪共享(False Sharing)

具体实现代码如下:

//在序号实际value变量(long型)左边填充7个long变量class LhsPadding
{
    protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding
{
    protected volatile long value;
}
//在序号实际value变量(long型)右边填充7个long变量class RhsPadding extends Value {   protected long p9, p10, p11, p12, p13, p14, p15; } 

public class Sequence extends RhsPadding {   static final long INITIAL_VALUE = -1L;   public Sequence() {     this(INITIAL_VALUE);   }   ...... }
Sequence实际value变量的左右均被填充了7个long型变量,其自身也是long型变量,一个long型变量占据8个字节,所以序号与他上一个/下一个序号之间的最小内存分布距离为:7*8=56byte,加上自身的8个byte,可以确保序号变量独占长度为64byte(通常的一个缓存行长度)缓存行。

序号栅栏和序号配合使用来消除锁和CAS

Disruptor3.0中,序号栅栏(SequenceBarrier)和序号(Sequence)搭配使用,协调和管理消费者与生产者的工作节奏,避免了锁和CAS的使用。在Disruptor3.0中,各个消费者和生产者持有自己的序号,这些序号的变化必须满足如下基本条件:

  • 消费者序号数值必须小于生产者序号数值;
  • 消费者序号数值必须小于其前置(依赖关系)消费者的序号数值;
  • 生产者序号数值不能大于消费者中最小的序号数值,以避免生产者速度过快,将还未来得及消费的消息覆盖。

上述前两点是在SequenceBarrier的waitFor()方法中完成的,源码如下:

   public long waitFor(final long sequence) //sequence参数是该消费者期望获取的下一个序号值
        throws AlertException, InterruptedException, TimeoutException
    {
        checkAlert();
     //根据配置的waitStrategy策略,等待期望的下一序号值变得可用     //这里并不保证返回值availableSequence一定等于 given sequence,他们的大小关系取决于采用的WaitStrategy。     //eg. 1、YieldingWaitStrategy在自旋100次尝试后,会直接返回dependentSequence的最小seq,这时并不保证返回值>=given sequence     //    2、BlockingWaitStrategy则会阻塞等待given sequence可用为止,可用并不是说availableSequence == given sequence,而应当是指 >=
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
        //如果当前可用的序号小于期望获取的下一个序号,则返回availableSequence,这将导致调用者EventProcessor继续wait
        if (availableSequence < sequence)
        {
            return availableSequence;
        }
     //这一句是‘批处理’的精妙所在,放在后面讲
        return sequencer.getHighestPublishedSequence(sequence, availableSequence);
    }

上面第三点是针对生产者建立的Barrier,逻辑判定发生在生产者从ringBuffer获取下一个可用的entry时,RingBuffer会将获取下一个可用的entry委托给Sequencer。我们以最简单的单生产者SingleProducerSequencer的next()实现来说明。SingleProducerSequencer.next()的源码如下:

    public long next(int n)
    {
        if (n < 1) //n表示此次生产者期望获取多少个序号,通常是1
        {
            throw new IllegalArgumentException("n must be > 0");
        }

        long nextValue = this.nextValue;

        long nextSequence = nextValue + n;  //生产者当前序号值+期望获取的序号数量后达到的序号值
        long wrapPoint = nextSequence - bufferSize; //减掉RingBuffer的总的buffer值,用于判断是否出现‘覆盖’
        long cachedGatingSequence = this.cachedValue;  //从后面代码分析可得:cachedValue就是缓存的消费者中最小序号值,他不是当前最新的‘消费者中最小序号值’,而是上次程序进入到下面的if判定代码段是,被赋值的当时的‘消费者中最小序号值’
                //这样做的好处在于:在判定是否出现覆盖的时候,不用每次都调用getMininumSequence计算‘消费者中的最小序号值’,从而节约开销。只要确保当生产者的节奏大于了缓存的cachedGateingSequence一个bufferSize时,从新获取一下 getMinimumSequence()即可。
//(wrapPoint > cachedGatingSequence) : 当生产者已经超过上一次缓存的‘消费者中最小序号值’(cachedGatingSequence)一个‘Ring’大小(bufferSize),需要重新获取cachedGatingSequence,避免当生产者一直在生产,但是消费者不再消费的情况下,出现‘覆盖’
        //(cachedGatingSequence > nextValue) : 生产者和消费者均为顺序递增的,且生产者的seq“先于”消费者的seq,注意是‘先于’而不是‘大于’。当nextValue>Long.MAXVALUE时,nextValue+1就会变成负数,wrapPoint也会变成负数,这时候必然会是:cachedGatingSequence > nextValue
        //                                     这个变化的过程会持续bufferSize个序号,这个区间,由于getMinimumSequence()得到的虽然是名义上的‘消费者中最小序号值’,但是不代表是走在‘最后面’的消费者
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
        {
            cursor.setVolatile(nextValue);  // StoreLoad fence

            long minSequence;
            //生产者停下来,等待消费者消费,知道‘覆盖’现象清除。
            while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
            {
                waitStrategy.signalAllWhenBlocking();
                LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
            }

            this.cachedValue = minSequence;
        }

        this.nextValue = nextSequence;

        return nextSequence;
    }

批处理效应

当生产者节奏快于消费者,消费者可以通过‘批处理效应’快速追赶,即:消费者可以一次性从RingBuffer中获取多个已经准备好的enties,从而提高效率。代码实现如下:

SequenceBarrier的waitFor()方法:

    public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException
    {
        checkAlert();

        long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

        if (availableSequence < sequence)
        {
            return availableSequence;
        }

        //获取消费者可以消费的最大的可用序号,支持批处理效应,提升处理效率。
        //当availableSequence > sequence时,需要遍历 sequence --> availableSequence,找到最前一个准备就绪,可以被消费的event对应的seq。
        //最小值为:sequence-1
        return sequencer.getHighestPublishedSequence(sequence, availableSequence);
    }

源代码

LMAX-Exchange源码github地址:https://github.com/LMAX-Exchange/disruptor

带中文注释的源码github地址:https://github.com/daoqidelv/disruptor

				
时间: 2024-11-08 23:12:16

Disruptor3.0的实现细节的相关文章

扁平化设计2.0

时至今日,扁平化已不再是流行一时的设计风潮,而是一种美学风格.扁平化大胆的用色,简洁明快的界面风格一度让大家耳目一新,当它对元素效果抛弃的如此彻底之际,它又将效果捡起来,改装成另一番模样,使得扁平化进化为扁平化2.0. 扁平化设计特质 对于扁平化的定义,依然没有一个固定范式,但概括起来有下面四个特征: 1)没有多余的效果,例如投影.凹凸或渐变等 2)使用简洁风格的元素和图标 3)大胆丰富且明亮的配色风格 4)尽量减少装饰的极简设计 扁平化所追随的细节依然不变,然而这些"规范"开始松懈了

扁平化2.0

时至今日,扁平化已不再是流行一时的设计风潮,而是一种美学风格.扁平化大胆的用色,简洁明快的界面风格一度让大家耳目一新,当它对元素效果抛弃的如此彻底之际,它又将效果捡起来,改装成另一番模样,使得扁平化进化为扁平化2.0. 扁平化设计特质 对于扁平化的定义,依然没有一个固定范式,但概括起来有下面四个特征: 1)没有多余的效果,例如投影.凹凸或渐变等 2)使用简洁风格的元素和图标 3)大胆丰富且明亮的配色风格 4)尽量减少装饰的极简设计 扁平化所追随的细节依然不变,然而这些“规范”开始松懈了.随着扁平

.NET Core 2.0版本预计于2017年春季发布

英文原文: NET Core 2.0 Planned for Spring 2017 微软项目经理 Immo Landwerth 公布了即将推出的 .NET Core 2.0 版本的细节,该版本预计于 2017 年春季发布.这是 .NET Core 平台的一个重要发布,因为 2.0 版本对 .NET Core 的各项功能都有显著扩展. 言归正传,我们来看看即将发布的.NET Core 1.0 版本.按计划它将在 Visual Studio 2017 的正式发布会上推出.但是该版本仍将遵循 .NE

扁平化2.0:扁平化设计以前从未有的6个设计风格

时至今日,扁平化已不再是流行一时的设计风潮,而是一种美学风格.扁平化大胆的用色,简洁明快的界面风格一度让大家耳目一新,当它对元素效果抛弃的如此彻底之际,它又将效果捡起来,改装成另一番模样,使得扁平化进化为扁平化2.0. 扁平化设计特质 对于扁平化的定义,依然没有一个固定范式,但概括起来有下面四个特征: 没有多余的效果,例如投影.凹凸或渐变等 使用简洁风格的元素和图标 大胆丰富且明亮的配色风格 尽量减少装饰的极简设计 扁平化所追随的细节依然不变,然而这些“规范”开始松懈了.随着扁平化进化到2.0时

关注C++细节——动态生成对象初始化细节

①T *p =new T; ②T *p =new T(); 这两类用法不同点的总结. 1.若T为类类型,且用户定义了构造函数,则两种形式的效果完全相同,都会调用这个定义了的构造函数来初始化内部成员变量,但是如果此构造函数中并未对成员变量初始化,则这个时候内部的成员变量进行默认初始化--值是未定义的. 2.若T为类类型,但是用户并没有定义任何构造函数,则我们可以知道编译器会为该类合成一个默认的构造函数,这个时候上述两种形式的结果就不同了,①的类内部的成员变量这个时候执行默认初始化,其值是未定义的.

在已经安装好spark的docker镜像里安装cassandra2.0.7

1. 通过docker run命令的-v/–volume参数将主机文件拷贝到docker容器 [[email protected] ~]# docker run -v /data:/mnt -i -t -P -h sandbox sequenceiq/spark:1.2.0 /etc/bootstrap.sh -bash / Starting sshd: [ OK ] Starting namenodes on [sandbox] sandbox: starting namenode, logg

EF with (LocalDb)V11.0

EF虽说对LocalDb支持的不错,但LocalDb有自身的缺陷(不想sqlite那样数据库文件可以像普通文件一样使用). LocalDb在一个计算机上会对数据库有唯一性约束,要求本机的localdb不能重名.如果没有注意到这一点就会有以下问题: Cannot attach the file '{0}' as database '{1}' EF CodeFirst 指定不同数据库文件路径来新建同名的数据库时就会出现无法新建数据库(localDb).     删除localdb的方法   不能仅仅

Log4j,Log4j2,logback,slf4j日志学习

日志学习笔记 Log4j Log4j是Apache的一个开放源代码项目,通过使用Log4j,我们可以控制日志信息输送的目的地是控制台.文件.数据库等:我们也可以控制每一条日志的输出格式:通过定义每一条日志信息的级别,我们能够更加细致地控制日志的生成过程. Log4j有7种不同的log级别,按照等级从低到高依次为:TRACE.DEBUG.INFO.WARN.ERROR.FATAL.OFF.如果配置为OFF级别,表示关闭log. Log4j支持两种格式的配置文件:properties和xml.包含三

disruptor框架

简介: Martin Fowler在自己网站上写了一篇LMAX架构的文章,在文章中他介绍了LMAX是一种新型零售金融交易平台,它能够以很低的延迟产生大量交易.这个系统是建立在JVM平台上,其核心是一个业务逻辑处理器,它能够在一个线程里每秒处理6百万订单.业务逻辑处理器完全是运行在内存中,使用事件源驱动方式.业务逻辑处理器的核心是Disruptor. Disruptor它是一个开源的并发框架,并获得2011 Duke’s 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作. Disru