Disruptor深入解读

将系统性能优化到极致,永远是程序爱好者所努力的一个方向。在java并发领域,也有很多的实践与创新,小到乐观锁、CAS,大到netty线程模型、纤程Quasar、kilim等。Disruptor是一个轻量的高性能并发框架,以惊人的吞吐量而受到广泛的关注。Disruptor为提高程序的并发性能,提供了很多新的思路,比如:

  1. 缓存行填充,消除伪共享;
  2. RingBuffer无锁队列设计;
  3. 预分配缓存对象,使用缓存的循环覆盖取代缓存的新增删除等;

下文将从源码角度解析Disruptor的实现原理。

1 Disruptor术语

Disruptor有很多自身的概念,使得初学者看代码会比较费劲。因此在深入Disruptor原理之前,需要先了解一下Disruptor主要的几个核心类或接口。

  • Sequence: 采用缓存行填充的方式对long类型的一层包装,用以代表事件的序号。通过unsafe的cas方法从而避免了锁的开销;
  • Sequencer: 生产者与缓存RingBuffer之间的桥梁。单生产者与多生产者分别对应于两个实现SingleProducerSequencer与MultiProducerSequencer。Sequencer用于向RingBuffer申请空间,使用publish方法通过waitStrategy通知所有在等待可消费事件的SequenceBarrier;
  • WaitStrategy: WaitStrategy有多种实现,用以表示当无可消费事件时,消费者的等待策略;
  • SequenceBarrier: 消费者与缓存RingBuffer之间的桥梁。消费者并不直接访问RingBuffer,从而能减少RingBuffer上的并发冲突;
  • EventProcessor: 事件处理器,是消费者线程池Executor的调度单元,是对事件处理EventHandler与异常处理ExceptionHandler等的一层封装;
  • Event: 消费事件。Event的具体实现由用户定义;
  • RingBuffer: 基于数组的缓存实现,也是创建sequencer与定义WaitStrategy的入口;
  • Disruptor: Disruptor的使用入口。持有RingBuffer、消费者线程池Executor、消费者集合ConsumerRepository等引用。

2 Disruptor源码分析

2.1 Disruptor并发模型

并发领域的一个典型场景是生产者消费者模型,常规方式是使用queue作为生产者线程与消费者线程之间共享数据的方法,对于queue的读写避免不了读写锁的竞争。Disruptor使用环形缓冲区RingBuffer作为共享数据的媒介。生产者通过Sequencer控制RingBuffer,以及唤醒等待事件的消费者,消费者通过SequenceBarrier监听RingBuffer的可消费事件。考虑一个场景,一个生产者A与三个消费者B、C、D,同时D的事件处理需要B与C先完成。则该模型结构如下:

在这个结构下,每个消费者拥有各自独立的事件序号Sequence,消费者之间不存在共享竞态。SequenceBarrier1监听RingBuffer的序号cursor,消费者B与C通过SequenceBarrier1等待可消费事件。SequenceBarrier2除了监听cursor,同时也监听B与C的序号Sequence,从而将最小的序号返回给消费者D,由此实现了D依赖B与C的逻辑。
RingBuffer是Disruptor高性能的一个亮点。RingBuffer就是一个大数组,事件以循环覆盖的方式写入。与常规RingBuffer拥有2个首尾指针的方式不同,Disruptor的RingBuffer只有一个指针(或称序号),指向数组下一个可写入的位置,该序号在Disruptor源码中就是Sequencer中的cursor,由生产者通过Sequencer控制RingBuffer的写入。为了避免未消费事件的写入覆盖,Sequencer需要监听所有消费者的消息处理进度,也就是gatingSequences。RingBuffer通过这种方式实现了事件缓存的无锁设计。
下面将通过分析源码,来理解Disruptor的实现原理。

2.2 Disruptor类

Disruptor类是Disruptor框架的总入口,能用DSL的形式组织消费者之间的关系链,并提供获取事件、发布事件等方法。它包含以下属性:

