5分钟开启Esper之旅

原作者:CoffeeOneSugar

翻译:刘斌华

在我之前发表的文章中,我提到我最近热衷于Complex Event Processing (CEP) (复杂事件处理)。简单来说,CEP把数据流作为输入,根据一系列预定义的规则,把数据(或部分数据)重定向给监听者们;又或者是当发现数据中的隐含的模式(Pattern)时,触发事件。在大量数据被产生出来并需要进行实时地分析的场景下,CEP特别有用。

有一个很不错的软件项目,可以让你做到这一点,叫做ESPER。你可以在这里找到该项目的网站。Esper向程序员提供一个称为EPL的语言,有些类似于SQL语言,它可以很好地用于对规则和模式的配置,这些规则和模式将被应用到数据流上。

Esper附带的文档是相当完整的,但缺乏实际的例子,这使得它看起来难以被使用。所以这里给出一个Esper的5分钟指导。虽然下面的例子是用Java编写,但是其实Esper是同时支持Java和C#的。我这里假设你已经下载了Esper,如果没有,可以通过点击这个链接来完成。解压刚才下载的文件后,你应该在你磁盘的某个地方可以找到一个叫esper-3.1.0的文件夹。(译者:这篇文章比较早了,现在esper最新版本是5.2.0)

在开始前,你需要添加几个库文件到你的工程中,当然,esper-3.1.0.jar是其中之一,你也需要另外其他4个第三方的库文件,他们可以在esper-3.1.0.jar/esper/lib文件夹中找到。

现在开始我们的5分钟吧。在你把需要分析的数据丢到CEP引擎的管道中之前,需要把这些数据结构化到对象当中去。让我们用一个简单的例子,写一个类(或者叫它bean)来描述给定时间的某个股票的价格:

import java.util.Date;
    public static class Tick {
        String symbol;
        Double price;
        Date timeStamp;

        public Tick(String s, double p, long t) {
            symbol = s;
            price = p;
            timeStamp = new Date(t);
        }
        public double getPrice() {return price;}
        public String getSymbol() {return symbol;}
        public Date getTimeStamp() {return timeStamp;}

        @Override
        public String toString() {
            return "Price: " + price.toString() + " time: " + timeStamp.toString();
        }
    }

它有3个属性:股票代码,价格和时间戳。在我们开始生成数以亿计的数据前,我们需要通知引擎它需要处理哪些对象,这是通过在实例化CEP引擎时,使用一个Configuration对象来实现的:

import com.espertech.esper.client.*;

public class main {

   public static void main(String [] args){

    //The Configuration is meant only as an initialization-time object.
    Configuration cepConfig = new Configuration();
    // We register Ticks as objects the engine will have to handle
    cepConfig.addEventType("StockTick",Tick.class.getName());

   // We setup the engine
    EPServiceProvider cep = EPServiceProviderManager.getProvider("myCEPEngine",cepConfig);
   }
}

作为测试目的,我们现在创建一个函数来生成随机的数据,并且把它们丢到CEP引擎当中,我们把这个函数叫做“GenerateRandomTick”,它把EPRuntime对象作为参数,这个对象用于把事件传递给CEP引擎:

import java.util.Random;
import com.espertech.esper.client.*;

public class exampleMain {
   private static Random generator=new Random();

   public static void GenerateRandomTick(EPRuntime cepRT){
    double price = (double) generator.nextInt(10);
    long timeStamp = System.currentTimeMillis();
    String symbol = "AAPL";
    Tick tick= new Tick(symbol,price,timeStamp);
    System.out.println("Sending tick:" + tick);
    cepRT.sendEvent(tick);
}

    public static void main(String[] args) {
     //The Configuration is meant only as an initialization-time object.
      Configuration cepConfig = new Configuration();
     cepConfig.addEventType("StockTick",Tick.class.getName());

      EPServiceProvider cep=EPServiceProviderManager.getProvider("myCEPEngine",cepConfig);

      EPRuntime cepRT = cep.getEPRuntime();
    }
}

