Disruptor之粗糙认识

一 概述

1.Disruptor

Disruptor是一个高性能的异步处理框架,一个“生产者-消费者”模型。

2.RingBuffer

RingBuffer是一种环形数据结构,包含一个指向下一个槽点的序号,可以在线程间传递数据。

3.Event

在Disruptor框架中,生产者生产的数据叫做Event。

二 Disruptor框架基本构成

1.MyEvent:自定义对象,充当“生产者-消费者”模型中的数据。
2.MyEventFactory:实现EventFactory的接口,用于生产数据。
3.MyEventProducerWithTranslator:将数据存储到自定义对象中并发布。
4.MyEventHandler:自定义消费者。

三 Demo

初次接触Disruptor,认识停留在表面,零散,模糊,在此记一个简单的示例,以便日后深入研究。

1.自定义数据类

package com.disruptor.basic;

public class LongEvent {
    private long value;

    public long getValue() {
        return value;
    }

    public void setValue(long value) {
        this.value = value;
    }

}

2.数据生产工厂(创建数据类对象)

package com.disruptor.basic;

import com.lmax.disruptor.EventFactory;

public class LongEventFactory implements EventFactory<LongEvent> {

    public LongEvent newInstance() {
        // TODO Auto-generated method stub
        return new LongEvent();
    }

}

3.数据源(初始化数据对象并发布)

package com.disruptor.basic;

import java.nio.ByteBuffer;

import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;

public class LongEventProducerWithTranslator {

    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    private final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
        /**
         * event:包含有消费数据的对象; sequence:分配给目标对象的RingBuffer空间序号;
         * bb:包含有将要被存储到目标对象中的数据的容器
         */
        public void translateTo(LongEvent event, long sequence, ByteBuffer bb) {
            // TODO Auto-generated method stub
            event.setValue(bb.getLong(0));// 将数据存储到目标对象中
        }
    };

    public void onData(ByteBuffer bb) {
        ringBuffer.publishEvent(TRANSLATOR, bb);// 发布,将数据推送给消费者
    }

}

4.消费者

package com.disruptor.basic;

import com.lmax.disruptor.EventHandler;

public class LongEventHandler implements EventHandler<LongEvent> {

    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
        // TODO Auto-generated method stub
        System.out.println("当前消费的数据="+event.getValue());
    }

}

5.测试类

package com.disruptor.basic;

import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.junit.Test;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

public class LongEventTest {

    @SuppressWarnings({ "unchecked", "deprecation" })
    @Test
    public void test01() throws InterruptedException {
        ExecutorService executor = Executors.newCachedThreadPool();
        EventFactory<LongEvent> factory = new LongEventFactory();
        int bufferSize = 1024;
        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, executor, ProducerType.SINGLE,
                new YieldingWaitStrategy());
        disruptor.handleEventsWith(new LongEventHandler());
        disruptor.start();

        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        // LongEventProducer producer = new
        // LongEventProducer(ringBuffer);
        LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer);
        ByteBuffer bb = ByteBuffer.allocate(8);
        // long startTime = System.currentTimeMillis();
        for (long a = 0; a < 100; a++) {
            bb.putLong(0, a);
            producer.onData(bb);
            /*if (a == 99) {
                long endTime = System.currentTimeMillis();
                System.out.println("useTime=" + (endTime - startTime));
            }*/
            Thread.sleep(100);
        }
        /*long endTime = System.currentTimeMillis();
        System.out.println("useTime=" + (endTime - startTime));*/
        disruptor.shutdown();
        executor.shutdown();
    }

    /*@Test
    public void test02() {
        long startTime = System.currentTimeMillis();
        for (long a = 0; a < 100; a++) {
            System.out.println(a);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("useTime=" + (endTime - startTime));
    }*/

}
时间: 2024-11-18 16:27:14

Disruptor之粗糙认识的相关文章

Java并发编程框架Disruptor

