Esper系列(五)Order by、Limit、构建事件流、Updating an Insert Stream

Order by

与SQL语法类似类似,默认为升序排列;
注意:

  1. 如果order by的子句中出现了聚合函数,那么该聚合函数必须出现在select的子句中。
  2. 出现在select中的expression或者在select中定义的expression,在order by中也有效。
  3. 如果order by所在的句子没有join或者没有group by,则排序结果幂等,否则为非幂等。

 

Limit
格式一:

limit row_count [offset offset_count]

注意:row_count和offset_count既可以是常量也可以是变量.
例句:

// 从结果集中第三条开始输出5个事件流,offset 2表示跳过前面两个事件流,limit 5表示出入的个数
String epsql = "select value as result from inEvent.win:length_batch(20) limit 5 offset 2";

格式二:

limit offset_count[, row_count]

例句:

String epsql = "select value as result from inEvent.win:length_batch(20) limit 2,5";

 

构建事件流

Insert(未定义的事件流)
功能:根据已有的事件流组合生成新的事件流.

 

格式:

insert [istream | irstream | rstream] into event_stream_name  [ (property_name [, property_name] ) ]

说明:

istream表示新事件流(New Events),rstream表示旧事件流(Old Events),irstream两者都包含event_stream_name为新建事件流名称,property_name为事件流属性。

例句:

// 创建inEvent事件流该事件流有两个属性字段name与salary
String insql = "insert into inEvent select name,salary from  orderEvent";
// 创建EPL
epAdmin.createEPL(insql);
// 查询inEvent事件流中属性salary大于150的name属性字段
String epsql = "select name as result from inEvent where salary > 150 

 

多事件流Insert

例句:

// 将事件流orderEvent的name、salary属性值分别赋值插给inEvent事件流的content、price属性
String insql1 = "insert into inEvent(content,price) select name,salary from orderEvent";
epAdmin.createEPL(insql1);
 
// 将事件流orderEvent中的JavaBean,bean中的属性key、value属性值分别赋值插给inEvent事件流的content、price属性
String insql2 = "insert into inEvent (content,price) select bean.key,bean.value from orderEvent ";
epAdmin.createEPL(insql2);
 
// 查询数据流inEvent中content属性值
10  String epsql = "select content as result from inEvent ";

注意:

Insert新创建的事件流属性字段,可由自定义静态函数返回,但一定要返回javabean,map,或者Object数组,且不能用as来为转换后的结果设置别名;

例子:

String insql = "insert into msgEvent select BaseUntil.getEvent() from orderEvent";
epAdmin.createEPL(insql);
String epsql = "select * from msgEvent";

其中BaseUntil.getEvent()返回orderEvent类型的javaBean;

Insert(已定义的事件流)

上面对事件流的构建都是新生成的,即事件流没有预定义。在事件流有预定义的情况下,Insert中引用该事件流时必须带包名。
例如:

文件名:msgEvent.java

// msgEvent事件流定义
public class msgEvent {
    private int msgId;
    private String msgInfo;
    // 注意该事件流定义中没有对应属性字段的set方法,只能通过构造函数改变属性值
    public msgEvent(int msgId, String msgInfo) {
        super();
        this.msgId = msgId;
        this.msgInfo = msgInfo;
10      }
11   
12      @Override
13      public String toString() {
14          return "msgId:"+msgId+",msgInfo:"+msgInfo;
15      }
16  }

 

文件名:BaseUntil.java

public class BaseUntil {
   
    public static int Add(int n){
        return n+100;
    }
   
    public static String UpdataText(String str){
        return str+",你好!";
    }
10     
11      public static orderEvent getEvent(){
12          orderEvent event = new orderEvent();
13          event.setName("张三");
14          event.setSalary(50000);
15          return event;
16      }
17  }

 

文件名:orderListener

/**
 * 用于监听某个EPL在引擎中的运行情况,事件进入并产生结果后会回调UpdateListener
 * 必须实现 UpdateListener 接口
 */
public class orderListener implements UpdateListener {
 
    /**
     * arg0对应newEvent,arg1对应oldEvent
     */
10      @Override
11      public void update(EventBean[] arg0, EventBean[] arg1) {
12          if (null != arg0) {
13              for (int i=0;i<arg0.length;i++){
14                  System.out.println("orderEvent Count is "+arg0.length+",EventBean is "+arg0[i].getUnderlying());
15              }
16          } else {
17              System.out.println("EventBean is Null ");
18          }
19      }
20  }

 

