Spark Streaming实时计算海量用户UV

提出需求

实时统计业务系统(web,APP之类)的访问人数,即所谓UV,或者DAU指标.

这个需求怕是流计算最最最常见的需求了.

计算UV的关键点就在于去重,即同一个人访问两次是只计一个UV的.在离线计算中统计UV比较容易想到的方法就是用group或distinct机制来去重.但是在实时计算场景,还用group就不太科学了,一个是全量数据的group是比较费时的,第二个是全量数据的group是很费内存和CPU的.特别是当用户量巨大的时候,还要做到秒级更新就更难了.

总结起来,需求就是:海量用户场景UV实时计算.

分享之前我还是要推荐下我自己创建的大数据学习交流Qun531629188

无论是大牛还是想转行想学习的大学生

小编我都挺欢迎,今天的已经资讯上传到群文件,不定期分享干货,

接受挑战

不难发现,问题的主要难点就是去重.

Spark Streaming目前没有给出内置方案(这个其实可以有),但是海量数据去重问题早就有解决办法了. 所以Spark Streaming程序完全可以利用其他系统的现有方案解决去重问题,比如Redis.

Redis的海量去重计数方案

Bitmap方案

所谓的Bitmap就是用一个bit位来标记某个元素对应的Value,比如ID为2的用户,就用第2个bit位来表示,然后用该位的值来表示该用户是否访问过.如果要计算UV,那就只要数一下有多少个1就行啦.

假设我们有40亿用户,使用Bitmap需要2^32个bit位,算下来也就500M左右.

你可能没想到的是,Redis中最常用的数据结构string,就可以实现bitmap算法.

Redis提供了如下命令


1

2

3

4

5

6

7


<span class="hljs-comment"><span class="hljs-comment">// 插入</span></span>

setbit key offset value

<span class="hljs-comment"><span class="hljs-comment">//获取</span></span>

getbit key offset

<span class="hljs-comment"><span class="hljs-comment">//计数</span></span>

BITCOUNT key <span class="hljs-selector-attr"><span class="hljs-selector-attr">[start]</span></span> <span class="hljs-selector-attr"><span class="hljs-selector-attr">[end]</span></span>

这里offset最大值就是2^32. 比如ID为2的用户,可以setbit uv 2 1,来记录. 最后要计算UV,就直接 BITCOUNT uv. 这步计数非常快,复杂度是O(1).

HyperLogLog方案

若要计算很多页面的UV,用bitmap还是比较费空间的,N个页面就得有N个500M.这时候HyperLogLog结构就是一个比较好的选择.

Redis 在 2.8.9 版本添加了 HyperLogLog 结构。 Redis HyperLogLog 是用来做基数统计的算法,HyperLogLog 的优点是,在输入元素的数量或者体积非常非常大时,计算基数所需的空间总是固定 的、并且是很小的。 在 Redis 里面,每个 HyperLogLog 键只需要花费 12 KB 内存,就可以计算接近 2^64 个不同元素的基 数。这和计算基数时,元素越多耗费内存就越多的集合形成鲜明对比。 但是,因为 HyperLogLog 只会根据输入元素来计算基数,而不会储存输入元素本身,所以 HyperLogLog 不能像集合那样,返回输入的各个元素。

也就是说HyperLogLog是一种基数统计算法,计算结果是近似值, 12 KB 内存就可以计算2^64 个不同元素的基数.

Redis命令如下:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18


<span class="hljs-selector-tag"><span class="hljs-selector-tag">redis</span></span> 127<span class="hljs-selector-class"><span class="hljs-selector-class">.0</span></span><span class="hljs-selector-class"><span class="hljs-selector-class">.0</span></span><span class="hljs-selector-class"><span class="hljs-selector-class">.1</span></span><span class="hljs-selector-pseudo"><span class="hljs-selector-pseudo">:6379</span></span>&gt; <span class="hljs-selector-tag"><span class="hljs-selector-tag">PFADD</span></span> <span class="hljs-selector-tag"><span class="hljs-selector-tag">runoobkey</span></span> "<span class="hljs-selector-tag"><span class="hljs-selector-tag">redis</span></span>"

1) (<span class="hljs-selector-tag"><span class="hljs-selector-tag">integer</span></span>) 1