private final RingBuffer<T> ringBuffer;
/**消费者事件处理线程池**/
private final Executor executor;
/**消费者集合**/
private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<T>();
/**Disruptor是否启动标示,只能启动一次**/
private final AtomicBoolean started = new AtomicBoolean(false);
/**消费者事件异常处理方法**/
private ExceptionHandler<? super T> exceptionHandler = new ExceptionHandlerWrapper<T>();

实例化Disruptor的过程,就是实例化RingBuffer与消费线程池Executor的过程。除此之外,Disruptor类最重要的作用是注册消费者,handleEventsWith方法。该方法有多套实现,而每一个消费者最终都会被包装成EventProcessor。createEventProcessors是包装消费者的重要函数。

EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,?                                           final EventHandler<T>[] eventHandlers)?{
    checkNotStarted();
    //每个消费者有自己的事件序号Sequence
    final Sequence[] processorSequences = new Sequence[eventHandlers.length];
    //消费者通过SequenceBarrier等待可消费事件
    final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);    for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
    {
        final EventHandler<T> eventHandler = eventHandlers[i];
        //每个消费者都以BatchEventProcessor被调度
        final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);
        if (exceptionHandler != null)
        {
            batchEventProcessor.setExceptionHandler(exceptionHandler);
        }
        consumerRepository.add(batchEventProcessor, eventHandler, barrier);
        processorSequences[i] = batchEventProcessor.getSequence();
    }

    if (processorSequences.length > 0)
    {
        consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
    }

    return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
}

从程序中可以看出,每个消费者都以BatchEventProcessor的形式被调度,也就是说,消费者的逻辑都在BatchEventProcessor。

2.3 EventProcessor

EventProcessor有两个有操作逻辑的实现类,BatchEventProcessor与WorkProcessor,处理逻辑很相近,这边仅分析BatchEventProcessor。
BatchEventProcessor的构造函数使用DataProvider,而不直接使用RingBuffer,可能是Disruptor考虑到留给用户替换RingBuffer事件存储的空间,毕竟RingBuffer是内存级的。
Disruptor启动时,会调用每个消费者ConsumerInfo(在消费者集合ConsumerRepository中)的start方法,最终会运行到BatchEventProcessor的run方法。

@Override
public void run()
{
    if (!running.compareAndSet(false, true))
    {
        throw new IllegalStateException("Thread is already running");
    }
    sequenceBarrier.clearAlert();

    notifyStart();

    T event = null;
    // sequence.get()标示当前已经处理的序号
    long nextSequence = sequence.get() + 1L;
    try
    {
        while (true)
        {
            try
            {
                // sequenceBarrier最重要的作用,就是让消费者等待下一个可用的序号
                // 可用序号可能会大于nextSequence,从而消费者可以一次处理多个事件
                // 如果该消费者同时也依赖了其他消费者,则会返回最小的那个
                final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                if (nextSequence > availableSequence)
                {
                    Thread.yield();
                }

                while (nextSequence <= availableSequence)
                {
                    event = dataProvider.get(nextSequence);
                    // eventHandler是用户定义的事件消费逻辑
                    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                    nextSequence++;
                }

                // 跟踪自己处理的事件
                sequence.set(availableSequence);
            }
            catch (final TimeoutException e)
            {
                notifyTimeout(sequence.get());
            }
            catch (final AlertException ex)
            {
                if (!running.get())
                {
                    break;
                }
            }
            catch (final Throwable ex)
            {
                exceptionHandler.handleEventException(ex, nextSequence, event);
                sequence.set(nextSequence);
                nextSequence++;
            }
        }
    }
    finally
    {
        notifyShutdown();
        running.set(false);
    }
}

消费者的逻辑,就是在while循环中,不断查询可消费事件,并由用户自定义的消费逻辑eventHandler进行处理。查询可消费事件的逻辑在SequenceBarrier中。

2.4 SequenceBarrier

SequenceBarrier只有一个实现,ProcessingSequenceBarrier。下面是ProcessingSequenceBarrier的构造函数。

