disruptor的并行用法

实现EventFactory,在newInstance方法中返回,ringBuffer缓冲区中的对象实例;代码如下:

public class DTaskFactory implements EventFactory<DTask> {
    @Override
    public DTask newInstance() {//disruptor使用环形缓冲区,这是环形缓冲区所承载的对象
        return new DTask();
    }
}

生产消费的对象类型:

public class DTask {
    public String getName1() {
        return name1;
    }

    public void setName1(String name1) {
        this.name1 = name1;
    }

    public String getName2() {
        return name2;
    }

    public void setName2(String name2) {
        this.name2 = name2;
    }

    public String getName3() {
        return name3;
    }

    public void setName3(String name3) {
        this.name3 = name3;
    }

    String name1;
    String name2;
    String name3;

}

disruptor的消费处理事件onEvent为消费调用的方法(下面的代码中包含并行和串行执行的消费事件):

public class DTaskHandle implements EventHandler<DTask> {
    @Override
    public void onEvent(DTask dTask, long l, boolean b) throws Exception {
        System.out.println("开始最后消费");
        System.out.println(dTask.getName1());

        System.out.println(dTask.getName2());
        System.out.println(dTask.getName3());
        System.out.println("结束最后消费");
    }
}

public class DTaskHandle1 implements EventHandler<DTask> {
    @Override
    public void onEvent(DTask dTask, long l, boolean b) throws Exception {
        System.out.println("-----DTaskHandle1-----");
        dTask.setName1("name1");
    }
}

public class DTaskHandle2 implements EventHandler<DTask> {
    @Override
    public void onEvent(DTask dTask, long l, boolean b) throws Exception {
        System.out.println("-----DTaskHandle2-----");
        dTask.setName2("name2");
    }
}

public class DTaskHandle3 implements EventHandler<DTask> {
    @Override
    public void onEvent(DTask dTask, long l, boolean b) throws Exception {
        System.out.println("-----DTaskHandle3-----");
        dTask.setName3("name3");
    }
}

测试执行类:

public class DisruptorTest {

    public void exec() throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();
        Disruptor<DTask> disruptor = new Disruptor(new DTaskFactory(),
                1024 * 1024,
                executor,
                ProducerType.SINGLE, new BusySpinWaitStrategy());

        DTaskHandle dTaskHandle = new DTaskHandle();
        DTaskHandle1 dTaskHandle1 = new DTaskHandle1();
        DTaskHandle2 dTaskHandle2 = new DTaskHandle2();
        DTaskHandle3 dTaskHandle3 = new DTaskHandle3();
        disruptor.handleEventsWith(dTaskHandle1, dTaskHandle2, dTaskHandle3);//消费生产出的对象,并行执行

        disruptor.after(dTaskHandle1, dTaskHandle2, dTaskHandle3).handleEventsWith(dTaskHandle);//并行执行1 2 3后,串行执行dTaskHandle

//        disruptor.

        disruptor.start();
        CountDownLatch latch = new CountDownLatch(1);
        //生产者准备
        executor.submit(new TradePublisher(latch, disruptor));

        latch.await();//等待生产者完事.

        disruptor.shutdown();
        executor.shutdown();
    }

}
时间: 2025-01-15 06:28:46

disruptor的并行用法的相关文章

并发框架Disruptor浅析

1.引言 Disruptor是一个开源的Java框架,它被设计用于在生产者—消费者(producer-consumer problem,简称PCP)问题上获得尽量高的吞吐量(TPS)和尽量低的延迟.Disruptor是LMAX在线交易平台的关键组成部分,LMAX平台使用该框架对订单处理速度能达到600万TPS,除金融领域之外,其他一般的应用中都可以用到Disruptor,它可以带来显著的性能提升.其实Disruptor与其说是一个框架,不如说是一种设计思路,这个设计思路对于存在“并发.缓冲区.生

oracle parallel 并行 设置 理解

