Disruptor极速队列

参考:http://www.cnblogs.com/haiq/p/4112689.html

Disruptor 是线程内通信框架,用于线程里共享数据。LMAX 创建Disruptor作为可靠消息架构的一部分并将它设计成一种在不同组件中共享数据非常快的方法。

Disruptor能做什么

  • 同一个“事件”可以有多个消费者,消费者之间既可以并行处理,也可以相互依赖形成处理的先后次序(形成一个依赖图);  主要基于内存屏障,如下图所示:
  • 预分配用于存储事件内容的内存空间;
  • 针对极高的性能目标而实现的极度优化和无锁的设计;

Disruptor核心概念

先从了解 Disruptor 的核心概念开始,来了解它是如何运作的。下面介绍的概念模型,既是领域对象,也是映射到代码实现上的核心对象。

    • Ring Buffer
      如其名,环形的缓冲区。曾经 RingBuffer 是 Disruptor 中的最主要的对象,但从3.0版本开始,其职责被简化为仅仅负责对通过 Disruptor 进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer 可以由用户的自定义实现来完全替代。
    • Sequence  Disruptor
      通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个 Sequence 用于跟踪标识某个特定的事件处理者( RingBuffer/Consumer )的处理进度。虽然一个 AtomicLong 也可以用于标识进度,但定义 Sequence 来负责该问题还有另一个目的,那就是防止不同的 Sequence 之间的CPU缓存伪共享(Flase Sharing)问题。
      (注:这是 Disruptor 实现高性能的关键点之一,网上关于伪共享问题的介绍已经汗牛充栋,在此不再赘述)。
    • Sequencer 
      Sequencer 是 Disruptor 的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。
    • Sequence Barrier
      用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用。 Sequence Barrier 还定义了决定 Consumer 是否还有可处理的事件的逻辑。
    • Wait Strategy
      定义 Consumer 如何进行等待下一个事件的策略。 (注:Disruptor 定义了多种不同的策略,针对不同的场景,提供了不一样的性能表现)
    • Event
      在 Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。
    • EventProcessor
      EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。
    • EventHandler
      Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现。
    • Producer
      即生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型。

Disruptor关键特性

写入ringBuffer

参考:http://ifeve.com/disruptor-writing-ringbuffer/

Ring Buffer是基于数组,而不是链表,不会产生内存回收的性能消耗。

1)Disruptor 由消费者负责通知ProducerBarrier,处理到了哪个序列号,而不是 Ring Buffer。所以,如果我们想确定我们没有让 Ring Buffer 重叠,需要检查所有的消费者们都读到了哪里。

2)现在生产者想要写入 Ring Buffer 中序号 3 占据的节点,因为它是 Ring Buffer 当前游标的下一个节点。但是 ProducerBarrier 明白现在不能写入,因为有一个消费者正在占用它。所以,ProducerBarrier 停下来自旋 (spins),等待,直到那个消费者离开。

提交数据及多生产者

Disruptor案例

package com.disruptor;
/**
 * 定义事件
 * 事件event就是通过disruptor进行交换的数据类型
 * @author gaojiay
 *
 */
public class LongEvent {

  private long value;
  public void set(long value){
    this.value = value;
  }
  public long get() {
    // TODO Auto-generated method stub
    return value;
  }
}

package com.disruptor;

import com.lmax.disruptor.EventFactory;
/**
 * 事件工厂(Event Factory)定义了实例化定义的事件(Event);
 * Disruptor 通过 EventFactory 在 RingBuffer 中预创建 Event 的实例;
 *
 * 个 Event 实例实际上被用作一个“数据槽”,发布者发布前,
 * 先从 RingBuffer 获得一个 Event 的实例,然后往 Event 实例中填充数据,
 * 之后再发布到 RingBuffer 中,之后由 Consumer 获得该 Event
 *  实例并从中读取数据。
 *
 */
public class LongEventFactory  implements EventFactory<LongEvent>{

  @Override
  public LongEvent newInstance() {
    // TODO Auto-generated method stub
    return new LongEvent();
  }

}

package com.disruptor;

import com.lmax.disruptor.EventHandler;

/**
 * 定义事件处理的具体实现 消费者,也就是事件处理器
 *
 * @author gaojiay
 *
 */
public class LongEventHandler implements EventHandler<LongEvent> {

  private String name;

  @Override
  public void onEvent(LongEvent event, long sequence, boolean endofBatch) throws Exception {
    System.out.println("CoustmerEvent_"+name+":" + event.get());

  }

  public LongEventHandler() {
    super();
  }

  public LongEventHandler(String name) {
    this();
    this.name = name;

  }
}

main

