并发框架Disruptor几个Demo

扫盲:

要想了解Disruptor框架必需多花点时间研究下它的工作原理,不然代码是没法撸的!!!

关于Disruptor的详细资料及原理请细看!!!    http://ifeve.com/disruptor/

Disruptor版本: 3.2.1

名词解释

消费者==事件处理器

一、现在你必须明白以下问题:

1、你必须明白Ringbuffer是什么,它的数据结构是怎么样的,有什么约定,为什么高效,它的职责是什么。

2、ConsumerBarrier (ifeve网上的译文版本比较早,这个类在2.0.0之后就一直被改名,3.2.1的版本中它是SequenceBarrier)它的职责是什么。

Disruptor框架在2.0版本之后不再采用生产者、消费者模型来编写API,而是使用事件模型,其实只是接口设计、类名和概念上的变化,内部原理其实还是一样的。

DEMO 一、使用原生API创建一个简单的生产者和消费者

Java代码  

  1. //DEMO中使用的 消息全假定是一条交易
  2. public class TradeTransaction {
  3. private String id;//交易ID
  4. private double price;//交易金额
  5. public TradeTransaction() {
  6. }
  7. public TradeTransaction(String id, double price) {
  8. super();
  9. this.id = id;
  10. this.price = price;
  11. }
  12. public String getId() {
  13. return id;
  14. }
  15. public void setId(String id) {
  16. this.id = id;
  17. }
  18. public double getPrice() {
  19. return price;
  20. }
  21. public void setPrice(double price) {
  22. this.price = price;
  23. }
  24. }
  25. public class TradeTransactionInDBHandler implements EventHandler<TradeTransaction>,WorkHandler<TradeTransaction> {
  26. @Override
  27. public void onEvent(TradeTransaction event, long sequence,
  28. boolean endOfBatch) throws Exception {
  29. this.onEvent(event);
  30. }
  31. @Override
  32. public void onEvent(TradeTransaction event) throws Exception {
  33. //这里做具体的消费逻辑
  34. event.setId(UUID.randomUUID().toString());//简单生成下ID
  35. System.out.println(event.getId());
  36. }
  37. }
  38. public class Demo1 {
  39. public static void main(String[] args) throws InterruptedException, ExecutionException {
  40. int BUFFER_SIZE=1024;
  41. int THREAD_NUMBERS=4;
  42. /*
  43. * createSingleProducer创建一个单生产者的RingBuffer,
  44. * 第一个参数叫EventFactory,从名字上理解就是“事件工厂”,其实它的职责就是产生数据填充RingBuffer的区块。
  45. * 第二个参数是RingBuffer的大小,它必须是2的指数倍 目的是为了将求模运算转为&运算提高效率
  46. * 第三个参数是RingBuffer的生产都在没有可用区块的时候(可能是消费者(或者说是事件处理器) 太慢了)的等待策略
  47. */
  48. final RingBuffer<TradeTransaction> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<TradeTransaction>() {
  49. @Override
  50. public TradeTransaction newInstance() {
  51. return new TradeTransaction();
  52. }
  53. }, BUFFER_SIZE,new YieldingWaitStrategy());
  54. //创建线程池
  55. ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS);
  56. //创建SequenceBarrier
  57. SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
  58. //创建消息处理器
  59. BatchEventProcessor<TradeTransaction> transProcessor = new BatchEventProcessor<TradeTransaction>(
  60. ringBuffer, sequenceBarrier, new TradeTransactionInDBHandler());
  61. //这一部的目的是让RingBuffer根据消费者的状态    如果只有一个消费者的情况可以省略
  62. ringBuffer.addGatingSequences(transProcessor.getSequence());
  63. //把消息处理器提交到线程池
  64. executors.submit(transProcessor);
  65. //如果存大多个消费者 那重复执行上面3行代码 把TradeTransactionInDBHandler换成其它消费者类
  66. Future<?> future=executors.submit(new Callable<Void>() {
  67. @Override
  68. public Void call() throws Exception {
  69. long seq;
  70. for(int i=0;i<1000;i++){
  71. seq=ringBuffer.next();//占个坑 --ringBuffer一个可用区块
  72. ringBuffer.get(seq).setPrice(Math.random()*9999);//给这个区块放入 数据  如果此处不理解,想想RingBuffer的结构图
  73. ringBuffer.publish(seq);//发布这个区块的数据使handler(consumer)可见
  74. }
  75. return null;
  76. }
  77. });
  78. future.get();//等待生产者结束
  79. Thread.sleep(1000);//等上1秒,等消费都处理完成
  80. transProcessor.halt();//通知事件(或者说消息)处理器 可以结束了(并不是马上结束!!!)
  81. executors.shutdown();//终止线程
  82. }
  83. }

DEMO二、使用WorkerPool辅助创建消费者

