disruptor使用示例

LMAX 开源了一个高性能并发编程框架。可以理解为消费者-生产者的消息发布订阅模式。本文下载了官方示例代码,进行实验。

longEvent事件数据

public class LongEvent {
    private long value;

    public void set(long value) {
        this.value = value;
    }

    public long get(){
        return this.value;
    }
}

LongEventFactory事件工厂

import com.lmax.disruptor.EventFactory;
/**
 * 事件生产工厂
 * @author wanghao
 *
 */
public class LongEventFactory implements EventFactory<LongEvent>
{

	@Override
	public LongEvent newInstance() {
		return new LongEvent();
	}

}

LongEventProducer事件生产者

import java.nio.ByteBuffer;

import com.lmax.disruptor.RingBuffer;

/**
 * 生产者,生产longEvent事件
 * @author harry
 *
 */
public class LongEventProducer {
	private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer)
    {
        this.ringBuffer = ringBuffer;
    }

    public void product(ByteBuffer bb)
    {
        long sequence = ringBuffer.next();  // Grab the next sequence
        try
        {
            LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
                                                        // for the sequence
            event.set(bb.getLong(0));  // Fill with data
        }
        finally
        {
            ringBuffer.publish(sequence);
        }
    }
}

RingBuffer是消息存储结构,为环形存储结构,每个单元存储一条消息。类似于队列。当ringbuffer中数据填满后,环就会阻塞,等待消费者消费掉数据。当所有消费者消费掉环中一个数据,新的消息才可以加入环中。每个环插入数据后,都会分配下一个位置的编号,即sequence 。

消息者事件处理器

为消费者消费处理器,这处需要执行速度足够快。否则,会影响ringbuffer后续没空间加入新的数据。因此,不能做业务耗时操作。建议另外开始java 线程池处理消息。

import com.lmax.disruptor.EventHandler;
/**
 * 消息者事件处理器,打印输出到控制台
 * @author harry
 *
 */
public class LongEventHandler  implements EventHandler<LongEvent>{
	  public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
	    {
	        System.out.println("consumer:"+Thread.currentThread().getName()+" Event: value=" + event.get()+",sequence="+sequence+",endOfBatch="+endOfBatch);
	    }
}

LongEventProducerWithTranslator

import java.nio.ByteBuffer;

import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;

/**
 * post-3.0 the preferred approach for publishing messages is
 * via the Event Publisher/Event Translator portion of the API. E.g.
 * @author harry
 *
 */
public class LongEventProducerWithTranslator {
	private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer)
    {
        this.ringBuffer = ringBuffer;
    }

    private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
        new EventTranslatorOneArg<LongEvent, ByteBuffer>()
        {
            public void translateTo(LongEvent event, long sequence, ByteBuffer bb)
            {
                event.set(bb.getLong(0));
            }
        };

    public void product(ByteBuffer bb)
    {
        ringBuffer.publishEvent(TRANSLATOR, bb);
    }
}

translateTo方法将ringbuffer中的消息,转换成java对象格式。示例 为LongEvent对象,后续消费者LongEventHandler  处理器,直接操作LongEvent对象,获取消息各属性信息,本示例 为value属性。

product方法,将生产者生产的消息放入ringbuffer中。

LongEventMain

消费者-生产者启动类,其依靠构造Disruptor对象,调用start()方法完成启动线程。Disruptor 需要ringbuffer环,消费者数据处理工厂,WaitStrategy等

ByteBuffer 类字节buffer,用于包装消息。

ProducerType.SINGLE为单线程 ,可以提高性能。

import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class LongEventMain {
	@SuppressWarnings("unchecked")
	public static void main(String[] args) throws Exception
	    {
	        // 执行器,用于构造消费者线程
	        Executor executor = Executors.newCachedThreadPool();

	        // 指定事件工厂
	        LongEventFactory factory = new LongEventFactory();

	        // 指定 ring buffer字节大小, must be power of 2.
	        int bufferSize = 1024;

	        //单线程模式,获取额外的性能
	        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory,
                    bufferSize,executor,
                    ProducerType.SINGLE,
                    new BlockingWaitStrategy());
	        //设置事件业务处理器---消费者
	        disruptor.handleEventsWith(new LongEventHandler());
	        //启动disruptor线程
	        disruptor.start();

	        // 获取 ring buffer环,用于接取生产者生产的事件
	        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
	        //为 ring buffer指定事件生产者
	        //LongEventProducer producer = new LongEventProducer(ringBuffer);
	        LongEventProducerWithTranslator producer=new LongEventProducerWithTranslator(ringBuffer);
	        ByteBuffer bb = ByteBuffer.allocate(8);//预置8字节长整型字节缓存
	        for (long l = 0; true; l++)
	        {
	            bb.putLong(0, l);
	            producer.product(bb);//生产者生产数据
	            Thread.sleep(1000);
	        }

	    }
}

实验结果:

consumer:pool-1-thread-1 Event: value=0,sequence=0,endOfBatch=true

consumer:pool-1-thread-1 Event: value=1,sequence=1,endOfBatch=true

consumer:pool-1-thread-1 Event: value=2,sequence=2,endOfBatch=true

consumer:pool-1-thread-1 Event: value=3,sequence=3,endOfBatch=true

consumer:pool-1-thread-1 Event: value=4,sequence=4,endOfBatch=true

consumer:pool-1-thread-1 Event: value=5,sequence=5,endOfBatch=true

