流式数据中的数学统计量计算

在科技飞速发展的今天,每天都会产生大量新数据,例如银行交易记录,卫星飞行记录,网页点击信息,用户日志等。为了充分利用这些数据,我们需要对数据进行分析。在数据分析领域,很重要的一块内容是流式数据分析。流式数据,也即数据是实时到达的,无法一次性获得所有数据。通常情况下我们需要对其进行分批处理或者以滑动窗口的形式进行处理。分批处理也即每次处理的数据之间没有交集,此时需要考虑的问题是吞吐量和批处理的大小。滑动窗口计算表示处理的数据每次向前移N个单位,N小于要处理数据的长度。例如,在语音识别中,每个包处理大约25ms的音频数据,然后以步幅10ms向前移动处理下一个包的数据。语音识别就是一个典型的流式数据通过滑动窗口方式进行处理的例子。在本文中,我们关注N=1的情况,也即每次处理完一个包之后,向前移动一个单位继续处理下一个包,如下图所示。
图1 基于滑动窗口的流式数据处理示例
我们主要关注几个常见的数学统计量:最小(大)值、平均值和中位数。事实上,只要知道了最大值和最小值的求法,很容易计算极差;知道了平均值的求法,就可以很容易地计算方差和标准差。针对上述统计量的计算都有一个na?ve算法,也即不考虑前后两个包之间数据重叠,将每个包看成独立的,对每一个包分别计算上述统计量。如果总数据长度为n,每个包的长度为k,则计算上述统计量的复杂度为O(nk)(针对给定数组求中位数的问题,存在复杂度O(k)的算法,实现方法是基于快排进行改进,网上资料很多在此不再做介绍)。我们尝试在na?ve算法的基础上降低每个统计量的计算复杂度,下面开始正式的介绍。
1. 最小(大)值
这是一个经典问题,通常被称为滑动极值问题。问题描述:给定一个长度为n的数列a0,a1,...,an?1和一个整数k,求数列bi=min{ai,ai+1,...,ai+k?1}(i=0,1,...,n?k)。
通过使用单调队列可以在O(n)的时间内解决。单调队列维护数列的下标,队列内的元素满足:
设单调队列从头部开始的元素值为xi,则xi<xi+1且axi<axi+1。
简单来说单调队列就是下标对应的元素是严格递增的顺序(当然在实际应用过程中,可能不严格单调,也可能是递减的顺序)。
考虑以ai结尾的k个元素,求bi?k+1。假定单调递增队列中维护了ai之前的k-1个元素相关的最小值下标,为了求bi?k+1,我们需要将ai和单调队列中元素进行比较。当队列末尾的元素j满足aj≥ai,则不断取出末尾元素,直到队列为空或者aj<ai。 ai不仅会影响bi?k+1的计算,也会影响后续k-1个bi的计算。如果ai是这一段的最小值,则它在单调队列中就不会被删除,进而可以用O(1)的时间求单个bi。
当删除单调队列的元素时,需要判断头部元素是否还需要。如果已经脱离计算bi的范围,则可以删除头部元素。求单个bi的值,只需要返回单调队列的头部元素即可。均摊复杂度为O(n)。求最小值的代码如下:

#define MAX_N 100000

static int a[MAX_N];
static int b[MAX_N];
static int deque[MAX_N];