package com.disruptor;
/**
 * ringbuffer的写入  http://ifeve.com/disruptor-writing-ringbuffer/
 */
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
//http://www.cnblogs.com/haiq/p/4112689.html
public class main {
 //static  AtomicLong data = new AtomicLong(1000);
  static long count = 100;
  public static void main(String[] args) {
    //Disruptor 通过 java.util.concurrent.ExecutorService 提供的线程来触发 Consumer 的事件处理。例如:
    ExecutorService executor = Executors.newCachedThreadPool();
    /*BlockingWaitStrategy 是最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现;
    SleepingWaitStrategy 的性能表现跟 BlockingWaitStrategy 差不多,对 CPU 的消耗也类似,但其对生产者线程的影响最小,适合用于异步日志类似的场景;
    YieldingWaitStrategy 的性能是最好的,适合用于低延迟的系统。在要求极高性能且事件处理线数小于 CPU 逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性。*/
    WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
    WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
    WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();

    //启动
    EventFactory<LongEvent> eventFactory = new LongEventFactory();
    int ringBufferSize = 1024 * 1024; // RingBuffer 大小,必须是 2 的 N 次方;

    Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory,
                    ringBufferSize, executor, ProducerType.SINGLE,
                    new YieldingWaitStrategy());

    EventHandler<LongEvent> eventHandler1 = new LongEventHandler("1");  //消费者1
    EventHandler<LongEvent> eventHandler2 = new LongEventHandler("2");  //消费者2
    disruptor.handleEventsWith(eventHandler1,eventHandler2);

    disruptor.start();
    // 发布事件;  

/* 发布方式一
    RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
    for(int i= 0;i<100;i++){
    long sequence = ringBuffer.next();//请求下一个事件序号;

    try {
        LongEvent event = ringBuffer.get(sequence);//获取该序号对应的事件对象;
        long data = getData();//获取要通过事件传递的业务数据;
        event.set(data);
    } finally{
      //注意,最后的 ringBuffer.publish 方法必须包含在 finally 中以确保必须得到调用;
               如果某个请求的 sequence 未被提交,将会堵塞后续的发布操作或者其它的 producer。
      Disruptor 还提供另外一种形式的调用来简化以上操作,并确保 publish 总是得到调用。
        ringBuffer.publish(sequence);//发布事件;
    }
    }
    disruptor.shutdown();//关闭 disruptor,方法会堵塞,直至所有的事件都得到处理;
    executor.shutdown();//关闭 disruptor 使用的线程池;如果需要的话,必须手动关闭, disruptor 在 shutdown 时不会自动关闭;
  */

    //发布方式二 :一些复杂的操作放在Ring Buffer
    RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
    LongEventProducerWithTranslator translator = new LongEventProducerWithTranslator(ringBuffer);
    translator.onData(getData());
  }

  private  static long getData(){
   // return data.getAndIncrement();
    return ++count;
  }

}

两种发布方式

package com.disruptor;

import java.nio.ByteBuffer;

import com.lmax.disruptor.RingBuffer;

public class LongEventProducer {
  private final RingBuffer<LongEvent> ringBuffer;
  public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
      this.ringBuffer = ringBuffer;
  } 

  /**
   * onData用来发布事件,每调用一次就发布一次事件事件
   * 它的参数会通过事件传递给消费者
   *
   * @param bb
   */public void onData(ByteBuffer bb) {
          //可以把ringBuffer看做一个事件队列,那么next就是得到下面一个事件槽
          long sequence = ringBuffer.next();
      try {
          //用上面的索引取出一个空的事件用于填充
          LongEvent event = ringBuffer.get(sequence);// for the sequence
          event.set(bb.getLong(0));
      } finally {
          //发布事件 只ringBuffer
          ringBuffer.publish(sequence);
      }
  } 

}

package com.disruptor;

import java.nio.ByteBuffer;

import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;

/**
 * Disruptor 3.0提供了lambda式的API。这样可以把一些复杂的操作放在Ring Buffer
 * ,所以在Disruptor3.0以后的版本最好使用Event Publisher或者Event Translator来发布事件。
 * @author gaojiay
 *
 */
public class LongEventProducerWithTranslator {

  //一个translator可以看做一个事件初始化器,publicEvent方法会调用它
  //填充Event
  private static final EventTranslatorOneArg<LongEvent, Long> TRANSLATOR =
      //Translator中方法的参数是通过RingBuffer来传递的。
      new EventTranslatorOneArg<LongEvent, Long>() {
          public void translateTo(LongEvent event, long sequence, Long bb) {
              event.set(bb);
          }
      };

      private final RingBuffer<LongEvent> ringBuffer;
      public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
          this.ringBuffer = ringBuffer;
      } 

      public void onData(long bb) {
          ringBuffer.publishEvent(TRANSLATOR, bb);
      }
}

测试第两种:

package com.disruptor;

import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;

public class LongEventMain {

  public static void main(String[] args) throws InterruptedException {
    // Executor that will be used to construct new threads for consumers
    Executor executor = Executors.newCachedThreadPool();
    // Specify the size of the ring buffer, must be power of 2.
    int bufferSize = 1024;// Construct the Disruptor
    Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor);
    // 可以使用lambda来注册一个EventHandler
    disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event.get()));
    // Start the Disruptor, starts all threads running
    disruptor.start();
    // Get the ring buffer from the Disruptor to be used for publishing.
    RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); 

    LongEventProducer producer = new LongEventProducer(ringBuffer); 

    ByteBuffer bb = ByteBuffer.allocate(8);
    for (long l = 0; true; l++) {
        bb.putLong(0, l);
        ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
        Thread.sleep(1000);
    }
} 

}