<span class="hljs-selector-tag"><span class="hljs-selector-tag">redis</span></span> 127<span class="hljs-selector-class"><span class="hljs-selector-class">.0</span></span><span class="hljs-selector-class"><span class="hljs-selector-class">.0</span></span><span class="hljs-selector-class"><span class="hljs-selector-class">.1</span></span><span class="hljs-selector-pseudo"><span class="hljs-selector-pseudo">:6379</span></span>&gt; <span class="hljs-selector-tag"><span class="hljs-selector-tag">PFADD</span></span> <span class="hljs-selector-tag"><span class="hljs-selector-tag">runoobkey</span></span> "<span class="hljs-selector-tag"><span class="hljs-selector-tag">mongodb</span></span>"

1) (<span class="hljs-selector-tag"><span class="hljs-selector-tag">integer</span></span>) 1

<span class="hljs-selector-tag"><span class="hljs-selector-tag">redis</span></span> 127<span class="hljs-selector-class"><span class="hljs-selector-class">.0</span></span><span class="hljs-selector-class"><span class="hljs-selector-class">.0</span></span><span class="hljs-selector-class"><span class="hljs-selector-class">.1</span></span><span class="hljs-selector-pseudo"><span class="hljs-selector-pseudo">:6379</span></span>&gt; <span class="hljs-selector-tag"><span class="hljs-selector-tag">PFADD</span></span> <span class="hljs-selector-tag"><span class="hljs-selector-tag">runoobkey</span></span> "<span class="hljs-selector-tag"><span class="hljs-selector-tag">mysql</span></span>"

1) (<span class="hljs-selector-tag"><span class="hljs-selector-tag">integer</span></span>) 1

<span class="hljs-selector-tag"><span class="hljs-selector-tag">redis</span></span> 127<span class="hljs-selector-class"><span class="hljs-selector-class">.0</span></span><span class="hljs-selector-class"><span class="hljs-selector-class">.0</span></span><span class="hljs-selector-class"><span class="hljs-selector-class">.1</span></span><span class="hljs-selector-pseudo"><span class="hljs-selector-pseudo">:6379</span></span>&gt; <span class="hljs-selector-tag"><span class="hljs-selector-tag">PFCOUNT</span></span> <span class="hljs-selector-tag"><span class="hljs-selector-tag">runoobkey</span></span>

(<span class="hljs-selector-tag"><span class="hljs-selector-tag">integer</span></span>) 3

代码实现

下面给出HyperLogLog方案的参考实现:


1

2

3

4

5

6

7

8

9

10

11

12

13


stream.foreachRDD { rdd =&gt;

<span class="hljs-comment"><span class="hljs-comment">//统计人数</span></span>

rdd.foreachPartition { partition =&gt;

<span class="hljs-comment"><span class="hljs-comment">//从分区所属executor的redis线程池获取一个连接.</span></span>

<span class="hljs-keyword">val</span> redis = <span class="hljs-type">RedisUtil</span>.getRedis

partition.<span class="hljs-keyword">foreach</span> { <span class="hljs-keyword"><span class="hljs-keyword">case</span></span> (date, userId) =&gt;

<span class="hljs-comment"><span class="hljs-comment">//统计当前userId</span></span>

redis.pfadd(<span class="hljs-string">s<span class="hljs-string">"uv:</span><span class="hljs-subst"><span class="hljs-string">$date</span></span><span class="hljs-string">"</span></span>, userId)

}

redis.close()

}

}

关于Redis的连接,如果是用java或scala可以使用JedisPool,注意处理序列化即可.

原文地址:http://blog.51cto.com/13750742/2117785

时间: 2024-12-17 21:55:56

Spark Streaming实时计算海量用户UV的相关文章

Spark Streaming实时计算框架介绍

http://www.cnblogs.com/Leo_wl/p/3530464.html 随着大数据的发展,人们对大数据的处理要求也越来越高,原有的批处理框架MapReduce适合离线计算,却无法满足实时性要求较高的业务,如实时推荐.用户行为分析等. Spark Streaming是建立在Spark上的实时计算框架,通过它提供的丰富的API.基于内存的高速执行引擎,用户可以结合流式.批处理和交互试查询应用.本文将详细介绍Spark Streaming实时计算框架的原理与特点.适用场景. Spar