Disruptor是什么? Disruptor是一个高性能的异步处理框架,一个轻量级的JMS,和JDK中的BlockingQueue有相似处,但是它的处理速度非常快,获得2011年程序框架创新大奖,号称"一个线程一秒钟可以处理600W个订单"(这有点吓人吧),并且Disruptor不仅仅只有buffer,它提供的功能非常强大,比如它可以帮助我们轻松构建数据流处理(比如一个数据先交给A和B这2个消费者并行处理后在交给C处理,是不是有点想起storm这种流处理,实际上strom的底层就是应

三步创建Disruptor应用

Disruptor是一个高性能的用于线程间消息处理的开源框架.它的目标就是快. 我们知道,java.util.concurrent.ArrayBlockingQueue 是一个非常优秀的有界队列实现.Disruptor与之相比,性能更加的优秀. 性能比较 完整的性能报告在这里. Disruptor内部使用了RingBuffer,它是Disruptor的核心的数据结构.和其它的RingBuffer实现不同,Disruptor没有尾指针.这样实现是经过深思熟虑的,你可以看这篇文档了解其细节. 更多的

Disruptor——一种可替代有界队列完成并发线程间数据交换的高性能解决方案

本文翻译自LMAX关于Disruptor的论文,同时加上一些自己的理解和标注.Disruptor是一个高效的线程间交换数据的基础组件,它使用栅栏(barrier)+序号(Sequencing)机制协调生产者与消费者,从而避免使用锁和CAS,同时还组合使用预分配内存机制.缓存行机制(cache line).批处理效应(batch effect)来达到高吞吐量和低时延的目标.目前Disruptor版本已经迭代至3.0,本论文是基于Disruptor1.0写就,在新版本中,相对与1.0版本,其核心设计

构建高性能服务(三)Java高性能缓冲设计 vs Disruptor vs LinkedBlockingQueue--转载

原文地址:http://maoyidao.iteye.com/blog/1663193 一个仅仅部署在4台服务器上的服务,每秒向Database写入数据超过100万行数据,每分钟产生超过1G的数据.而每台服务器(8核12G)上CPU占用不到100%,load不超过5.这是怎么做到呢?下面将给你描述这个架构,它的核心是一个高效缓冲区设计,我们对它的要求是: 1,该缓存区要尽量简单 2,尽量避免生产者线程和消费者线程锁 3,尽量避免大量GC 缓冲 vs 性能瓶颈 提高硬盘写入IO的银弹无疑是批量顺序

disruptor使用示例

LMAX 开源了一个高性能并发编程框架.可以理解为消费者-生产者的消息发布订阅模式.本文下载了官方示例代码,进行实验. longEvent事件数据 public class LongEvent { private long value; public void set(long value) { this.value = value; } public long get(){ return this.value; } } LongEventFactory事件工厂 import com.lmax.

并发框架Disruptor浅析

1.引言 Disruptor是一个开源的Java框架,它被设计用于在生产者—消费者(producer-consumer problem,简称PCP)问题上获得尽量高的吞吐量(TPS)和尽量低的延迟.Disruptor是LMAX在线交易平台的关键组成部分,LMAX平台使用该框架对订单处理速度能达到600万TPS,除金融领域之外,其他一般的应用中都可以用到Disruptor,它可以带来显著的性能提升.其实Disruptor与其说是一个框架,不如说是一种设计思路,这个设计思路对于存在“并发.缓冲区.生

Disruptor初级入门

Disruptor 极速体验 高性能队列 Disruptor

Disruptor框架EventProcessor和Workpool的使用

场景使用: 在HelloWorld的实例中,我们创建Disruptor实例,然后调用getRingBuffer方法去获取RingBuffer,其实在很多时候,我们可以直接使用RingBuffer,以及其他的API操作,看一下示例: 使用EventProcessor消息处理器: 使用WorkerPool消息处理器: 先看一下EventProcessor消息处理器: 这是一个event对象: 这是一个消费者, 最后是一个main方法,然后EventFactory也在这个方法里面实现,这里就是没有创建

Disruptor底层实现讲解与RingBuffer数据结构讲解

Disruptor术语 RingBuffer:被看作Disruptor最主要的组件,然而从2.0开始RingBuffer仅仅负责存储和更新在Disruptor中流通的数据.对一些特殊的使用场景能够被用户(使用其他数据结构)完全替代. Sequence:Disruptor使用Sequence来表示一个特殊组件处理的序号.和Disruptor一样,每个消费者(EventProcessor)都维持着一个Sequence.大部分的并发代码依赖这些Sequence值的运转,因此Sequence支持多种当前