public ProcessingSequenceBarrier(final Sequencer sequencer,final WaitStrategy waitStrategy,final Sequence cursorSequence,final Sequence[] dependentSequences)
{
    // 生产者的ringBuffer控制器sequencer
    this.sequencer = sequencer;
    // 消费者等待可消费事件的策略
    this.waitStrategy = waitStrategy;
    // ringBuffer的cursor
    this.cursorSequence = cursorSequence;
    if (0 == dependentSequences.length)
    {
        dependentSequence = cursorSequence;
    }
    else
    {
    // 当依赖其他消费者时,dependentSequence就是其他消费者的序号
        dependentSequence = new FixedSequenceGroup(dependentSequences);
    }
}

消费者通过ProcessingSequenceBarrier的waitFor方法等待可消费序号,实际是调用WaitStrategy的waitFor方法。

2.5 WaitStrategy

WaitStrategy有6个实现类,用于代表6种不同的等待策略,比如阻塞策略、忙等策略等。这边就仅分析一个阻塞策略BlockingWaitStrategy。

@Override
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
    throws AlertException, InterruptedException
{
    long availableSequence;
    if ((availableSequence = cursorSequence.get()) < sequence)
    {
        lock.lock();
        try
        {
            // 如果ringBuffer的cursor小于需要的序号,也就是生产者没有新的事件发出,则阻塞消费者线程,直到生产者通过Sequencer的publish方法唤醒消费者。
            while ((availableSequence = cursorSequence.get()) < sequence)
            {
                barrier.checkAlert();
                processorNotifyCondition.await();
            }
        }
        finally
        {
            lock.unlock();
        }
    }

    // 如果生产者新发布了事件,但是依赖的其他消费者还没处理完,则等待所依赖的消费者先处理。在本文的例子中,就是等B与C先处理完,D才能处理事件。
    while ((availableSequence = dependentSequence.get()) < sequence)
    {
        barrier.checkAlert();
    }

    return availableSequence;
}

到这里,消费者的程序逻辑也就基本都清楚了。最后再看一下生产者的程序逻辑,主要是Sequencer。

2.6 Sequencer

Sequencer负责生产者对RingBuffer的控制,包括查询是否有写入空间、申请空间、发布事件并唤醒消费者等。Sequencer有两个实现SingleProducerSequencer与MultiProducerSequencer,分别对应于单生产者模型与多生产者模型。只要看懂hasAvailableCapacity(),申请空间也就明白了。下面是SingleProducerSequencer的hasAvailableCapacity实现。

@Override
public boolean hasAvailableCapacity(final int requiredCapacity)
{
    long nextValue = pad.nextValue;
    // wrapPoint是一个临界序号,必须比当前最小的未消费序号还小
    long wrapPoint = (nextValue + requiredCapacity) - bufferSize;
    // 当前的最小未消费序号
    long cachedGatingSequence = pad.cachedValue;

    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
    {
        long minSequence = Util.getMinimumSequence(gatingSequences, nextValue);
        pad.cachedValue = minSequence;

        if (wrapPoint > minSequence)
        {
            return false;
        }
    }
    return true;
}

3 Disruptor实例

本实例基于3.2.0版本的Disruptor,实现2.1小结描述的并发场景。使用Disruptor的过程非常简单,只需要简单的几步。
定义用户事件:

public class MyEvent {
    private long value;

    public MyEvent(){}

    public long getValue() {
        return value;
    }

    public void setValue(long value) {
        this.value = value;
    }
}

定义事件工厂,这是实例化Disruptor所需要的:

public class MyEventFactory implements EventFactory<MyEvent> {
    public MyEvent newInstance() {
        return new MyEvent();
    }
}

定义消费者B、C、D:

public class MyEventHandlerB implements EventHandler<MyEvent> {
    public void onEvent(MyEvent myEvent, long l, boolean b) throws Exception {
        System.out.println("Comsume Event B : " + myEvent.getValue());
    }
}

public class MyEventHandlerC implements EventHandler<MyEvent> {
    public void onEvent(MyEvent myEvent, long l, boolean b) throws Exception {
        System.out.println("Comsume Event C : " + myEvent.getValue());
    }
}

public class MyEventHandlerD implements EventHandler<MyEvent> {
    public void onEvent(MyEvent myEvent, long l, boolean b) throws Exception {
        System.out.println("Comsume Event D : " + myEvent.getValue());
    }
}

在此基础上,就可以运行Disruptor了:

public static void main(String[] args){
    EventFactory<MyEvent> myEventFactory = new MyEventFactory();
    Executor executor = Executors.newCachedThreadPool();
    int ringBufferSize = 32;

    Disruptor<MyEvent> disruptor = new Disruptor<MyEvent>(myEventFactory,ringBufferSize,executor, ProducerType.SINGLE,new BlockingWaitStrategy());
    EventHandler<MyEvent> b = new MyEventHandlerB();
    EventHandler<MyEvent> c = new MyEventHandlerC();
    EventHandler<MyEvent> d = new MyEventHandlerD();

    SequenceBarrier sequenceBarrier2 = disruptor.handleEventsWith(b,c).asSequenceBarrier();
    BatchEventProcessor processord = new BatchEventProcessor(disruptor.getRingBuffer(),sequenceBarrier2,d);
    disruptor.handleEventsWith(processord);
//  disruptor.after(b,c).handleEventsWith(d);              // 此行能代替上两行的程序逻辑
    RingBuffer<MyEvent> ringBuffer = disruptor.start();    // 启动Disruptor
    for(int i=0; i<10; i++) {
        long sequence = ringBuffer.next();                 // 申请位置
        try {
            MyEvent myEvent = ringBuffer.get(sequence);
            myEvent.setValue(i);                           // 放置数据
        } finally {
            ringBuffer.publish(sequence);                  // 提交,如果不提交完成事件会一直阻塞
        }
        try{
            Thread.sleep(100);
        }catch (Exception e){
        }
    }
    disruptor.shutdown();
}

按照程序的逻辑,B与C会率先处理ringBuffer中的事件,且处理顺序不分先后。同一事件被B与C处理完成之后,才会被D处理,结果如下:

Comsume Event C : 0
Comsume Event B : 0
Comsume Event D : 0
Comsume Event C : 1
Comsume Event B : 1
Comsume Event D : 1
Comsume Event C : 2
Comsume Event B : 2
Comsume Event D : 2
Comsume Event C : 3
Comsume Event B : 3
Comsume Event D : 3
Comsume Event C : 4
Comsume Event B : 4
Comsume Event D : 4
Comsume Event C : 5
Comsume Event B : 5
Comsume Event D : 5
Comsume Event C : 6
Comsume Event B : 6
Comsume Event D : 6
Comsume Event C : 7
Comsume Event B : 7
Comsume Event D : 7
Comsume Event C : 8
Comsume Event B : 8
Comsume Event D : 8
Comsume Event C : 9
Comsume Event B : 9
Comsume Event D : 9

将本例中的Thread.sleep去掉,即可以观察到B与C的处理不分先后,结果符合预期。

本文乃作者原创,转载请注明出处。http://www.cnblogs.com/miao-rui/p/6379473.html

时间: 2024-10-24 08:12:23

Disruptor深入解读的相关文章

Disruptor 源码阅读笔记--转

原文地址:http://coderbee.net/index.php/open-source/20130812/400 一.Disruptor 是什么? Disruptor 是一个高性能异步处理框架,也可以认为是一个消息框架,它实现了观察者模式. Disruptor 比传统的基于锁的消息框架的优势在于:它是无锁的.CPU友好:它不会清除缓存中的数据,只会覆盖,降低了垃圾回收机制启动的频率. 这个解读是在最新版 3.1.1 的源码上进行. 关于Disruptor的更多介绍可见: http://if

ViewGroup源码解读

我们之前刚刚分析完事件传递机制和view的源码,如果没有看过的,建议看完View的事件拦截机制浅析以及View的事件源码解析.这次我们来分析下viewgroup的. 可能有人会想,怎么又是源码分析,肯定又是一大通.其实没你想的那么复杂.仔细分析一波就行了. 解读ViewGroup 我们都知道,一个事件完整的流程是从dispatchTouchevent–>onInterceptTouchevent–>onTouchEvent.我们先不说事件监听的问题.上述三个步骤就是正常一个点击的流程.前面我们

mysql之show engine innodb status解读(转)

