20 优化
esper为了处理高速的生成力已经高度优化,并且接收事件和输出结果低延迟。
esper还可以进一步最大化可测使用在 软实时和硬实时JVM 上。
本章描述了最好的优化练习,而且解释了怎么去评价esper优化 通过使用我们提供的优化工具
20.1 优化结果
为了进一步理解以下测试结果,请查询下一章
测试结果说明:
eseper在一个双核2GH Ineter硬件,处理速度超过 500000 event/s,并且引擎延迟平均低于3微秒(超过99%的测试低于10us(微秒))
在一个平均加权基准点在1000个的查询注册在系统中
这达到70Mbit/s 85%的Cpu使用
esper还示范了线性的扩展性从100000到5000000 event/s 在这个硬件上,并且各个不同的生命查询始终如一的产生结果
其他测试示范具有相同的执行结果(始终连续的处理,match all,match none,没有声明注册,时间窗口或者长度窗口计算VWAP)
在笔记本电脑上的测试大概有5倍的性能降低-70000到200000 event/s ,在这个情况下还可以进行简单的测试和小配置
20.2 优化技巧
20.2.1 理解怎么去优化你的java虚拟机
esper运行在jvm上,你需要熟悉JVM调优。可以考虑的关键参数包括:最大堆内存、最小堆内存、年轻代堆大小。
在时间窗口和长度窗口上的声明可能会消耗大量的内存,因为他们的大小或者长度可能会很大。
对于时间数据窗口,你需要意识到内存消耗取决于实际的进入事件流。
事件模式event pattern实例也消耗内存,尤其在事件模式使用‘every‘关键字 去重复使用子表达式模式 也取决于实际的事件流。
20.2.2 输入输出瓶颈
你的应用从esper语句接收输出事件通过 UpdateListener接口 或者通过强类型描述订阅POJO对象.
这样的事件输出被发送通过应用或者 定时线程发送一个输入事件 到引擎实例。
输出事件处理:你的监听者或者订阅者的执行会暂时的阻塞线程直到处理完成,可能因此降低吞吐量。
它可以受益 对于你的应用去异步的执行输出事件 并且不阻塞Esper引擎,同时输出事件被你的监听者处理,特别是当你的监听者代码执行阻塞IO
举例来说:你的应用程序可能想发送输出事件到JMS目的地或者写入输出事件数据到关系型数据库。
为了最佳的吞吐量,考虑在一个单独的线程中执行这一的阻塞操作。
此外,在性能测试中,当从一个存储器或者网络读取输入事件,你可能发现Esper处理事件比输入事件到Esper更快。
在这种情况下,你可能要考虑使用一个内存驱动程序 进行性能测试。
也可以考虑 从事件处理方法解耦读取操作(sendEvent method)通过多个readers或者 从存储器预处理数据
21.2.3. Theading 穿线
我们推荐使用多线程给Esper发送事件。我们提供了一个测试类。
为了避免冲突, 这个测试类没有使用阻塞队列和线程池。
多线程测试代码如下:
public class SampleClassThreading { public static void main(String[] args) throws InterruptedException { int numEvents = 1000000; int numThreads = 3; Configuration config = new Configuration(); config.getEngineDefaults().getThreading() .setListenerDispatchPreserveOrder(false); config.getEngineDefaults().getThreading() .setInternalTimerEnabled(false); // remove thread that handles time advancing EPServiceProvider engine = EPServiceProviderManager .getDefaultProvider(config); engine.getEPAdministrator().getConfiguration().addEventType(MyEvent.class); engine.getEPAdministrator().createEPL( "create context MyContext coalesce by consistent_hash_crc32(id) " + "from MyEvent granularity 64 preallocate"); String epl = "context MyContext select count(*) from MyEvent group by id"; EPStatement stmt = engine.getEPAdministrator().createEPL(epl); stmt.setSubscriber(new MySubscriber()); Thread[] threads = new Thread[numThreads]; CountDownLatch latch = new CountDownLatch(numThreads); int eventsPerThreads = numEvents / numThreads; for (int i = 0; i < numThreads; i++) { threads[i] = new Thread( new MyRunnable(latch, eventsPerThreads, engine.getEPRuntime())); } long startTime = System.currentTimeMillis(); for (int i = 0; i < numThreads; i++) { threads[i].start(); } latch.await(10, TimeUnit.MINUTES); if (latch.getCount() > 0) { throw new RuntimeException("Failed to complete in 10 minute"); } long delta = System.currentTimeMillis() - startTime; System.out.println("Took " + delta + " millis"); } public static class MySubscriber { public void update(Object[] args) { } } public static class MyRunnable implements Runnable { private final CountDownLatch latch; private final int numEvents; private final EPRuntime runtime; public MyRunnable(CountDownLatch latch, int numEvents, EPRuntime runtime) { this.latch = latch; this.numEvents = numEvents; this.runtime = runtime; } public void run() { Random r = new Random(); for (int i = 0; i < numEvents; i++) { runtime.sendEvent(new MyEvent(r.nextInt(512))); } latch.countDown(); } } public static class MyEvent { private final int id; public MyEvent(int id) { this.id = id; } public int getId() { return id; } } }