Flink去重第三弹:HyperLogLog去重

HyperLogLog算法 也就是基数估计统计算法,预估一个集合中不同数据的个数,也就是我们常说的去重统计,在redis中也存在hyperloglog 类型的结构,能够使用12k的内存,允许误差在0.81%的情况下统计2^64个数据,在这种大数据量情况下能够减少存储空间的消耗,但是前提是允许存在一定的误差。关于HyperLogLog算法原理可以参考这篇文章:https://www.jianshu.com/p/55defda6dcd2里面做了详细的介绍,其算法实现在开源java流式计算库stream-lib提供了其具体实现代码,由于代码比较长就不贴出来(可以后台回复hll ,获取flink使用hll去重的完整代码)。

测试一下其使用效果,准备了97320不同数据:

public static void main(String[] args) throws Exception{

        String filePath = "000000_0";        BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)));

        Set<String> values =new HashSet<>();        HyperLogLog logLog=new HyperLogLog(0.01); //允许误差

        String line = "";        while ((line = br.readLine()) != null) {            String[] s = line.split(",");            String uuid = s[0];            values.add(uuid);            logLog.offer(uuid);        }

        long rs=logLog.cardinality();    }

当误差值为0.01 时; rs为98228,需要内存大小int[1366] //内部数据结构
当误差值为0.001时;rs为97304 ,需要内存大小int[174763]
误差越小也就越来越接近其真实数据,但是在这个过程中需要的内存也就越来越大,这个取舍可根据实际情况决定。

在开发中更多希望通过sql方式来完成,那么就将hll与udaf结合起来使用,实现代码如下:

public class HLLDistinctFunction extends AggregateFunction<Long,HyperLogLog> {

    @Override public HyperLogLog createAccumulator() {        return new HyperLogLog(0.001);    }

    public void accumulate(HyperLogLog hll,String id){      hll.offer(id);    }

    @Override public Long getValue(HyperLogLog accumulator) {        return accumulator.cardinality();    }}

定义的返回类型是long 也就是去重的结果,accumulator是一个HyperLogLog类型的结构。

测试:

case class AdData(id:Int,devId:String,datatime:Long)object Distinct1 {  def main(args: Array[String]): Unit = {    val env=StreamExecutionEnvironment.getExecutionEnvironment    val tabEnv=StreamTableEnvironment.create(env)    tabEnv.registerFunction("hllDistinct",new HLLDistinctFunction)    val kafkaConfig=new Properties()   kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092")    kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test1")    val consumer=new FlinkKafkaConsumer[String]("topic1",new SimpleStringSchema,kafkaConfig)    consumer.setStartFromLatest()    val ds=env.addSource(consumer)      .map(x=>{        val s=x.split(",")        AdData(s(0).toInt,s(1),s(2).toLong)      })    tabEnv.registerDataStream("pv",ds)    val rs=tabEnv.sqlQuery(      """ select hllDistinct(devId) ,datatime                                          from pv group by datatime      """.stripMargin)    rs.writeToSink(new PaulRetractStreamTableSink)    env.execute()  }}

准备测试数据

1,devId1,15778080000001,devId2,15778080000001,devId1,1577808000000

得到结果:

4> (true,1,1577808000000)4> (false,1,1577808000000)4> (true,2,1577808000000)

其基本使用介绍到这里,后续还将进一步优化。

原文地址:https://www.cnblogs.com/pucheung/p/12184785.html

时间: 2024-10-14 00:22:46

Flink去重第三弹:HyperLogLog去重的相关文章

js实现数组去重的三个方法、数组的快速排序