现在,我们有了一个可以工作的CEP引擎,和不断输入的虚假的数据,是时候来创建我们的第一条规则了,用Esper的说法,我们的第一条EPL语句。要这么做,我们需要请引擎的管理员记录我们的语句。然后,CEP引擎会根据EPL语句的定义,过滤它收到的数据,当数据满足语句中的选择条件或者模式时,触发事件。

public static void main(String[] args) {
    //The Configuration is meant only as an initialization-time object.
    Configuration cepConfig = new Configuration();
    cepConfig.addEventType("StockTick",Tick.class.getName());
    EPServiceProvider cep = EPServiceProviderManager.getProvider("myCEPEngine",cepConfig);
    EPRuntime cepRT = cep.getEPRuntime();

    // We register an EPL statement
    EPAdministrator cepAdm = cep.getEPAdministrator();
    EPStatement cepStatement = cepAdm.createEPL("select * from " +
                                "StockTick(symbol=‘AAPL‘).win:length(2) " +
                                "having avg(price) > 6.0");
  }

这里,我们的规则设置为:每当最近的2次数据的平均值大于6.0时,触发事件。

下一步,主要是创建一个监听者并把它和我们的选择规则产生的事件关联起来。可以这么做:

cepStatement.addListener(new CEPListener());

这里有不同的方式来实现监听者,下面的是其中最简单的一种。这里监听者只是简单地把它从引擎中收到的对象打印出来:

public static class CEPListener implements UpdateListener {
 public void update(EventBean[] newData, EventBean[] oldData) {
         System.out.println("Event received: "
                            + newData[0].getUnderlying());
    }
}

到目前为止,看上去还不错。现在是测试我们的代码的时候了。让我们生成一些数据,看一切能否正常工作。可以在main函数中添加一下行来做到:

for(int i = 0; i< 5; i++)
    GenerateRandomTick(cepRT);

现在所有的代码看上去如下所示(我把Tick类和入口函数放在一起,这样你就能把它们复制粘贴到一个文件并运行它们)

import com.espertech.esper.client.*;
import java.util.Random;
import java.util.Date;

public class exampleMain {

    public static class Tick {
        String symbol;
        Double price;
        Date timeStamp;

        public Tick(String s, double p, long t) {
            symbol = s;
            price = p;
            timeStamp = new Date(t);
        }
        public double getPrice() {return price;}
        public String getSymbol() {return symbol;}
        public Date getTimeStamp() {return timeStamp;}

        @Override
        public String toString() {
            return "Price: " + price.toString() + " time: " + timeStamp.toString();
        }
    }

    private static Random generator = new Random();

    public static void GenerateRandomTick(EPRuntime cepRT) {

        double price = (double) generator.nextInt(10);
        long timeStamp = System.currentTimeMillis();
        String symbol = "AAPL";
        Tick tick = new Tick(symbol, price, timeStamp);
        System.out.println("Sending tick:" + tick);
        cepRT.sendEvent(tick);

    }

    public static class CEPListener implements UpdateListener {

        public void update(EventBean[] newData, EventBean[] oldData) {
            System.out.println("Event received: " + newData[0].getUnderlying());
        }
    }

    public static void main(String[] args) {

//The Configuration is meant only as an initialization-time object.
        Configuration cepConfig = new Configuration();
        cepConfig.addEventType("StockTick", Tick.class.getName());
        EPServiceProvider cep = EPServiceProviderManager.getProvider("myCEPEngine", cepConfig);
        EPRuntime cepRT = cep.getEPRuntime();

        EPAdministrator cepAdm = cep.getEPAdministrator();
        EPStatement cepStatement = cepAdm.createEPL("select * from " +
                "StockTick(symbol=‘AAPL‘).win:length(2) " +
                "having avg(price) > 6.0");

        cepStatement.addListener(new CEPListener());

       // We generate a few ticks...
        for (int i = 0; i < 5; i++) {
            GenerateRandomTick(cepRT);
        }
    }
}

Output:

log4j:WARN No appenders could be found for logger (com.espertech.esper.epl.metric.MetricReportingPath).
log4j:WARN Please initialize the log4j system properly.
Sending tick:Price: 6.0 time: Tue Jul 21 01:11:15 CEST 2009
Sending tick:Price: 0.0 time: Tue Jul 21 01:11:15 CEST 2009
Sending tick:Price: 7.0 time: Tue Jul 21 01:11:15 CEST 2009
Sending tick:Price: 4.0 time: Tue Jul 21 01:11:15 CEST 2009
Sending tick:Price: 9.0 time: Tue Jul 21 01:11:15 CEST 2009
Event received: Price: 9.0 time: Tue Jul 21 01:11:15 CEST 2009

正如你看到的,只有最后两行数据平均值大于6,因此只有一个事件最终被引擎触发了。相当不错!

Oh,你或许担心输出中的第一行,是的,这里还有一点小问题。事实上,Esper使用的日志生成包log4j导致了这个警告。它是可以通过一个叫log4j.xml的文件来配置的,你可以在esper-3.1.0/examples下的某个例子中的/etc目录下找到它。我不认为给我们所有的程序都弄一个xml配置文件是个好主意,所以我们在以下代码中,用点技巧来配置我们的logger,在你的文件开始处加入些import和代码:

import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.SimpleLayout;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
//and this in the main function before the rest of your code:

public static void main(String [] args){

      SimpleLayout layout = new SimpleLayout();
      ConsoleAppender appender = new ConsoleAppender(new SimpleLayout());
      Logger.getRootLogger().addAppender(appender);
      Logger.getRootLogger().setLevel((Level) Level.WARN);
(...)

5分钟到此结束。

下一篇文章中,我将更深入一些来探索EPL语句,提供一些代码来连接两个引擎实现所谓的“事件精化”(event refinement)(译者:好像之后作者再也没有更新过了,所以,不要指望后续了:))

原文地址:https://coffeeonesugar.wordpress.com/2009/07/21/getting-started-with-esper-in-5-minutes/刘斌华原创翻译,转载请注明出处

时间: 2024-11-29 06:17:12

5分钟开启Esper之旅的相关文章

【啊哈!算法】算法9:开启树之旅

这是什么?是一个图?不对,确切的说这是一棵树.这哪里像树呢?不要着急我们来变换一下. 是不是很像一棵倒挂的树,也就是说它是根朝上,而叶子朝下的.不像?哈哈,看完下面这幅图你就会觉得像啦. 你可能会问:树和图有什么区别?这个称之为树的东西貌似和无向图差不多嘛.不要着急,继续往下看.树其实就是不包含回路的连通无向图.你可能还是无法理解这其中的差异,举个例子,如下.          上面这个例子中左边的是一棵树,而右边的是一个图.因为左边的没有回路,而右边的存在1->2->5->3->

【坐在马桶上看算法】算法9:开启“树”之旅

我们先来看一个例子. 这是什么?是一个图?不对,确切的说这是一棵树.这哪里像树呢?不要着急我们来变换一下. 是不是很像一棵倒挂的树,也就是说它是根朝上,而叶子朝下的.不像?哈哈,看完下面这幅图你就会觉得像啦. 你可能会问:树和图有什么区别?这个称之为树的东西貌似和无向图差不多嘛.不要着急,继续往下看.树其实就是不包含回路的连通无向图.你可能还是无法理解这其中的差异,举个例子,如下.          上面这个例子中左边的是一棵树,而右边的是一个图.因为左边的没有回路,而右边的存在1->2->5

设计模式(一) 开启设计之旅

设计模式--开启设计之旅 我的设计模式之旅:      "身体和灵魂,总有一个在路上",有的人旅行,有的人看书,还有些人在旅行中写出启迪人心的好书来,身体在路上赋予了灵魂的力量.我的设计模式之旅,与你.与我,能带来什么呢?抛开遥远深邃的美丽传说,是为了感谢Java OO给了我新的生命力,是为了将自己的所学贡献给社区,是为了给后生多点上一盏灯,是为了让自己的灵魂保持活力,最后,也是最重要的,为了中国的软件行业不要多出我这个码农.转行本来是要冒风险,要转身,为什么不华丽转身呢?既然转身,为

