Calcite中的流式SQL

Calcite中的流式SQL


Calcite中的流式SQL总体设计思路

总体语法应该兼容SQL,这个是和目前流处理SQL的发展趋势是一致的。

如果部分功能标准SQL中没有包含,则尽量采用业界标杆(Oracle)。比如模式匹配的功能,目前流处理中还没有针对语法达成共识,那么在设计上,就采用Oracle data warehouse的Match Recognize的方式。还有滑窗功能。

如果还有功能目前业界标杆都没有,那么就通过函数的方式拓展,翻滚窗口和跳动窗口,这两个窗口在标准SQL中都是不包含的,所以就采用了Ceil,Tumble,Hop等函数的方式来实现功能。

总体思路就是在兼容标准SQL的基础上做尽可能少的拓展,保证语义和标准SQL一致,尽最大可能减少私有化的语法

Calcite StreamSQL说明

  1. 在DDL中明确定义schema是流Or表,TODO:官网没有示例,待补充

    比如有三张schema:

    Orders (rowtime, productId, orderId, units) - 既是表,又是流

    Products (rowtime, productId, name) - 表

    Shipments (rowtime, orderId) - 流

  2. 查询中如果包含Stream关键字,就是流式查询,如果不包含,就是表查询。表查询可以马上返回结果并结束,流式查询只会输出结果但并不结束。

    比如下面一个流查询示例:

SELECT STREAM *
FROM Orders;

  rowtime | productId | orderId | units
----------+-----------+---------+-------
 10:17:00 |        30 |       5 |     4
 10:17:05 |        10 |       6 |     1
 10:18:05 |        20 |       7 |     2
 10:18:07 |        30 |       8 |    20
 11:02:00 |        10 |       9 |     6
 11:04:00 |        10 |      10 |     1
 11:09:30 |        40 |      11 |    12
 11:24:11 |        10 |      12 |     4

表查询示例:

SELECT *
FROM Orders;

  rowtime | productId | orderId | units
----------+-----------+---------+-------
 08:30:00 |        10 |       1 |     3
 08:45:10 |        20 |       2 |     1
 09:12:21 |        10 |       3 |    10
 09:27:44 |        30 |       4 |     2

4 records returned.

流和表的查询不能混用,否则会报错

SELECT * FROM Shipments;

ERROR: Cannot convert stream ‘SHIPMENTS‘ to a table

SELECT STREAM * FROM Products;

ERROR: Cannot convert table ‘PRODUCTS‘ to a stream
  1. 其他过滤,排序,having等操作和标准sql一致,不再举例。
  2. 子查询只需要在外层语句写Stream关键字即可,内层写了无效。

    如:

SELECT STREAM rowtime, productId
FROM (
  SELECT TUMBLE_END(rowtime, INTERVAL ‘1‘ HOUR) AS rowtime,
    productId,
    COUNT(*) AS c,
    SUM(units) AS su
  FROM Orders
  GROUP BY TUMBLE(rowtime, INTERVAL ‘1‘ HOUR), productId)
WHERE c > 2 OR su > 10;

  rowtime | productId
----------+-----------
 10:00:00 |        30
 11:00:00 |        10
 11:00:00 |        40

窗口功能说明

翻滚窗(Tumbling window)

两个窗口之间数据没有重叠。

SELECT STREAM CEIL(rowtime TO HOUR) AS rowtime,
  productId,
  COUNT(*) AS c,
  SUM(units) AS units
FROM Orders
GROUP BY CEIL(rowtime TO HOUR), productId;

  rowtime | productId |       c | units
----------+-----------+---------+-------
 11:00:00 |        30 |       2 |    24
 11:00:00 |        10 |       1 |     1
 11:00:00 |        20 |       1 |     7
 12:00:00 |        10 |       3 |    11
 12:00:00 |        40 |       1 |    12

该示例每个小时结束的时候,输出这个小时的统计结果,11点整,输出1点的统计结果。以事件为驱动,内部不包含定时器。

