场景分析
用户行为分析应用的场景很多,像线上网站访问统计,线下客流分析(比如图像人脸识别、wifi探针等),比较核心的指标有几个:
PV | UV | SD | SC
指标说明:
PV(Page View):网站浏览量或者商场门店的访问量
UV(Unique Visitor):独立访客数,即去重后的人数
SD(Session Duration):单次会话停留时间
SC(Session Count):会话次数
用户行为分析的原始数据通常是一系列时间离散数据,比如网站访问记录:用户在一个时间点访问了一个网页,然后又在下个时间点访问了下个网页;
这些原始数据可以抽象为:
User | Timestamp | Target
即用户在什么时间点访问了什么目标;
统计PV、UV比较简单,但是在时间离散数据的基础上,要计算SD、SC这两个指标,常用的方式是设置过期时间阈值,如果用户两次访问的时间间隔超过阈值,则认为是两次Session;然后在一次Session的所有数据中取时间最早和最晚的数据来统计本次Session Duration;
统计示例
输入数据
(user1, 2018-12-01 01:00:00, t1)
(user1, 2018-12-01 01:01:30, t1)
(user1, 2018-12-01 01:06:00, t1)
(user1, 2018-12-01 01:20:00, t1)
(user1, 2018-12-01 01:24:00, t1)
可以统计出
PV=5,UV=1
过期时间阈值设置为5分钟,以上数据应该统计出来2次Session,分别是:
Session1: (2018-12-01 01:00:00 到 2018-12-01 01:06:00),Duration:6分钟
Session2: (2018-12-01 01:20:00 到 2018-12-01 01:24:00),Duration:4分钟
实际处理时还要数据乱序的问题,尤其是在实时计算中,你想好怎样做了吗?
Scala代码实现
下面给出scala实现,来见证scala的强大:
scala核心代码(一步foldLeft)
scala
val expireInSecond = 300 def mergeTimeArray(arr1 : ArrayBuffer[(Long, Long)], arr2 : ArrayBuffer[(Long, Long)]) : ArrayBuffer[(Long, Long)] = { if (arr1.head._1.equals(0l)) arr2 else if (arr2.head._1.equals(0l)) arr1 else (arr1 ++ arr2).sortBy(_._1).foldLeft(ArrayBuffer[(Long, Long)]())((result, item) => if (!result.isEmpty && result.last._2 + expireInSecond >= item._1) {result.update(result.length - 1, (result.last._1, math.max(result.last._2, item._2))); result} else result += item) }
spark核心代码(2步map 1步aggregateByKey)
scala
/** * @param data (user, timestamp, target) * @return (user, target, session_count, session_duration) */ def process(data : RDD[(String, Long, String)]) : RDD[(String, String, Integer, Double)] = { //((user, target), timestamp) data.map(item => ((item._1, item._3), item._2)) //((user, target), Array[(startTime, endTime)]) .aggregateByKey(ArrayBuffer((0l, 0l)))((result : ArrayBuffer[(Long, Long)], timestamp: Long) => mergeTimeArray(result, ArrayBuffer((timestamp, timestamp))), (result1 : ArrayBuffer[(Long, Long)], result2 : ArrayBuffer[(Long, Long)]) => mergeTimeArray(result1, result2)) //(user, target, session_count, session_duration) .map(item => (item._1._1, item._1._2, item._2.length, item._2.foldLeft(0l)((result, item) => result + (item._2 - item._1)).toDouble / item._2.length)) }
测试运行
def main(args : Array[String]) : Unit = { val conf = new SparkConf().setAppName("UserAnalysis").setMaster("local[2]") val sc = new SparkContext(conf) val arr = Array(("user1", 1546054000l, "t1"), ("user1", 1546054090l, "t1"), ("user1", 1546054360l, "t1"), ("user1", 1546055200l, "t1"), ("user1", 1546055440l, "t1")) //(user, timestamp, target) val data : RDD[(String, Long, String)] = sc.parallelize(arr) this.process(data).foreach(println) }
输出
(user1,t1,2,300.0)
原文地址:https://www.cnblogs.com/barneywill/p/10193591.html