Java代码  

  1. public class Demo2 {
  2. public static void main(String[] args) throws InterruptedException {
  3. int BUFFER_SIZE=1024;
  4. int THREAD_NUMBERS=4;
  5. EventFactory<TradeTransaction> eventFactory=new EventFactory<TradeTransaction>() {
  6. public TradeTransaction newInstance() {
  7. return new TradeTransaction();
  8. }
  9. };
  10. RingBuffer<TradeTransaction> ringBuffer=RingBuffer.createSingleProducer(eventFactory, BUFFER_SIZE);
  11. SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
  12. ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUMBERS);
  13. WorkHandler<TradeTransaction> workHandlers=new TradeTransactionInDBHandler();
  14. /*
  15. * 这个类代码很简单的,亲自己看哈!~
  16. */
  17. WorkerPool<TradeTransaction> workerPool=new WorkerPool<TradeTransaction>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), workHandlers);
  18. workerPool.start(executor);
  19. //下面这个生产8个数据,图简单就写到主线程算了
  20. for(int i=0;i<8;i++){
  21. long seq=ringBuffer.next();
  22. ringBuffer.get(seq).setPrice(Math.random()*9999);
  23. ringBuffer.publish(seq);
  24. }
  25. Thread.sleep(1000);
  26. workerPool.halt();
  27. executor.shutdown();
  28. }
  29. }

DEMO三、demo3写个流弊点的像下图这样。这次用Disruptor来完成整个构建工作.

从中图可以看出需求是介样子的:生产者生产数据经过C1,C2处理完成后再到C3。

假设如下场景:

1、交易网关收到交易(P1)把交易数据发到RingBuffer中,

2、负责处理增值业务的消费者C1和负责数据存储的消费者C2负责处理交易

3、负责发送JMS消息的消费者C3在C1和C2处理完成后再进行处理。

让代码说话:

Java代码  

  1. public class TradeTransactionJMSNotifyHandler implements EventHandler<TradeTransaction> {
  2. @Override
  3. public void onEvent(TradeTransaction event, long sequence,
  4. boolean endOfBatch) throws Exception {
  5. //do send jms message
  6. }
  7. }
  8. public class TradeTransactionPublisher implements Runnable{
  9. Disruptor<TradeTransaction> disruptor;
  10. private CountDownLatch latch;
  11. private static int LOOP=10000000;//模拟一千万次交易的发生
  12. public TradeTransactionPublisher(CountDownLatch latch,Disruptor<TradeTransaction> disruptor) {
  13. this.disruptor=disruptor;
  14. this.latch=latch;
  15. }
  16. @Override
  17. public void run() {
  18. TradeTransactionEventTranslator tradeTransloator=new TradeTransactionEventTranslator();
  19. for(int i=0;i<LOOP;i++){
  20. disruptor.publishEvent(tradeTransloator);
  21. }
  22. latch.countDown();
  23. }
  24. }
  25. class TradeTransactionEventTranslator implements EventTranslator<TradeTransaction>{
  26. private Random random=new Random();
  27. @Override
  28. public void translateTo(TradeTransaction event, long sequence) {
  29. this.generateTradeTransaction(event);
  30. }
  31. private TradeTransaction generateTradeTransaction(TradeTransaction trade){
  32. trade.setPrice(random.nextDouble()*9999);
  33. return trade;
  34. }
  35. }
  36. public class TradeTransactionVasConsumer implements EventHandler<TradeTransaction> {
  37. @Override
  38. public void onEvent(TradeTransaction event, long sequence,
  39. boolean endOfBatch) throws Exception {
  40. //do something....
  41. }
  42. }
  43. public class Demo3 {
  44. public static void main(String[] args) throws InterruptedException {
  45. long beginTime=System.currentTimeMillis();
  46. int bufferSize=1024;
  47. ExecutorService executor=Executors.newFixedThreadPool(4);
  48. //这个构造函数参数,相信你在了解上面2个demo之后就看下就明白了,不解释了~
  49. Disruptor<TradeTransaction> disruptor=new Disruptor<TradeTransaction>(new EventFactory<TradeTransaction>() {
  50. @Override
  51. public TradeTransaction newInstance() {
  52. return new TradeTransaction();
  53. }
  54. }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());
  55. //使用disruptor创建消费者组C1,C2
  56. EventHandlerGroup<TradeTransaction> handlerGroup=disruptor.handleEventsWith(new TradeTransactionVasConsumer(),new TradeTransactionInDBHandler());
  57. TradeTransactionJMSNotifyHandler jmsConsumer=new TradeTransactionJMSNotifyHandler();
  58. //声明在C1,C2完事之后执行JMS消息发送操作 也就是流程走到C3
  59. handlerGroup.then(jmsConsumer);
  60. disruptor.start();//启动
  61. CountDownLatch latch=new CountDownLatch(1);
  62. //生产者准备
  63. executor.submit(new TradeTransactionPublisher(latch, disruptor));
  64. latch.await();//等待生产者完事.
  65. disruptor.shutdown();
  66. executor.shutdown();
  67. System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime));
  68. }
  69. }
时间: 2024-10-11 12:33:09

并发框架Disruptor几个Demo的相关文章

并发框架Disruptor学习入门