consumer:pool-1-thread-1 Event: value=6,sequence=6,endOfBatch=true

Event: value = 为消费者接收到的数据,sequence为数据在ringbuffer环的位置。

时间: 2024-12-25 10:07:24

disruptor使用示例的相关文章

Disruptor入门

一.什么是 Disruptor Disruptor是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式实现,或者事件-监听模式的实现,直接称disruptor模式.disruptor最大特点是高性能,其LMAX架构可以获得每秒6百万订单,用1微秒的延迟获得吞吐量为100K+. 可以理解为消费者-生产者的消息发布订阅模式. 二.Disruptor 的核心概念先从了解 Disruptor 的核心概念开始,来了解它是如何运作的.下面介绍的概念模型,既是领

Disruptor使用入门

在最近的项目中看到同事使用到了Disruptor,以前在ifeve上看到过关于Disruptor的文章,但是没有深入研究,现在项目中用到了,就借这个机会对这个并发编程框架进行深入学习.项目中使用到的是disruptor-2.10.4,所以下面分析到的Disruptor的代码是这个版本的. 并发编程网介绍Disruptor的文章是disruptor1.0版本,所以有一些术语在2.0版本上已经没有了或者被替代了. Disruptor术语 github上Disruptor的wiki对Disruptor

Java 并发基础

Java 并发基础 线程简述 线程是进程的执行部分,用来完成一定的任务; 线程拥有自己的堆栈,程序计数器和自己的局部变量,但不拥有系统资源, 他与其他线程共享父进程的共享资源及部分运行时环境,因此编程时需要小心,确保线程不会妨碍同一进程中的其他线程; 多线程优势 进程之间不能共享内存,但线程之间共享内存/文件描述符/进程状态非常容易. 系统创建进程时需要为该其分配很多系统资源(如进程控制块),但创建线程的开销要小得多,因此线程实现多任务并发比进程效率高. Java语言内置多线程支持,而不是单纯采

Disruptor——一种可替代有界队列完成并发线程间数据交换的高性能解决方案

本文翻译自LMAX关于Disruptor的论文,同时加上一些自己的理解和标注.Disruptor是一个高效的线程间交换数据的基础组件,它使用栅栏(barrier)+序号(Sequencing)机制协调生产者与消费者,从而避免使用锁和CAS,同时还组合使用预分配内存机制.缓存行机制(cache line).批处理效应(batch effect)来达到高吞吐量和低时延的目标.目前Disruptor版本已经迭代至3.0,本论文是基于Disruptor1.0写就,在新版本中,相对与1.0版本,其核心设计

Disruptor框架EventProcessor和Workpool的使用

场景使用: 在HelloWorld的实例中,我们创建Disruptor实例,然后调用getRingBuffer方法去获取RingBuffer,其实在很多时候,我们可以直接使用RingBuffer,以及其他的API操作,看一下示例: 使用EventProcessor消息处理器: 使用WorkerPool消息处理器: 先看一下EventProcessor消息处理器: 这是一个event对象: 这是一个消费者, 最后是一个main方法,然后EventFactory也在这个方法里面实现,这里就是没有创建

Disruptor之粗糙认识

一 概述 1.Disruptor Disruptor是一个高性能的异步处理框架,一个"生产者-消费者"模型. 2.RingBuffer RingBuffer是一种环形数据结构,包含一个指向下一个槽点的序号,可以在线程间传递数据. 3.Event 在Disruptor框架中,生产者生产的数据叫做Event. 二 Disruptor框架基本构成 1.MyEvent:自定义对象,充当"生产者-消费者"模型中的数据. 2.MyEventFactory:实现EventFact

Disruptor源码分析

本文将介绍Disruptor的工作机制,并分析Disruptor的主要源码 基于的版本是3.3.7(发布于2017.09.28) 水平有限,如有谬误请留言指正 0. 什么是Disruptor? Disruptor是一个开源的并发框架,提供了类似于Java中有界队列的功能,主要用于生产消费者场景. 与Java原生并发队列不同的是,Disruptor高度优化,在单机上可以轻松跑到千万级别的tps 1. Disruptor的关键想法 a. 使用环形队列作为底层存储(存储空间连续,可以充分利用cache

log4j2用asyncRoot配置异步日志是如何使用disruptor

用asyncRoot配置对应的对接disruptor类是AsyncLoggerConfigDisruptor,用Log4jContextSelector启动参数配置全局异步的对应的对接disruptor类是AsyncLoggerDisruptor.下面分析的是AsyncLoggerConfigDisruptor disruptor的创建与启动需要的部件实现 AsyncLoggerConfigDisruptor.start方法用来创建并启动disruptor实例 创建disruptor需要Even

pfsense Web服务器负载平衡配置示例

在pfsense的网关和服务器中有两种类型的负载平衡功能.网关负载平衡可以通过多个WAN连接分发Internet绑定的流量.服务器负载平衡管理传入流量,因此它利用多个内部服务器进行负载分配和冗余,服务器负载平衡允许流量在多个内部服务器之间分配,它最常用于Web服务器和SMTP服务器.下面我们就以实例来介绍服务器负载平衡的设置. 下面介绍如何通过pfsense2.32配置Web服务器的负载平衡. 网络环境 服务器负载平衡示例网络环境 上图为示例网络环境.它由单个防火墙组成,使用其WAN IP地址池