文件名:orderMainTest.java

public class orderMainTest {
 
    public static void main(String[] args) throws InterruptedException {
 
        // 添加配置(包所在路劲),方面后面的引用自动添加包名前缀
        Configuration config = new Configuration();
        config.addEventTypeAutoName("cn.chenx.esper.insert");
        //
        EPServiceProvider epServiceProvider = EPServiceProviderManager
10                  .getDefaultProvider(config);
11          EPAdministrator epAdmin = epServiceProvider.getEPAdministrator();
12         
13          ConfigurationOperations configOper = epAdmin.getConfiguration();
14          configOper.addVariable("ifbool", Boolean.classfalse);
15          configOper.addImport(BaseUntil.class);
16   
17          // 事件流名称
18          String className = "orderEvent";// orderEvent.class.getName();
19          System.out.println("className is " + className);
20     
21          String insql = "insert into msgEvent select BaseUntil.getEvent() from orderEvent";
22          epAdmin.createEPL(insql);
23          String epsql = "select * from msgEvent";
24   
25          System.out.println("epsql:" + epsql);
26          EPStatement epstate = epAdmin.createEPL(epsql);
27          epstate.addListener(new orderListener());
28          EPRuntime epRuntime = epServiceProvider.getEPRuntime();
29          //
30          for (int i = 0; i < 5; i++) {
31              int seed = (int) (Math.random() * 100);
32              orderEvent event = new orderEvent("" + seed, 100 + seed);
33              System.out.println("seed name:" + event.getName() + ",salary:"
34                      + event.getSalary());
35              orderBean bean = new orderBean();
36              bean.setKey("BeanKey:" + i);
37              bean.setValue(seed+i);
38              event.setBean(bean);
39   
40              List<orderBean> list = new ArrayList<orderBean>();
41              for (int j = 0; j < 10; j++) {
42                  bean = new orderBean();
43                  bean.setKey("ListKey:" + j);
44                  bean.setValue(seed+j);
45                  list.add(bean);
46              }
47              event.setOrderBeans(list);
48   
49              Map<Integer, orderBean> map = new HashMap<Integer, orderBean>();
50              for (int k = 0; k < 10; k++) {
51                  bean = new orderBean();
52                  bean.setKey("MapKey" + k);
53                  bean.setValue(seed+k);
54                  map.put(k, bean);
55              }
56              event.setOrderMap(map);
57   
58              epRuntime.sendEvent(event);
59          }
60      }
61  }

 

Updating an Insert Stream

功能:
在事件即将被用于计算前,改变其自身的属性值,然后再将其用于后面的计算.

格式:

update istream event_type [as stream_name]
  set property_name = set_expression [, property_name = set_expression] [,...]
  [where where_expression]

说明:

  • 因为istream的限制,所以该语法只支持新输入的事件.
  • event_type代表要更新的事件,set之后的property_name是要更新的事件属性,最后可以用where子句进行简单的过滤.
  • update句首用@Priority这个注解,使更新事件的顺序是以优先级最高的最先更新;

注意:

  • 如果事件是POJO,那么要实现java.io.Serializable接口。因为引擎内部的update操作实际上是要先深复制原事件再更新拷贝后的事件,不会对原事件作出任何修改。
  • 设置属性的表达式不能用聚合函数.
  • 如果事件是xml,update语法则不适用.
  • update操作不可用于嵌套的事件.
时间: 2024-08-03 09:30:14

Esper系列(五)Order by、Limit、构建事件流、Updating an Insert Stream的相关文章

MySql学习(二) —— where / having / group by / order by / limit 简单查询

这篇博客主要记录sql的五种子句查询语法! 一个重要的概念:将字段当做变量看,无论是条件,还是函数,或者查出来的字段. select五种子句 where 条件查询 group by 分组 having 筛选 order by 排序 limit 限制结果条数 为了练习上面5种子句,先建立一张goods表,主要用于查询操作,表结构如下: 1.基础查询 —— where where常用运算符: 1.1 查出主键为20的商品 :mysql> SELECT goods_id,cat_id,goods_sn

VSTO之旅系列(五):创建Outlook解决方案