【Streaming】30分钟概览Spark Streaming 实时计算

本文主要介绍四个问题: 什么是Spark Streaming实时计算? Spark实时计算原理流程是什么? Spark 2.X下一代实时计算框架Structured Streaming Spark Streaming相对其他实时计算框架该如何技术选型? 本文主要针对初学者,如果有不明白的概念可了解之前的博客内容. 1.什么是Spark Streaming? 与其他大数据框架Storm.Flink一样,Spark Streaming是基于Spark Core基础之上用于处理实时计算业务的框架.其实

【转】Spark Streaming 实时计算在甜橙金融监控系统中的应用及优化

系统架构介绍 整个实时监控系统的架构是先由 Flume 收集服务器产生的日志 Log 和前端埋点数据, 然后实时把这些信息发送到 Kafka 分布式发布订阅消息系统,接着由 Spark Streaming 消费 Kafka 中的消息,同时消费记录由 Zookeeper 集群统一管理,这样即使 Kafka 宕机重启后也能找到上次的消费记录继而进行消费.在这里 Spark Streaming 首先从 MySQL 读取规则然后进行 ETL 清洗并计算多个聚合指标,最后将结果的一部分存储到 Hbase

在C#中使用二叉树实时计算海量用户积分排名的实现

从何说起 前些天和朋友讨论一个问题,他们的应用有几十万会员然后对应有积分,现在想做积分排名的需求,问有没有什么好方案.这个问题也算常见,很多地方都能看到,常规做法一般是数据定时跑批把计算结果到中间表然后直接查表就行,或者只显示个TOP N的排行榜,名次高的计算真实名次,名次比较低的直接显示在xxx名开外这种.但是出于探索问题的角度,我还是想找一下有没有实时计算的办法,并且效率能够接受. 在博客园搜到一篇不错的文章,基本罗列了常用的方案,每种算法详细介绍了具体思路,其中基于二叉树的算法是个非常不错

Spark Streaming实时流处理项目实战

第1章 课程介绍   1-1 -导学-   1-2 -授课习惯和学习建议   1-3 -OOTB环境使用演示   1-4 -Linux环境及软件版本介绍   1-5 -Spark版本升级第2章 初识实时流处理   2-1 -课程目录   2-2 -业务现状分析   2-3 -实时流处理产生背景   2-4 -实时流处理概述   2-5 -离线计算和实时计算对比   2-6 -实时流处理框架对比   2-7 -实时流处理架构及技术选型   2-8 -实时流处理在企业中的应用第3章 分布式日志收集框

Spark发行版笔记5:贯通Spark Streaming流计算框架的运行源码

本章节内容: 一.在线动态计算分类最热门商品案例回顾 二.基于案例贯通Spark Streaming的运行源码 先看代码(源码场景:用户.用户的商品.商品的点击量排名,按商品.其点击量排名前三): package com.dt.spark.sparkstreaming import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.spark.sql.hive.HiveContext impo

【慕课网实战】Spark Streaming实时流处理项目实战笔记八之铭文升级版

铭文一级: Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Spark Streaming个人的定义: 将不同的数据源的数据经过Spark Streaming处理之后将结果输出到外部文件系统 特点 低延时 能从错误中高效的恢复:fault-toler

(版本定制)第5课:基于案例分析Spark Streaming流计算框架的运行源码

本期内容: 1.在线动态计算分类最热门商品案例回顾与演示 2.基于案例分析Spark Streaming的运行源码 第一部分案例: package com.dt.spark.sparkstreaming import com.robinspark.utils.ConnectionPoolimport org.apache.spark.SparkConfimport org.apache.spark.sql.Rowimport org.apache.spark.sql.hive.HiveConte

第5课:基于案例一节课贯通Spark Streaming流计算框架的运行源码

本期内容: 1 在线动态计算分类最热门商品案例回顾与演示 2 基于案例贯通Spark Streaming的运行源码 一.案例代码 在线动态计算电商中不同类别中最热门的商品排名,例如:手机类别中最热门的三种手机.电视类别中最热门的三种电视等 package com.dt.spark.sparkstreaming import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.spark.sql.