下面的例子和上面的例子等价

SELECT STREAM TUMBLE_END(rowtime, INTERVAL ‘1‘ HOUR) AS rowtime,
  productId,
  COUNT(*) AS c,
  SUM(units) AS units
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL ‘1‘ HOUR), productId;

  rowtime | productId |       c | units
----------+-----------+---------+-------
 11:00:00 |        30 |       2 |    24
 11:00:00 |        10 |       1 |     1
 11:00:00 |        20 |       1 |     7
 12:00:00 |        10 |       3 |    11
 12:00:00 |        40 |       1 |    12

又比如,需要没半个小时输出一次结果,以12分钟为对齐时间,

SELECT STREAM
  TUMBLE_END(rowtime, INTERVAL ‘30‘ MINUTE, TIME ‘0:12‘) AS rowtime,
  productId,
  COUNT(*) AS c,
  SUM(units) AS units
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL ‘30‘ MINUTE, TIME ‘0:12‘),
  productId;

  rowtime | productId |       c | units
----------+-----------+---------+-------
 10:42:00 |        30 |       2 |    24
 10:42:00 |        10 |       1 |     1
 10:42:00 |        20 |       1 |     7
 11:12:00 |        10 |       2 |     7
 11:12:00 |        40 |       1 |    12
 11:42:00 |        10 |       1 |     4

跳动窗(HOP window)

两个窗口之间的数据有一定重叠。

跳动窗口是广义化的翻滚窗,允许数据在窗口中保存更长时间。

比如下面的例子,数据输出时间为11:00,但是其中还包含08:00到11:00的数据,以及09:00到12:00的数据,一个输入行对应三个输出行。

SELECT STREAM
  HOP_END(rowtime, INTERVAL ‘1‘ HOUR, INTERVAL ‘3‘ HOUR) AS rowtime,
  COUNT(*) AS c,
  SUM(units) AS units
FROM Orders
GROUP BY HOP(rowtime, INTERVAL ‘1‘ HOUR, INTERVAL ‘3‘ HOUR);

  rowtime |        c | units
----------+----------+-------
 11:00:00 |        4 |    27
 12:00:00 |        8 |    50

滑动窗(sliding window)

Calcite中的滑动窗采用标准的Over方式,直接套用了标准SQL中的分析函数。

SELECT STREAM rowtime,
  productId,
  units,
  SUM(units) OVER (ORDER BY rowtime RANGE INTERVAL ‘1‘ HOUR PRECEDING) unitsLastHour
FROM Orders;

下面的一个例子展示了过去10分钟的平均订单大小和上周平均订单的比较数据

SELECT STREAM *
FROM (
  SELECT STREAM rowtime,
    productId,
    units,
    AVG(units) OVER product (RANGE INTERVAL ‘10‘ MINUTE PRECEDING) AS m10,
    AVG(units) OVER product (RANGE INTERVAL ‘7‘ DAY PRECEDING) AS d7
  FROM Orders
  WINDOW product AS (
    ORDER BY rowtime
    PARTITION BY productId))
WHERE m10 > d7;

在这个例子中,使用Window子句来定义部分窗口,然后在每个OVER子句中在进行细化。初次之外,还可以在window子句中定义所有的窗口。

这种实现方式中,在后台同时维护了两张表,10分钟和7天的窗口数据。你可以直接访问到这些表,不需要做显示的查询。

这种语法还可以实现其他一些功能:

* 行组窗口

* 引用尚未到达的行,流将等待,直到它们到达。

* 可以支持其他RANK等统计分析函数

级联窗口(叠加窗口)

下面的查询显示了这样一个场景,返回每个记录的统计结果,但是该结果会在固定时间被重置。

SELECT STREAM rowtime,
  productId,
  units,
  SUM(units) OVER (PARTITION BY FLOOR(rowtime TO HOUR)) AS unitsSinceTopOfHour
FROM Orders;