刚刚听说disruptor,大概理一下,只为方便自己理解,文末是一些自己认为比较好的博文,如果有需要的同学可以参考. 本文目标:快速了解Disruptor是什么,主要概念,怎么用 1.Disruptor简介 Disruptor是什么?有什么特点/优点? --Disruptor是一个用于在线程间通信的高效低延时的消息组件,它像个增强的队列. --它是一个高性能.低延迟.使用简单的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式实现,或者事件-监听模式的实现,直

并发框架Disruptor译文

Martin Fowler在自己网站上写了一篇LMAX架构的文章,在文章中他介绍了LMAX是一种新型零售金融交易平台,它能够以很低的延迟产生大量交易.这个系统是建立在JVM平台上,其核心是一个业务逻辑处理器,它能够在一个线程里每秒处理6百万订单.业务逻辑处理器完全是运行在内存中,使用事件源驱动方式.业务逻辑处理器的核心是Disruptor. Disruptor它是一个开源的并发框架,并获得2011 Duke’s程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作.本文是Disrupto

并发框架Disruptor浅析

1.引言 Disruptor是一个开源的Java框架,它被设计用于在生产者—消费者(producer-consumer problem,简称PCP)问题上获得尽量高的吞吐量(TPS)和尽量低的延迟.Disruptor是LMAX在线交易平台的关键组成部分,LMAX平台使用该框架对订单处理速度能达到600万TPS,除金融领域之外,其他一般的应用中都可以用到Disruptor,它可以带来显著的性能提升.其实Disruptor与其说是一个框架,不如说是一种设计思路,这个设计思路对于存在“并发.缓冲区.生

Disruptor并发框架

框架简介 Martin Fowler在自己网站上写了一篇LMAX架构的文章,在文章中他介绍了LMAX是一种新型零售金融交易平台,它能够以很低的延迟产生大量交易.这个系统是建立在JVM平台上,其核心是一个业务逻辑处理器,它能够在一个线程里每秒处理6百万订单.业务逻辑处理器完全是运行在内存中,使`用事件源驱动方式.业务逻辑处理器的核心是Disruptor. Disruptor它是一个开源的并发框架,并获得2011 Duke's 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作. Dis

python之高性能网络编程并发框架eventlet实例

http://blog.csdn.net/mingzznet/article/details/38388299 前言: 虽然 eventlet 封装成了非常类似标准线程库的形式,但线程和eventlet在实际并发执行流程仍然有明显区别.在没有出现 I/O 阻塞时,除非显式声明,否则当前正在执行的 eventlet 永远不会把 cpu 交给其他的 eventlet,而标准线程则是无论是否出现阻塞,总是由所有线程一起争夺运行资源.所有 eventlet 对 I/O 阻塞无关的大运算量耗时操作基本没有

Java并发编程高阶技术 高性能并发框架源码解析与实战

第1章 课程介绍(Java并发编程进阶课程)什么是Disruptor?它一个高性能的异步处理框架,号称"单线程每秒可处理600W个订单"的神器,本课程目标:彻底精通一个如此优秀的开源框架,面试秒杀面试官.本章会带领小伙伴们先了解课程大纲与重点,然后模拟千万,亿级数据进行压力测试.让大家感性认知到Disruptor的强大.... 第2章 并发编程框架核心讲解本章带大家学习并发编程框架的基本使用与API,并介绍其内部各种组件的原理和运行机制.从而为后面的深入学习打下坚实的基础.如果对Dis

J.U.C并发框架

转载:http://itindex.net/detail/48869-j.u.c-%E6%A1%86%E6%9E%B6 J.U.C并发框架 作者:Doug Lea SUNY Oswego Oswego NY 13126 [email protected] 翻译:书卷多情 在J2SE1.5中,java.util.concurrent包下的大部分同步工具(锁.屏障等)以AbstractQueuedSynchronizer类为基础来构建.这个框架提供了一些常用机制用于自动管理并发状态.阻塞及非阻塞线程

Python 开源异步并发框架的未来(转)

Python 开源异步并发框架的未来 fantix 1.1k 2014年04月16日 发布 推荐 4 推荐 收藏 31 收藏,8.9k 浏览 呵呵,这个标题有点大,其实只是想从零开始介绍一下异步的基础,以及 Python 开源异步并发框架的发展和互操作性. 另外,这是我在 OSTC 2014 做的一个 20140330-OSTC-分论坛1王川 http://v.youku.com/v_show/id_XNjk2ODI0ODQ4.html ,幻灯片在这里,欢迎拍砖. 开源 Python 是开源的,

Java并发和多线程1:并发框架基本示例.txt

Executor框架是指java 5中引入的一系列并发库中与executor相关的一些功能类,其中包括ThreadPool,Executor,Executors,ExecutorService,CompletionService,Future,Callable等. 并发编程的一种编程方式是把任务拆分为一系列的小任务,即Runnable,然后在提交给一个Executor执行,Executor.execute(Runnalbe) .Executor在执行时使用内部的线程池完成操作.一.创建线程池Ex