Flink去重第二弹:SQL方式

Flink去重第一弹:MapState去重中介绍了使用编码方式完成去重,但是这种方式开发周期比较长,我们可能需要针对不同的业务逻辑实现不同的编码,对于业务开发来说也需要熟悉Flink编码,也会增加相应的成本,我们更多希望能够以sql的方式提供给业务开发完成自己的去重逻辑。本篇介绍如何使用sql方式完成去重。
为了与离线分析保持一致的分析语义,Flink SQL 中提供了distinct去重方式,使用方式:

  1. SELECT DISTINCT devId FROM pv

表示对设备ID进行去重,得到一个明细结果,那么我们在使用distinct来统计去重结果通常有两种方式, 仍然以统计每日网站uv为例。

第一种方式

  1. SELECT datatime,count(DISTINCT devId) FROM pv group by datatime

该语义表示计算网页每日的uv数量,其内部核心实现主要依靠DistinctAccumulator与CountAccumulator,DistinctAccumulator 内部包含一个map结构,key 表示的是distinct的字段,value表示重复的计数,CountAccumulator就是一个计数器的作用,这两部分都是作为动态生成聚合函数的中间结果accumulator,透过之前的聚合函数的分析可知中间结果是存储在状态里面的,也就是容错并且具有一致性语义的
其处理流程是:

  1. 将devId 添加到对应的DistinctAccumulator对象中,首先会判断map中是否存在该devId, 不存在则插入map中并且将对应value记1,并且返回True;存在则将对应的value+1更新到map中,并且返回False
  2. 只有当返回True时才会对CountAccumulator做累加1的操作,以此达到计数目的

第二种方式

  1. select count(*),datatime from(
  2. select distinct devId,datatime from pv ) a
  3. group by datatime

内部是一个对devId,datatime 进行distinct的计算,在flink内部会转换为以devId,datatime进行分组的流并且进行聚合操作,在内部会动态生成一个聚合函数,该聚合函数createAccumulators方法生成的是一个Row(0) 的accumulator 对象,其accumulate方法是一个空实现,也就是该聚合函数每次聚合之后返回的结果都是Row(0),通过之前对sql中聚合函数的分析(可查看GroupAggProcessFunction函数源码), 如果聚合函数处理前后得到的值相同那么可能会不发送该条结果也可能发送一条撤回一条新增的结果,但是其最终的效果是不会影响下游计算的,在这里我们简单理解为在处理相同的devId,datatime不会向下游发送数据即可,也就是每一对devId,datatime只会向下游发送一次数据;
外部就是一个简单的按照时间维度的计数计算,由于内部每一组devId,datatime 只会发送一次数据到外部,那么外部对应datatime维度的每一个devId都是唯一的一次计数,得到的结果就是我们需要的去重计数结果。

两种方式对比

    1. 这两种方式最终都能得到相同的结果,但是经过分析其在内部实现上差异还是比较大,第一种在分组上选择datatime ,内部使用的累加器DistinctAccumulator 每一个datatime都会与之对应一个对象,在该维度上所有的设备id, 都会存储在该累加器对象的map中,而第二种选择首先细化分组,使用datatime+devId分开存储,然后外部使用时间维度进行计数,简单归纳就是:
      第一种: datatime->Value{devI1,devId2..}
      第二种: datatime+devId->row(0)
      聚合函数中accumulator 是存储在ValueState中的,第二种方式的key会比第一种方式数量上多很多,但是其ValueState占用空间却小很多,而在实际中我们通常会选择Rocksdb方式作为状态后端,rocksdb中value大小是有上限的,第一种方式很容易到达上限,那么使用第二种方式会更加合适;
    2. 这两种方式都是全量保存设备数据的,会消耗很大的存储空间,但是我们的计算通常是带有时间属性的,那么可以通过配置StreamQueryConfig设置状态ttl。

原文地址:https://www.cnblogs.com/pucheung/p/12184771.html

时间: 2024-10-18 00:56:28

Flink去重第二弹:SQL方式的相关文章

Flink去重第一弹:MapState去重

去重计算应该是数据分析业务里面常见的指标计算,例如网站一天的访问用户数.广告的点击用户数等等,离线计算是一个全量.一次性计算的过程通常可以通过distinct的方式得到去重结果,而实时计算是一种增量.长期计算过程,我们在面对不同的场景,例如数据量的大小.计算结果精准度要求等可以使用不同的方案.此篇介绍如何通过编码方式实现精确去重,以一个实际场景为例:计算每个广告每小时的点击用户数,广告点击日志包含:广告位ID.用户设备ID(idfa/imei/cookie).点击时间. 实现步骤分析: 为了当天