void range_min(int n,int k)
{
int s=0,t=0;//单调队列的头和尾指针

for (int i=0;i<n;i++)
{
//在单调队列的末尾加入i
while (s<t&&a[deque[t-1]]>=a[i]) t--;//维护严格的单调递增队列
deque[t++]=i;

if (i-k+1>=0)
{
b[i-k+1]=a[deque[s]];
}

//从单调队列头部删除元素
if (deque[s]==i-k+1)
{
s++;
}
}
求滑动最大值只需要将大于等于号改为小于等于号即可,维护一个单调递减队列。通过使用单调队列,流式数据中极值计算的复杂度可以由O(nk)降为O(n),当每个包的长度很大时,算法的优化效果会非常明显。滑动极值问题具有很广泛的应用,希望大家能知道这个优雅的解法。单调队列还有很多其他应用场景,比如解决《leetcode之Largest Rectangle in Histogram》。此外,在一些动态规划问题中,它也可以用来降低时间复杂度。
2. 平均值
滑动平均值的计算比较容易优化,我们需要做的就是维护区间内元素的和,除以区间元素个数k即是区间平均值。当计算下一个区间的平均值时,我们先将上一个区间的和减掉上一个区间第一个元素的值,然后加上当前区间最后一个元素的值,然后除以k即是当前区间的平均值。求区间平均值的代码如下:

#define MAX_N 100000

static int www.jhyl1.cn a[MAX_N];
static int b[MAX_N];

void range_mean(int n,www.vboyl130.cn int k)
{
int sum=0;
for (int i=0;i www.ysylcsvip.cn <n;i++)
{
sum+=a[i];
if(i-k+1>=0)
{
b[i-k+1]=sum/k;
sum-=a[i-k+1];
很明显可以看出上述代码的复杂度为O(n)。求方差可以采用类似的思路,在求和的同时也求一个平方和,之后采用方差的平方和公式即可求得方差。
3. 中位数
中位数是一个非常重要的指标,在很多应用中都会用到,但是相比前两个统计量,中位数的优化要麻烦很多。
在介绍基于滑动窗口的中位数计算之前,我们先看一个类似的问题:也是流式数据求中位数,但是每次都求前面所有数据的中位数。该问题也很经典,出现在剑指offer一书中,具体解法可参考《数据流中的中位数》。简单来说,就是构造一个最大堆和一个最小堆,最大堆的元素都小于最小堆中的元素,而且最小堆中的元素个数至多比最大堆中的元素个数多1。每次来新元素的时候,根据当前两个堆的元素个数来决定往哪个堆插入元素,在插入的同时保证上面所说的两个前提。插入复杂度是O(log n),查询复杂度是O(1)。
基于滑动窗口的中位数计算解法和上面的问题类似,也需要构造一个最大堆和最小堆,同时也满足上面的两个条件,区别就在于我们每次计算完一次中位数之后,都需要从堆中删除一个最老的元素。可以通过和中位数比较来确定删除哪个堆中的元素。通常的堆操作一般是插入和删除堆顶元素,在此需要实现一个函数可以删除任意位置的堆元素,同时保证堆的结构不被破坏,这不是一个困难的问题,实现和删除堆顶元素类似。如果数据是以数组形式一次给定,最老的元素可以通过访问原数组获得,如果流式数据一次只给定一个数据,我们可以通过循环队列保存最近的k个元素来获得最老的元素。代码实现可以参考博客《找滑动窗口的中位数》,在此就不给出详细代码。每来一个数据都需要执行一次插入和删除,复杂度是O(log k),所以针对流式数据的中位数问题算法复杂度是O(nlogk),相比朴素算法也有明显地提升。

时间: 2024-10-13 07:02:24

流式数据中的数学统计量计算的相关文章

《TCP/IP详解》之二:流式数据交互

和UDP这种“滚珠”式的协议不同(一份数据就是一个udp packet),TCP以报文段的方式传递数据,其大小受网络链路的限制.在SYN报文段中互相通告最大报文段长(MSS).所以业务层交付的数据,会被TCP拆分/合并为合适的报文段(这也就是为嘛TCP数据跟水流似的,没有边界). 对于每个报文段而言,就很像UDP的“滚珠”了,不保证顺序.不保证到达.TCP要对收到的报文重新排序,再才交给应用层.发出一个报文段后,会启动一个定时器,等待对端ACK确认收到,否则将重传该报文.由于重传机制,报文段可能

第三中情况可以用Storm分布式处理框架处理实时流式数据

http://www.blogbus.com/hrl-logs/296460063.htmlhttp://www.blogbus.com/anylt-logs/296460134.htmlhttp://www.blogbus.com/anylt-logs/296460131.htmlhttp://www.blogbus.com/hrl-logs/296460199.htmlhttp://www.blogbus.com/anylt-logs/296460425.htmlhttp://www.blo

Calcite中的流式SQL

Calcite中的流式SQL Calcite中的流式SQL总体设计思路 总体语法应该兼容SQL,这个是和目前流处理SQL的发展趋势是一致的. 如果部分功能标准SQL中没有包含,则尽量采用业界标杆(Oracle).比如模式匹配的功能,目前流处理中还没有针对语法达成共识,那么在设计上,就采用Oracle data warehouse的Match Recognize的方式.还有滑窗功能. 如果还有功能目前业界标杆都没有,那么就通过函数的方式拓展,翻滚窗口和跳动窗口,这两个窗口在标准SQL中都是不包含的

流式数据处理的计算模型 转

分类: 大数据 接触这块将近3个月左右,期间给自己的定位也是业务层开发.对平台级的产品没有太深入的理解和研究,所以也不能大谈特谈什么storm架构之类的了. 说说业务中碰到流式计算问题吧: 1.还是要介绍下简要的架构(原谅我不会画图) 流式数据接入层------------------->流式数据处理层------------------->结果数据归档层 || || || V 中间数据存储层 所有的数据通过接入层源源不断地进入到这个系统, 在数据处理层得到相应的计算存储, 最后将结果写入到归

什么是流式计算?

一.流式计算的背景 在日常生活中,我们通常会先把数据存储在一张表中,然后再进行加工.分析,这里就涉及到一个时效性的问题.如果我们处理以年.月为单位的级别的数据,那么多数据的实时性要求并不高:但如果我们处理的是以天.小时,甚至分钟为单位的数据,那么对数据的时效性要求就比较高.在第二种场景下,如果我们仍旧采用传统的数据处理方式,统一收集数据,存储到数据库中,之后在进行分析,就可能无法满足时效性的要求. 二.流式计算与批量计算 大数据的计算模式主要分为批量计算(batch computing).流式计

流式计算(二)-Kafka Stream

前面说了Java8的流,这里还说流处理,既然是流,比如水流车流,肯定得有流的源头,源可以有多种,可以自建,也可以从应用端获取,今天就拿非常经典的Kafka做源头来说事,比如要来一套应用日志实时分析框架,或者是高并发实时流处理框架,正是Kafka的拿手好戏. 环境:Idea2019.03/Gradle6.0.1/JDK11.0.4/Lambda/RHEL8.0/VMWare15.5/Springboot2.2.1.RELEASE/Zookeeper3.5.5/Kafka2.3.1 难度:新手--战

轻量级流式日志计算分析plog+(zabbix+grafana)

plog是一个用python写的流式计算分析框架,适用于轻量级流式数据的分析场景,大数据场景下大家自然想到使用spark等方案. 拿当前的业务场景看,需要对机器上nginx的流日志进行状态码.响应时间.QPS的实时分析,通过zabbix展现在grafana里,QPS在1000以内.传统方法是用shell脚本来计算各种数据,然后通过主动或被动模式传到zabbix里,此种方法有很大局限性,一是grep或awk过滤日志时,很难控制好过滤的数量,过滤的多了严重影响性能,可能上一个数据都没计算出来,这一次

翻译-In-Stream Big Data Processing 流式大数据处理

相当长一段时间以来,大数据社区已经普遍认识到了批量数据处理的不足.很多应用都对实时查询和流式处理产生了迫切需求.最近几年,在这个理念的推动下,催生出了一系列解决方案,Twitter Storm,Yahoo S4,Cloudera Impala,Apache Spark和Apache Tez纷纷加入大数据和NoSQL阵营.本文尝试探讨流式处理系统用到的技术,分析它们与大规模批量处理和OLTP/OLAP数据库的关系,并探索一个统一的查询引擎如何才能同时支持流式.批量和OLAP处理. 在Grid Dy

从Storm和Spark Streaming学习流式实时分布式计算系统的设计要点

0. 背景 最近我在做流式实时分布式计算系统的架构设计,而正好又要参见CSDN博文大赛的决赛.本来想就写Spark源码分析的文章吧.但是又想毕竟是决赛,要拿出一些自己的干货出来,仅仅是源码分析貌似分量不够.因此,我将最近一直在做的系统架构的思路整理出来,形成此文.为什么要参考Storm和Spark,因为没有参照效果可能不会太好,尤其是对于Storm和Spark由了解的同学来说,可能通过对比,更能体会到每个具体实现背后的意义. 本文对流式系统出现的背景,特点,数据HA,服务HA,节点间和计算逻辑间