实现EventFactory,在newInstance方法中返回,ringBuffer缓冲区中的对象实例;代码如下:
public class DTaskFactory implements EventFactory<DTask> { @Override public DTask newInstance() {//disruptor使用环形缓冲区,这是环形缓冲区所承载的对象 return new DTask(); } }
生产消费的对象类型:
public class DTask { public String getName1() { return name1; } public void setName1(String name1) { this.name1 = name1; } public String getName2() { return name2; } public void setName2(String name2) { this.name2 = name2; } public String getName3() { return name3; } public void setName3(String name3) { this.name3 = name3; } String name1; String name2; String name3; }
disruptor的消费处理事件onEvent为消费调用的方法(下面的代码中包含并行和串行执行的消费事件):
public class DTaskHandle implements EventHandler<DTask> { @Override public void onEvent(DTask dTask, long l, boolean b) throws Exception { System.out.println("开始最后消费"); System.out.println(dTask.getName1()); System.out.println(dTask.getName2()); System.out.println(dTask.getName3()); System.out.println("结束最后消费"); } } public class DTaskHandle1 implements EventHandler<DTask> { @Override public void onEvent(DTask dTask, long l, boolean b) throws Exception { System.out.println("-----DTaskHandle1-----"); dTask.setName1("name1"); } } public class DTaskHandle2 implements EventHandler<DTask> { @Override public void onEvent(DTask dTask, long l, boolean b) throws Exception { System.out.println("-----DTaskHandle2-----"); dTask.setName2("name2"); } } public class DTaskHandle3 implements EventHandler<DTask> { @Override public void onEvent(DTask dTask, long l, boolean b) throws Exception { System.out.println("-----DTaskHandle3-----"); dTask.setName3("name3"); } }
测试执行类:
public class DisruptorTest { public void exec() throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); Disruptor<DTask> disruptor = new Disruptor(new DTaskFactory(), 1024 * 1024, executor, ProducerType.SINGLE, new BusySpinWaitStrategy()); DTaskHandle dTaskHandle = new DTaskHandle(); DTaskHandle1 dTaskHandle1 = new DTaskHandle1(); DTaskHandle2 dTaskHandle2 = new DTaskHandle2(); DTaskHandle3 dTaskHandle3 = new DTaskHandle3(); disruptor.handleEventsWith(dTaskHandle1, dTaskHandle2, dTaskHandle3);//消费生产出的对象,并行执行 disruptor.after(dTaskHandle1, dTaskHandle2, dTaskHandle3).handleEventsWith(dTaskHandle);//并行执行1 2 3后,串行执行dTaskHandle // disruptor. disruptor.start(); CountDownLatch latch = new CountDownLatch(1); //生产者准备 executor.submit(new TradePublisher(latch, disruptor)); latch.await();//等待生产者完事. disruptor.shutdown(); executor.shutdown(); } }
时间: 2025-01-15 06:28:46