开启“树”之旅

我们先来看一个例子. 这是什么?是一个图?不对,确切的说这是一棵树.这哪里像树呢?不要着急我们来变换一下. 是不是很像一棵倒挂的树,也就是说它是根朝上,而叶子朝下的.不像?哈哈,看完下面这幅图你就会觉得像啦. 你可能会问:树和图有什么区别?这个称之为树的东西貌似和无向图差不多嘛.不要着急,继续往下看.树其实就是不包含回路的连通无向图.你可能还是无法理解这其中的差异,举个例子,如下.          上面这个例子中左边的是一棵树,而右边的是一个图.因为左边的没有回路,而右边的存在1->2->5

搞定Android模拟器,开启甜蜜之旅

在前几期中总结分享了Android的前世今生.Android 系统架构和应用组件那些事.带你一起来聊一聊Android开发环境,上三期分别养成高富帅.轻松邂逅女神.并和女神约会成功,那么从本期开始准备出发甜蜜之旅. Android程序必须运行在Android系统,因此Android开发时必须准备相关的运行环境,即Android虚拟设备(Android模拟器). 一.AVD Manager入口 进入ADT Bundle工具包的解压包下面的eclipse目录,双击"eclipse.exe"

如何用15分钟开启高效的一天

原文在这里:http://www.lifehack.org/articles/productivity/14-things-productive-people-the-first-15-minutes-the-workday.html,作者YONG KANG CHAN,译者foruok,转载请注明出处http://blog.csdn.net/foruok.本文首发于我的微信订阅号"程序视界". 你上班的第一个15分钟会定下你整个工作日的基调.(译者注:这与一支股票开盘15分钟内的表现会

开启Linux之旅--Windows本机连接远程Linux主机

如果你觉得使用虚拟机安装Linux跑起来太慢,想同时拥有一台Linux主机和Window主机,又不想来回扭头在两个显示器之间切换.这时候要是出来个一个远程控制软件,一根网线就可以让你拥有虚拟机的快捷的界面切换和流畅的运行速度.VNC,即Visual Network Computer,就是可以胜任这一角色的优秀的远程控制软件. 一般采用putty和vnc结合的方式,下面分别介绍putty和vnc: 1.putty: putty是一款超轻量级的运行在windows操作系统上的用于远程连接linux服

(一)C#编程基础复习——开启编程之旅

回想当年学习编程,刚开始学习是非常艰苦的,可能是因为文科生原因,刚开始接触工科类的知识不是很擅长,上去大学第一年基本没有好好学习编程,入门C#编程基础一窍不通,也许那时年少无知,第二学期开始奋发图强,终于在编程之路越走越远,现在目前在国内BAT某家公司从事互联网内部系统开发工具的开发,趁这段时间有点空闲好好总结,复习下以前的知识,编程之路漫漫,只有不断的成长和学习,才能走得更远,不忘初心,方得始终.这里先讲几个基础,然后层层深入,尽管自己学识也不是很渊博,但是写下这些回忆,或许对自己也是有很大帮

开启工作之旅,做一名优秀的科研逃兵

就在几个月以前,我还坚定的想成为一个科学工作者.这是我长期的想法,几乎没有动摇过.可惜今年以来,身边发生的种种事情都让我想退出学术圈(虽然我还没进去),越来越觉得我需要重新审视自己的定位.是的,报效祖国的方式并非只有科研,其他的工作也是社会主义事业的核心力量.读书这么多年,主旋律的价值观或多或少对我产生了影响,当然结合自己的兴趣爱好,做科研是一件极其优雅的工作.但是一些因素已经导致我在这方面远远落后于前沿方向了,而且我恐怕没有足够的魄力排除外周的压力来搞科研,说白了就是担心英语差和科研不赚钱.