引子:以前一直没太关注oracle并行这个特性.前几天一个兄弟碰到的一个问题,才让我觉得这个东西还是有很多需要注意的地方,有必要仔细熟悉下.其实碰到的问题不复杂: 类似如下的一条语句:insert into xxxx select /+parallel(a) / * from xxx a;数据量大约在75G左右,这位兄弟从上午跑到下午还没跑完,过来问我咋回事,说平常2hrs能跑完的东西跑了好几个小时还撒动静.查看系统性能也比较 正常,cpu,io都不繁忙,平均READ速度在80M/s左右(勉强凑

(转)hints语法

HINT的基本概念 在向大家详细介绍Oracle Hints之前,首先让大家了解下Oracle Hints是什么,然后全面介绍Oracle Hints,希望对大家有用.基于代价的优化器是很聪明的,在绝大多数情况下它会选择正确的优化器,减轻了DBA的负担.但有时它也聪明反被聪明误,选择了很差的执行计划,使某个语句的执行变得奇慢无比. 此时就需要DBA进行人为的干预,告诉优化器使用我们指定的存取路径或连接类型生成执行计划,从而使语句高效的运行.例如,如果我们认为对于一个特定的语句,执行全表扫描要比执

Oracle Hints详解

在向大家详细介绍Oracle Hints之前,首先让大家了解下Oracle Hints是什么,然后全面介绍Oracle Hints,希望对大家有用.基于代价的优化器是很聪明的,在绝大多数情况下它会选择正确的优化器,减轻了DBA的负担.但有时它也聪明反被聪明误,选择了很差的执行计划,使某个语句的执行变得奇慢无比. 此时就需要DBA进行人为的干预,告诉优化器使用我们指定的存取路径或连接类型生成执行计划,从而使语句高效的运行.例如,如果我们认为对于一个特定的语句,执行全表扫描要比执行索引扫描更有效,则

多线程爬坑之路--并发,并行,synchonrized同步的用法

一.多线程的并发与并行: 并发:多个线程同时都处在运行中的状态.线程之间相互干扰,存在竞争,(CPU,缓冲区),每个线程轮流使用CPU,当一个线程占有CPU时,其他线程处于挂起状态,各线程断续推进. 并行:多个线程同时执行,但是每个线程各自有自己的CPU,不存在CPU资源的竞争,他们之间也可能存在资源的竞争. 并发发生在同一段时间间隔内,并行发生在同一时刻内.并发执行的总时间是每个任务的时间和,而并行则取决于最长任务的时间. 下面看一下A,B两个任务在并行和并发情况下是怎么执行的:[不考虑其他资

【python】详解map函数的用法之函数并行作用解析

Python函数编程中的map(func, seq1[, seq2,-]) 函数是将func作用于seq中的每一个元素,其中seq须是可迭代对象,并将所有的调用的结果作为一个list返回.如果func为None,作用同zip(). 本文参考自:Python中map()函数浅析一文,感谢精彩分享. 下面举得例子来帮助我们更好的理解这个工作过程: 1.对可迭代函数'iterable'中的每一个元素应用'function'方法,将结果作为list返回. 2.如果给出了额外的可迭代参数,则对每个可迭代参

R语言串行与并行Apply用法

串行 APPLY<- function(m){ mTemp <- apply(m, 2, mysort) return(mTemp)} snowfall包的并行 SNOWFALL<-function(m,ncl){ library(snowfall) sfInit(parallel = TRUE, cpus = ncl) mTemp<- sfApply(m,2,mysort) sfStop() return(mTemp)}

并行编程简单用法及异常处理

当有大量的数据需要处理或者有大量任务需要完成,而且每块数据或每个任务是彼此独立的,这时候可以考虑使用并行编程.现代计算机都是多核的,并行编程可以提高CPU利用率以提高吞吐量. Parallel.Invoke形参能接收一个Action数组. static void Main(string[] args) { Test(); Console.ReadKey(); } static void M1() { Console.WriteLine("方法1"); } static void M2(

C#并行库(TaskParallelLibrary)用法 z

1. Task.Factory.StartNew(() => DoSomeWork());是异步的 下面的代码会先输出ddd,因为Task.Factory.Startnew不阻塞: var task = Task.Factory.StartNew(() => Console.WriteLine("eee")); Console.WriteLine("ddd"); 如果你想阻塞,应该加上wait,改为这样: var task = Task.Factory.