add by zhj: 我第一次知道这个命令是线上服务出了问题,然后同事用这个命令去查看死锁.但用这个命令看死锁有一定的局限性,它只能看到最后一次死锁, 而且只能看到死锁环中的两个事务所执行的最后一条语句(即被死锁卡住的那条语句),看不到整个死锁环,也看到不整个事务的语句.但是即使这亲,对我 们来说也非常有用,因为一般来说,数据库同时存在多个死锁环的可能性比较小,而且有了死锁环中的事务的最后一条语句,我们找到整个死锁环不是太难. "show engine innodb status"这

智慧中国杯百万大奖赛解读 | 学霸去哪了(二)

在上一篇中我们探讨了学生的消费数据,消费数据对本次竞赛预测来讲很重要.本篇将探索寝室门禁.图书借阅.图书馆门禁和学生成绩等一些和学生学习相关的数据,来看看学生的品行如何,虽然资助金和奖学金的性质不太一样,但我们毕竟还是想资助那些品学兼优的学生,而不是资助虽然家境很贫寒但不学无术的学生. 所以本篇探索的数据可以比较好的反应出这些情况.当然这里面还隐藏了更好玩的话题,学霸去哪了?话不多说,let's go! 一.数据目录概况 官方提供的数据分为两组,分别是训练集和测试集,每一组都包含大约1万名学生的

QCustomplot使用分享(二) 源码解读

一.头文件概述 从这篇文章开始,我们将正式的进入到QCustomPlot的实践学习中来,首先我们先来学习下QCustomPlot的类图,如果下载了QCustomPlot源码的同学可以自己去QCustomPlot的目录下documentation/qcustomplot下寻找一个名字叫做index.html的文件,将其在浏览器中打开,也是可以找到这个库的类图.如图1所示,是组成一个QCustomPlot类图的可能组成形式. 一个图表(QCustomPlot):包含一个或者多个图层.一个或多个ite

以蓝牙开发的视觉解读微信Airsync协议

微信硬件平台使用蓝牙作为近场控制的连接件,并拟定了<微信蓝牙外设协议>.这份协议更像一个标准,用于规范微信和蓝牙外设之间的数据交互场景和接口.但从开发者来看,要完全读懂这份协议,恐怕需要熟读很多遍,并且要结合调试才能真正实现微信Airsync通信.笔者对IOT和微信硬件平台的整个框架和技术都比较熟悉了,并且已经在TI的CC254X和Dialog的DA14580上实现了微信Airsync协议通信.现在回过头来,从开发的角度,对微信Airsync协议进行重新解读,以帮助新进入物联网领域的开发者更快

Ehcache详细解读

Ehcache详细解读 Ehcache  是现在最流行的纯Java开源缓存框架,配置简单.结构清晰.功能强大,最初知道它,是从Hibernate的缓存开始的.网上中文的EhCache材料以简单介绍和配置方法居多,如果你有这方面的问题,请自行google:对于API,官网上介绍已经非常清楚,请参见官网:但是很少见到特性说明和对实现原理的分析,因此在这篇文章里面,我会详细介绍和分析EhCache的特性,加上一些自己的理解和思考,希望对缓存感兴趣的朋友有所收获. 一.特性一览 ,来自官网,简单翻译一下

MemCache超详细解读

MemCache是什么 MemCache是一个自由.源码开放.高性能.分布式的分布式内存对象缓存系统,用于动态Web应用以减轻数据库的负载.它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高了网站访问的速度.MemCaChe是一个存储键值对的HashMap,在内存中对任意的数据(比如字符串.对象等)所使用的key-value存储,数据可以来自数据库调用.API调用,或者页面渲染的结果.MemCache设计理念就是小而强大,它简单的设计促进了快速部署.易于开发并解决面对大规模的数据缓存的

linux 系统性能分析(top命令)及更准确解读内存的占用率(free -m 命令)

一.系统性能分析(top命令) top 命令是 Linux 下常用的性能分析工具,能够实时显示系统中各个进程的资源占用状况,默认5秒刷新一下进程列表,所以类似于 Windows 的任务管理器. 系统整体当下的统计信息 top命令显示的前五行是系统整体的统计信息. 第一行是任务队列信息,同uptime命令的执行结果.eg. top  -  15:09:51  up  17  days  ,  3:38  ,  4  users  ,  load  average  :  1.09  ,  3.39