这种方式类似滑窗的查询,但是单调表达式发生在Partition by子句中。随着时间从10:59:59到11:00:00,Floor从10:00:00变为11:00:00,因此,一个新的分组开始产生了。sum的统一结果开始重置。

Calcite知道旧分组永远不会再次使用,因此会从内部存储中删除该分组的所有统计结果。

行组窗

使用Window语法和Over方式可以做到。

单调和准单调

这是作者在Calcite的StreamSQL中提出的概念。

如果一个列或者表达式是递增或者递减的,那么就成为是单调的。

如果列或者表达式是乱序的,并且有一种机制(比如标点符号或者水印)来生成特定值永远不会被看到,那么这列或者表达式就是准单调的。

概念很南理解,但是其实就是要求流上的数据是全局有序的。可以是事件顺序,或者事件id的顺序。一般情况下,我们会自动为事件补齐时间。

有了这种顺序,我们就能很容易实现水印这样的功能了。

流和表的说明

CREATE VIEW HourlyOrderTotals (rowtime, productId, c, su) AS
  SELECT TUMBLE_END(rowtime, INTERVAL ‘1‘ HOUR),
    productId,
    COUNT(*),
    SUM(units)
  FROM Orders
  GROUP BY TUMBLE(rowtime, INTERVAL ‘1‘ HOUR), productId;

SELECT STREAM rowtime, productId
FROM HourlyOrderTotals
WHERE c > 2 OR su > 10;

  rowtime | productId
----------+-----------
 10:00:00 |        30
 11:00:00 |        10
 11:00:00 |        40

看看上面的视图,这个是一张表还是一个流?

因为它没有使用Stream关键字,所以必然是一个关系,是一张表,但是它是可以被转化为流的表。你可以在流和关系的查询中使用它。

和它等价的查询还有:

WITH HourlyOrderTotals (rowtime, productId, c, su) AS (
  SELECT TUMBLE_END(rowtime, INTERVAL ‘1‘ HOUR),
    productId,
    COUNT(*),
    SUM(units)
  FROM Orders
  GROUP BY TUMBLE(rowtime, INTERVAL ‘1‘ HOUR), productId)
SELECT STREAM rowtime, productId
FROM HourlyOrderTotals
WHERE c > 2 OR su > 10;

  rowtime | productId
----------+-----------
 10:00:00 |        30
 11:00:00 |        10
 11:00:00 |        40

这种方法不限于子查询和视图,流式SQL中的每个查询都被定义为关系查询,并且使用最顶层Select子句中的Stream关键字被转换为流

流上的Join分为两种,流和表的Join以及流和流的Join

流上的Join实际都是窗口和窗口的JOin,或者窗口和表的Join,本质上都是表之间的Join,因为窗口就是一张表

比如下面一个流和表之间的Join

 SELECT STREAM o.rowtime, o.productId, o.orderId, o.units,
  p.name, p.unitPrice
FROM Orders AS o
JOIN Products AS p
  ON o.productId = p.productId;

  rowtime | productId | orderId | units | name   | unitPrice
----------+-----------+---------+-------+ -------+-----------
 10:17:00 |        30 |       5 |     4 | Cheese |        17
 10:17:05 |        10 |       6 |     1 | Beer   |      0.25
 10:18:05 |        20 |       7 |     2 | Wine   |         6
 10:18:07 |        30 |       8 |    20 | Cheese |        17
 11:02:00 |        10 |       9 |     6 | Beer   |      0.25
 11:04:00 |        10 |      10 |     1 | Beer   |      0.25
 11:09:30 |        40 |      11 |    12 | Bread  |       100
 11:24:11 |        10 |      12 |     4 | Beer   |      0.25

Order是流,Products是表。两个Join之后结果肯定是流,然后,因为没有窗口,所以默认情况下应该是一个仅仅保存当前数据的长度为1的窗口,当前Order数据和Products做Join。

流和流的Join如下:

SELECT STREAM o.rowtime, o.productId, o.orderId, s.rowtime AS shipTime
FROM Orders AS o
JOIN Shipments AS s
  ON o.orderId = s.orderId
  AND s.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL ‘1‘ HOUR;

  rowtime | productId | orderId | shipTime
----------+-----------+---------+----------
 10:17:00 |        30 |       5 | 10:55:00
 10:17:05 |        10 |       6 | 10:20:00
 11:02:00 |        10 |       9 | 11:58:00
 11:24:11 |        10 |      12 | 11:44:00

这个查询中没有显式的定义窗口,但是实际上已经通过where条件来锁定了数据范围。也就是说,会自动将数据保存在一个窗口中。

DML语句

可以使用Create View语句来创建视图,上面已经可以看到,同时,也可以使用Insert AS select的方式将流上的数据导入其他流。

比如:

CREATE VIEW LargeOrders AS
SELECT STREAM * FROM Orders WHERE units > 1000;

INSERT INTO LargeOrders
SELECT STREAM * FROM Orders WHERE units > 1000;

还可以通过Upsert语句来维护窗口数据

UPSERT INTO OrdersSummary
SELECT STREAM productId,
  COUNT(*) OVER lastHour AS c
FROM Orders
WINDOW lastHour AS (
  PARTITION BY productId
  ORDER BY rowtime
  RANGE INTERVAL ‘1‘ HOUR PRECEDING)

开发计划

已经完成

  • 流上的 SELECT, WHERE, GROUP BY, HAVING, UNION ALL, ORDER BY
  • FLOOR和CEIL函数
  • 单调性(Monotonicity)
  • 禁用流式结果集

    比如:

    > SELECT STREAM * FROM (VALUES (1, ‘abc‘));
    ERROR: Cannot stream VALUES

未完成

  • 流和流的Join
  • 流和表的Join
  • 基于视图的流
  • 流上的包含Order by的Union ALL(流合并)
  • 流上的关系型查询
  • 流上的窗口聚合(滑窗和级联窗口)
  • 忽略视图和子查询中的Stream关键字
  • 流上的Order by不能包含Offset和Limit
  • 运行时候检查是否有足够的历史记录数据来进行查询
  • 准单调性–需要有一种机制,能够申明数据一旦计算完毕,就不再更新,或者是计算完毕之后,如果最新结果有刷新,再来更新。比如水印功能。
  • HOP和TUMBLE函数,以及辅助性的HOP_START, HOP_END, TUMBLE_START, TUMBLE_END函数

引用:

http://calcite.apache.org/docs/stream.html

时间: 2024-12-15 07:19:11

Calcite中的流式SQL的相关文章

移动web中的流式布局和viewport知识介绍

1   流式布局 其实  流式布局  就是百分比布局,通过盒子的宽度设置成百分比来根据屏幕的宽度来进行伸缩,不受固定像素的限制,内容向两侧填充. 这样的布局方式  就是移动web开发使用的常用布局方式 2    Viewport 我们猜想下pc页面在移动设备上显示情况. 放不下,缩放? 我们测试下pc页面在移动设备上显示. 默认的缩放的显示的 认识viewport 在移动端用来承载网页的这个区域,就是我们的视觉窗口,viewport(视口),这个区域可是设置高度宽度,可是按比例放大缩小,而且能设

流式数据处理的计算模型 转

分类: 大数据 接触这块将近3个月左右,期间给自己的定位也是业务层开发.对平台级的产品没有太深入的理解和研究,所以也不能大谈特谈什么storm架构之类的了. 说说业务中碰到流式计算问题吧: 1.还是要介绍下简要的架构(原谅我不会画图) 流式数据接入层------------------->流式数据处理层------------------->结果数据归档层 || || || V 中间数据存储层 所有的数据通过接入层源源不断地进入到这个系统, 在数据处理层得到相应的计算存储, 最后将结果写入到归

自定义流式布局