一:数组去重方法1 (思路:新建一个空的result数组,将需要去重的数组中的第一个元素传入,依次与剩余的元素进行对比,不重复则把元素传入到result数组中.) Array.prototype.removeDup = function(){ var result = [this[0]]; for(var i = 1 ; i<this.length; i++){ var repeat = false; for(var j = 0; j<result.length;j++){ if(this[i

日均百万PV架构第三弹(分布内容为王)

接续接上篇 缓存时代来临 为蓝本,继续改造我们的百万级站点架构,这次我们 拿之前存储静态内容的 nfs 开刀,众所周知 nfs 的多台集群节点下可能由于多重 原因(磁盘io , 网络带宽, 并发场景),不适合做文件共享系统的基础结构. 互联网站点中,存在大量图片或其他静态内容,并且这些内容一般在1M之内,对于 海量小文件,我们将采用mogilefs分布式文件系统来完成.其中概念自行google. # mogilefs分布式文件系统工作流程 架构已经愈发复杂,我们需要从新梳理一下.从下表中应该很容

《我与希乐仑》第三弹

此案编号:黄劳人仲 (2014) 办字 第518号 开庭前,发生了一个戏剧性的场景,对方来了三个人,一位是外服的李盼,一位是徐敏,但还有一位叫王霞的,尽然说是Selerant的HR,扯淡嘛!此人来审判庭的目的非常值得怀疑,首先我有充分的证据证明她不属于Selerant,很简单!我们可以去调阅她的人事档案,至少在2014年3月26日当天,她肯定不属于Selerant,因为我清楚地记得她是徐敏的朋友,所以罗,纯粹是打酱油的,也或许是来拉关系的.你说一个不相干的人,庭上基本没说话,那她来干什么?这又不

codechef营养题 第三弹

第三弾が始まる! codechef problems 第三弹 一.Motorbike Racing 题面 It's time for the annual exciting Motorbike Race in Byteland. There are N motorcyclists taking part in the competition. Johnny is watching the race. At the present moment (time 0), Johnny has taken

深究angularJS系列 - 第三弹

深究angularJS系列 - 初识 深究angularJS系列 - 第二弹 深究angularJS系列 - 第三弹,我们一起深入探究angular的服务和自定义指令O(∩_∩)O~~ Angular服务 $http: $http是angular中的一个核心服务; $http利用浏览器的xmlhttprequest或JSONP与远程HTTP服务器进行交互; $http的支持多种method的请求,get.post.put.delete.jsonp等. 下面通过JSONP方法进行$http服务的使

Android Window PhoneWindow Activity学习心得--第三弹

Android Window  PhoneWindow Activity学习心得--第三弹 前面 我们完成了从Activity到PhoneWindow的整体跨度 正如我们所知道的与Activity组件关联的一个应用程序窗口视图对象关联一个ViewRoot对象,而将 一个Activity组件的应用程序窗口视图对象与一个ViewRoot对象关联是通过该Activity组件所使用的 窗口管理器(WindowManager)来执行的. 在我们初始化DecorView完成之后,我们需要关联应用程序窗口视图

[爱上Swift]第三弹:使用Swift建立App基本基石

搭架子 首先这次我们会主要使用IOS自带的导航Controller为一个APP建立一个简单的基石, 新建一个空的Application并创建3个swift文件,分别命名为:FirstViewController,SecondViewController,ThirdViewController; 同时在三个Swift的Controller中重写继承类的viewDidLoad()方法: override func viewDidLoad(){ super.viewDidLoad(); } 在整个程序

Android 特效View第三弹之闪烁View

Android  特效View第三弹之闪烁View 动态效果图我只做了半天还是失败了,给一个截图,剩下的全靠想象了 <FrameLayout android:layout_width="match_parent" android:layout_height="match_parent" > <com.example.empty.FlickerTextView android:id="@+id/flicker" android:l

AndroidStudio使用教程(第三弹)

AndroidStudio使用教程(第三弹) 熟悉了基本的使用之后,可能关心的就是版本控制了. SVN 下载Subversion command line 方法一 下载地址是Subversion里面有不同系统的版本. 以Windows为例,我们采用熟悉的VisualSVN. 进入下载页后下载Apache Subversion command line tools, 解压即可. 方法二 Windows下的Tortoise SVN也是带有command line的,但是安装的时候默认是不安装这个选项