时间: 2024-11-01 16:33:10

Disruptor极速队列的相关文章

Disruptor 极速体验

已经不记得最早接触到 Disruptor 是什么时候了,只记得发现它的时候它是以具有闪电般的速度被介绍的.于是在脑子里, Disruptor 和“闪电”一词关联了起来,然而却一直没有时间去探究一下. 最近正在进行一项对性能有很高要求的产品项目的研究,自然想起了闪电般的 Disruptor ,这必有它的用武之地,于是进行了一番探查,将成果和体会记录在案. 一.什么是 Disruptor  从功能上来看,Disruptor 是实现了“队列”的功能,而且是一个有界队列.那么它的应用场景自然就是“生产者

Disruptor初级入门

Disruptor 极速体验 高性能队列 Disruptor

并发框架Disruptor学习入门

刚刚听说disruptor,大概理一下,只为方便自己理解,文末是一些自己认为比较好的博文,如果有需要的同学可以参考. 本文目标:快速了解Disruptor是什么,主要概念,怎么用 1.Disruptor简介 Disruptor是什么?有什么特点/优点? --Disruptor是一个用于在线程间通信的高效低延时的消息组件,它像个增强的队列. --它是一个高性能.低延迟.使用简单的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式实现,或者事件-监听模式的实现,直

强如 Disruptor 也发生内存溢出?

前言 OutOfMemoryError 问题相信很多朋友都遇到过,相对于常见的业务异常(数组越界.空指针等)来说这类问题是很难定位和解决的. 本文以最近碰到的一次线上内存溢出的定位.解决问题的方式展开:希望能对碰到类似问题的同学带来思路和帮助. 主要从表现-->排查-->定位-->解决 四个步骤来分析和解决问题. 表象 最近我们生产上的一个应用不断的爆出内存溢出,并且随着业务量的增长出现的频次越来越高. 该程序的业务逻辑非常简单,就是从 Kafka 中将数据消费下来然后批量的做持久化操作

小编带你进入强如 Disruptor 也发生内存溢出?

前言OutOfMemoryError 问题相信很多朋友都遇到过,相对于常见的业务异常(数组越界.空指针等)来说这类问题是很难定位和解决的. 本文以最近碰到的一次线上内存溢出的定位.解决问题的方式展开:希望能对碰到类似问题的同学带来思路和帮助. 主要从表现-->排查-->定位-->解决 四个步骤来分析和解决问题. 表象最近我们生产上的一个应用不断的爆出内存溢出,并且随着业务量的增长出现的频次越来越高. 该程序的业务逻辑非常简单,就是从 Kafka 中将数据消费下来然后批量的做持久化操作.

Java 并发基础

Java 并发基础 线程简述 线程是进程的执行部分,用来完成一定的任务; 线程拥有自己的堆栈,程序计数器和自己的局部变量,但不拥有系统资源, 他与其他线程共享父进程的共享资源及部分运行时环境,因此编程时需要小心,确保线程不会妨碍同一进程中的其他线程; 多线程优势 进程之间不能共享内存,但线程之间共享内存/文件描述符/进程状态非常容易. 系统创建进程时需要为该其分配很多系统资源(如进程控制块),但创建线程的开销要小得多,因此线程实现多任务并发比进程效率高. Java语言内置多线程支持,而不是单纯采

高并发高可用高性能的架构学习

张善友15年底分享的博客:千万级规模高性能.高并发的网络架构经验分享 张开涛17年5月出版的书籍:<亿级流量网站架构核心技术> 提到Disruptor + Redis队列 关于Disruptor的介绍:http://ifeve.com/disruptor/ 原文地址:https://www.cnblogs.com/rgqancy/p/8645845.html

分布式高并发系统设计与分析

如何搭建SpringBoot微服务 ThreadPoolExecutor线程池的使用 ReentrantLock和Synchronized的使用场景 数据库锁机制(悲观锁.乐观锁) 分布式锁(RedissLock.Zookeeper) 进程内消息队列(LinkedBlockingQueue.ArrayBlockingQueue.ConcurrentLinkedQueue) 分布式消息队列(Redis.Kafka) AOP实现切面锁 Disruptor高效队列 商品详情页静态化 case0: 抢红

【转】Java学习---内存溢出的排查经历

[原文]https://www.toutiao.com/i6595365358301872643/ 前言 OutOfMemoryError 问题相信很多朋友都遇到过,相对于常见的业务异常(数组越界.空指针等)来说这类问题是很难定位和解决的. 本文以最近碰到的一次线上内存溢出的定位.解决问题的方式展开:希望能对碰到类似问题的同学带来思路和帮助. 主要从表现-->排查-->定位-->解决 四个步骤来分析和解决问题. 表象 最近我们生产上的一个应用不断的爆出内存溢出,并且随着业务量的增长出现的