Flink去重第三弹:HyperLogLog去重

HyperLogLog算法 也就是基数估计统计算法,预估一个集合中不同数据的个数,也就是我们常说的去重统计,在redis中也存在hyperloglog 类型的结构,能够使用12k的内存,允许误差在0.81%的情况下统计2^64个数据,在这种大数据量情况下能够减少存储空间的消耗,但是前提是允许存在一定的误差.关于HyperLogLog算法原理可以参考这篇文章:https://www.jianshu.com/p/55defda6dcd2里面做了详细的介绍,其算法实现在开源java流式计算库strea

深究angularJS系列 - 第二弹

深究angularJS系列 - 第二弹,在初步了解了Angular的基础上,进一步的针对Angular的控制器和作用域问题深入探究O(∩_∩)O~~ Angular控制器 控制器(Controller)的理解 控制器是对view的抽象,用来接收view的事件,响应view的请求: 控制器包含view的静态属性和动态的方法: 控制器与view是一对一的关系. 控制器(Controller)的结构 1 .controller("控制器的名字",function($scoppe){ 2 ..

线段树第二弹(区间更新)

上篇文章,我们介绍了线段树的基本概念和单点更新.区间查询,今天,我们来接着上次的线段树问题继续深入研究.在解决线段树问题的过程中,我们会遇到要求修改区间中某一元素值的问题,当然也可能会遇到要求修改一段子区间所有值的问题--即区间更新问题.回忆一下上篇文章单点更新的方法是,由叶节点逐级向上进行更新,此时更新一个节点值的时间复杂度为o(log n),(点击链接了解详情:线段树+RMQ问题第二弹),那么以这样的处理效率来进行区间更新结果会怎样?现在假设待更新区间数据的规模为 n ,那么就需要进行 n

黑马程序员:赶紧下载iOS10开发教程第二弹

虽然6月13日WWDC2016的发布会结束了,但是本届大会的开发者session环节还在持续进行着.黑马程序员本着对技术的狂热,对学生负责的态度,仍然坚持每天对课程进行深入的研发.本文主要是黑马程序员对iOS 10 中SDK所更新的主要内容进行总结.根据黑马程序员惯例,在文章的最后,有相关相关教学视频及Demo会有分享链接,供各位下载! 1.Grand Center Dispatch GCD 在本次一更新主要有以下内容: ?创建私有队列 ?安排异步执行的工作项目(items) ?GCD能自动将工

《我与希乐仑》第二弹

致徐敏: 如果你觉得我的这篇报道侵害了你和贵公司的权益,你可以上法院告我,但我说的都是事实,不怕你告,有事找我律师,谢谢! 我是希乐仑科技发展(上海)有限公司前员工,曾经为希乐仑立下汗马功劳.这公司从2014年2月份开始搞我,我去年的绩效是3.8/5.0,完全没有绩效问题.他们倒好,自从我查完我们公司某商业间谍之后,就给我穿小鞋,说我这个不好,那个拖延,这不是扯淡吗?公司在3月5日非法把我裁掉,而且直到现在还未支付我2月份工资,行吧,那我就不再沉默了,当我吃素的是吧!我现在把这件事情公之于众,望

【学】数组去重的3种方式

数组去重的3种方式 var arr = [1,4,2,3,4,5,6,7,3,4,5,23,2,3,4,5,3,2,3,4,5];   function findInArray(n,arr){ for (var i=0; i<arr.length; i++) { if(arr[i]==n){ return true; } } return false; }   function removeRep1(arr){ //方法2 var arr1 = []; for (var i=0; i<arr.

C/C++中容器vector使用方法&lt;第二弹&gt;

此文总结常用vector操作,是前一篇的续作!只有代码,详细请看代码中的注释.出于反爬虫的目的,你不是在http://blog.csdn.net/zhanh1218上看到的,肯定不是最新最全的. /********************************************************************* * file_name: vector_test.cpp * * Created on: 2014年6月28日 下午3:34:23 * Author: The_T

MongoDB第二弹——基本操作

1 查看各个项目的Project ID编号 mysql -uroot -h10.10.2xx.xx show databases; use bugfree2; desc bf_TestProject; select ProjectID,ProjectName from bf_TestProject;(查询结果如下) 2 在/var/www/html/bugfree/BugFile路径下创建文件夹 mkdir Project2  Project3  Project4  Project5  Proj