1.概述 何为FlowLayout,就是控件根据ViewGroup的宽,自动的往右添加,如果当前行剩余空间不足,则自动添加到下一行.有点所有的控件都往左飘的感觉,第一行满了,往第二行飘~所以也叫流式布局.Android并没有提供流式布局,但是某些场合中,流式布局还是非常适合使用的,比如关键字标签,搜索热词列表等,比如下图: 这些都特别适合使用FlowLayout 2.简单的分析 1.对于FlowLayout,需要指定的LayoutParams,我们目前只需要能够识别margin即可,即使用Mar

理解 Node.js 中 Stream(流)

Stream(流) 是 Node.js 中处理流式数据的抽象接口. stream 模块用于构建实现了流接口的对象. Node.js 提供了多种流对象. 例如,对 HTTP 服务器的request请求和 process.stdout(标准输出), 都是流的实例. 流可以是可读的.可写的.或者可读可写的. 所有的流都是 EventEmitter 的实例. Stream 的4种类型 1. Readable - 可读的流(fs.createReadStream()) 2. Writable - 可写的流

Demo:基于 Flink SQL 构建流式应用

Flink 1.10.0 于近期刚发布,释放了许多令人激动的新特性.尤其是 Flink SQL 模块,发展速度非常快,因此本文特意从实践的角度出发,带领大家一起探索使用 Flink SQL 如何快速构建流式应用. 本文将基于 Kafka, MySQL, Elasticsearch, Kibana,使用 Flink SQL 构建一个电商用户行为的实时分析应用.本文所有的实战演练都将在 Flink SQL CLI 上执行,全程只涉及 SQL 纯文本,无需一行 Java/Scala 代码,无需安装 I

流式数据中的数学统计量计算

在科技飞速发展的今天,每天都会产生大量新数据,例如银行交易记录,卫星飞行记录,网页点击信息,用户日志等.为了充分利用这些数据,我们需要对数据进行分析.在数据分析领域,很重要的一块内容是流式数据分析.流式数据,也即数据是实时到达的,无法一次性获得所有数据.通常情况下我们需要对其进行分批处理或者以滑动窗口的形式进行处理.分批处理也即每次处理的数据之间没有交集,此时需要考虑的问题是吞吐量和批处理的大小.滑动窗口计算表示处理的数据每次向前移N个单位,N小于要处理数据的长度.例如,在语音识别中,每个包处理

Android中常见的热门标签的流式布局的实现

一.概述:在日常的app使用中,我们会在android 的app中看见 热门标签等自动换行的流式布局,今天,我们就来看看如何 自定义一个类似热门标签那样的流式布局吧(源码下载在下面最后给出) 类似的自定义布局.下面我们就来详细介绍流式布局的应用特点以及用的的技术点: 1.流式布局的特点以及应用场景    特点:当上面一行的空间不够容纳新的TextView时候,    才开辟下一行的空间 原理图: 场景:主要用于关键词搜索或者热门标签等场景2.自定义ViewGroup,重点重写下面两个方法 1.o

golang gin框架中实现大文件的流式上传

一般来说,通过c.Request.FormFile()获取文件的时候,所有内容都全部读到了内存.如果是个巨大的文件,则可能内存会爆掉:且,有的时候我们需要一边上传一边处理. 以下的代码实现了大文件流式上传. 还非常不完美,但是可以作为参考: upload.html <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>up

流式处理框架对比

分布式流处理是对无边界数据集进行连续不断的处理.聚合和分析的过程,与MapReduce一样是一种通用计算框架,期望延迟在毫秒或者秒级别.这类系统一般采用有向无环图(DAG).DAG是任务链的图形化表示,用它来描述流处理作业的拓扑.在选择不同的流处理系统时,通常会关注以下几点: 运行时和编程模型:平台框架提供的编程模型决定了许多特色功能,编程模型要足够处理各种应用场景. 函数式原语:流处理平台应该能提供丰富的功能函数,比如,map或者filter这类易扩展.处理单条信息的函数;处理多条信息的函数a