原文:VSTO之旅系列(五):创建Outlook解决方案 本专题概要 引言 Outlook对象模型 自定义Outlook窗体 小结 一.引言 在上一个专题中,为大家简单介绍了下如何创建Word解决方案的,所以本专题中将为大家介绍下Outlook相关的内容.我们从Visual Studio 2010 中Office节点下的模板中我们可以看到,Outlook只有外接程序的模板,并没有提供像Word或Excel这样的文档级的模板,所以VSTO没有为Outlook解决方案创建宿主项和宿主控件(Excel

UIPro实例讲解之QQ2014 UI模仿系列五 - 聊天气泡

UIPro的宗旨是:让Windows UI开发像写小网页一样简单! 猜测了下QQ的气泡模式的实现方法: 分为两层,上面一层是windowless richedit, 透明模式:下面一层一个容器,包含有用户头像和气泡图片:下面一层随richedit一起滚动. 添加每一个paragraph后,可以得到这个段落的rang的左上角和右下角,从而得到该paragraph所在的矩形区域rect.然后就在下层的容器中,插入一个气泡,设置其rect:richedit滚动的时候,下层容器处理下事件. 未完待续 U

用Qt写软件系列五:一个安全防护软件的制作(1)

引言 又有许久没有更新了.Qt,我心爱的Qt,为了找工作不得不抛弃一段时间,业余时间来学一学了.本来计划要写一系列关于Qt组件美化的博文,但是写了几篇之后就没坚持下去了.技术上倒是问题不大,主要是时间不够充裕.这段时间写几篇关于界面整体设计的博文,从最基础的界面元素开始,到最后构建一个页面元素丰富的桌面应用程序.Trojan Assessment Platform是一个原型设计项目,只是实现了有限的一部分功能.远远还称不上是一个评估平台.这里仅仅侧重于用Qt做界面的实现. 界面预览 首先还是看看

Java学习之Xml系列五:SAX解析——搜索xml内容

本文对SAX解析进一步说明. 另外主要给利用SAX解析方法找到指定条件(如标签名称)的xml文档内容. 首先按需要介绍一下DefaultHandler. DefaultHandler类是SAX2事件处理程序的默认基类.它继承了EntityResolver.DTDHandler.ContentHandler和ErrorHandler这四个接口.包含这四个接口的所有方法,所以我们在编写事件处理程序时,可以不用直接实现这四个接口,而继承该类,然后重写我们需要的方法.(注意:ContentHandler

基于Kafka构建事件溯源模式的微服务

概要 本文中我们将讨论如何借助Kafka实现分布式消息管理,使用事件溯源(Event Sourcing)模式实现原子化数据处理,使用CQRS模式(Command-Query Responsibility Segregation )实现查询职责分离,使用消费者群组解决单点故障问题,理解分布式协调框架Zookeeper的运行机制.整个应用的代码实现使用Go语言描述. 第一部分 引子.环境准备.整体设计及实现第二部分 消息消费者及其集群化第三部分 测试驱动开发.Docker部署和持续集成第一部分 引子

Apache Kafka系列(五) Kafka Connect及FileConnector示例

Apache Kafka系列(一) 起步 Apache Kafka系列(二) 命令行工具(CLI) Apache Kafka系列(三) Java API使用 Apache Kafka系列(四) 多线程Consumer方案 Apache Kafka系列(五) Kafka Connect及FileConnector示例 一. Kafka Connect简介 Kafka是一个使用越来越广的消息系统,尤其是在大数据开发中(实时数据处理和分析).为何集成其他系统和解耦应用,经常使用Producer来发送消

Esper系列(二)时间窗口、长度窗口、cast、注解、自定义函数、静态方法

长度窗口实现原理图 说明: 上图长度窗口为5,事件W1至W5进入引擎后属于NewEvents队列,事件W6进入引擎后,W2至W6就属于NewEvents队列,而事件W1就属于OldEvents队列了.NewEvents为先进先出队列,队列长度为EPL语句中制定的长度窗口大小,OldEvent队列为过期数据的存放队列. EPL长度窗口示例 1  select count(*) as result from orderEvent.win:time_batch(3 sec)   时间窗口实现原理图 说

RX系列五 | Schedulers线程控制

RX系列五 | Schedulers线程控制 在我们上一篇文章中的,我们的小例子里有这么一段代码 //网络访问 .observeOn(Schedulers.io()) 事实上,我们在使用网络操作的时候,便可以控制其运行在哪个线程中,而Schedulers类,有四个方法,分别是 Schedulers.immediate(); Schedulers.newthread(); Schedulers.io(); Schedulers.computation(); 以及RxAndroid中的Android