public class DisruptorServer { private Disruptor disruptor = null; public static void main(String[] args) { DisruptorContext.start(); System.out.println("Disruptor服务已启动..."); for(long i=0; i<101; i++){ DisruptorContext.publish(i); } DisruptorContext.stop(); System.out.println("...Disruptor服务已停止"); } } public class DisruptorContext { private static Disruptor<LongEvent> disruptor = null; private static ExecutorService executor = null; public static void start(){ if(null==disruptor){ EventFactory<LongEvent> eventFactory = new LongEventFactory(); executor = Executors.newSingleThreadExecutor(); WaitStrategy waitStrategy = new BlockingWaitStrategy(); int ringBufferSize = 1024*1024; disruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executor, ProducerType.SINGLE, waitStrategy); EventHandler<LongEvent> eventHandler = new LongEventHandler(); disruptor.handleEventsWith(eventHandler); disruptor.start(); } } public static void stop(){ disruptor.shutdown(); executor.shutdown(); } public static void publish(long eventData){ RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); long sequence = ringBuffer.next(); try{ LongEvent event = ringBuffer.get(sequence); event.set(eventData); }finally{ ringBuffer.publish(sequence); } } } public class LongEvent { private long value; public void set(long value) { this.value = value; } public long get(){ return this.value; } } public class LongEventFactory implements EventFactory<LongEvent> { @Override public LongEvent newInstance() { return new LongEvent(); } } public class LongEventHandler implements EventHandler<LongEvent>{ @Override public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception { System.out.println("Disruptor消费者输出Event :" + event.get()); } }
Event 需要进入disruptor交换的对象都需要封装成event,本例中封装的是一个long
EventFactory工厂,决定以何种方式创建event。
* 工厂模式:我不知道你需要的是什么样的对象,索性把你的构造方法(工厂)传过来吧。
EventHandler事件处理的具体实现,也即producer——consumer中的consumer的具体实现
* 本例中仅仅对event中的long进行输出
时间: 2024-10-13 03:15:08