研究背景
互联网行业越来越重视自家客户的一些行为偏好了,无论是电商行业还是金融行业,基于用户行为可以做出很多东西,电商行业可以归纳出用户偏好为用户推荐商品,金融行业可以把用户行为作为反欺诈的一个点,本文主要介绍其中一个重要的功能点,基于行为日志统计用户行为路径,为运营人员提供更好的运营决策。可以实现和成熟产品如adobe analysis类似的用户行为路径分析。最终效果如图。使用的是开源大数据可视化工具。如图所示,用户行为路径的数据非常巨大,uv指标又不能提前计算好(时间段未定),如果展示5级,一个页面的数据量就是10的5次方,如果有几千个页面,数据量是无法估量的,所以只能进行实时计算,而Spark非常适合迭代计算,基于这样的考虑,Spark是不错的选择。
解决方案
流程描述
客户搜索某一起始页面的行为路径明细数据时,RPC请求到后台,Spark程序实时计算返回数据,Java解析数据并展现。
准备工作
1.首先要有行为数据啦,用户行为日志数据必须包含必须包含以下四个字段,访问时间、设备指纹、会话id、页面名称,其中页面名称可以自行定义,用来标示一种或者一类页面,此页面名称最好不要有重复。
2.然后对行为日志进行一级清洗(基于Hive),将数据统一清洗成如下格式
设备指纹 | 会话id | 页面路径(按时间升序 | 时间 |
fpid1 | sessionid1 | A_B_C_D_E_F_G | 2017-01-13 |
A、B、C代表页面名称,清洗过程采用row_number函数,concat_ws函数,具体用法可以百度。清洗完之后落地到hive表,后续会用到。T+1清洗此数据。
3.弄清楚递归的定义
Spark处理
流程概述:
1.构建一个多叉树的类,类主要属性描述,name全路径如A_B_C,childList儿子链表,多叉树的构建和递归参考了这里
2.按时间读取上一步的数据,递归计算每一级页面的属性指标,并根据页面路径插入到初始化的Node类根节点中。
3.递归遍历上一步初始化的根节点对象,并替换其中的name的id为名称,其中借助Spark DataFrame查询数据。
4.将root对象转化成json格式,返回前端。
附上代码如下。
import java.util import com.google.gson.Gson import org.apache.spark.SparkContext import org.apache.log4j.{Level, Logger => LG} import org.apache.spark.sql.DataFrame import org.apache.spark.sql.hive.HiveContext /** * 用户行为路径实时计算实现 * Created by chouyarn on 2016/12/12. */ /** * 树结构类 * * @param name 页面路径 * @param visit 访次 * @param pv pv * @param uv uv * @param childList 儿子链表 */ class Node( var name: String, var path:Any, var visit: Any, var pv: Any, var uv: Any, var childList: util.ArrayList[Node]) extends Serializable { /** * 添加子节点 * * @param node 子节点对象 * @return */ def addNode(node: Node) = { childList.add(node) } /** * 遍历节点,深度优先 */ def traverse(): Unit = { if (childList.isEmpty) return // node. val childNum = childList.size for (i <- 0 to childNum - 1) { val child: Node = childList.get(i) child.name = child.name.split("_").last//去除前边绝对路径 child.traverse() } } /** * 遍历节点,深度优先 */ def traverse(pages:DataFrame): Unit = { if (childList.isEmpty||childList.size()==0) return // node. val childNum = childList.size for (i <- 0 to childNum - 1) { val child: Node = childList.get(i) child.name = child.name.split("_").last val id =pages.filter("page_id=‘"+child.name+"‘").select("page_name").first().getString(0)//替换id为name child.name = id child.traverse(pages) } } /** * 动态插入节点 * * @param node 节点对象 * @return */ def insertNode(node: Node): Boolean = { val insertName = node.name if (insertName.stripSuffix("_" + insertName.split("_").last).equals(name)) { // node.name=node.name.split("_").last addNode(node) true } else { val childList1 = childList val childNum = childList1.size var insetFlag = false for (i <- 0 to childNum - 1) { val childNode = childList1.get(i) insetFlag = childNode.insertNode(node) if (insetFlag == true) true } false } } } /** * 处理类 */ class Path extends CleanDataWithRDD { LG.getRootLogger.setLevel(Level.ERROR)//控制spark日志输出级别 val sc: SparkContext = SparkUtil.createSparkContextYarn("path") val hiveContext = new HiveContext(sc) override def handleData(conf: Map[String, String]): Unit = { val num = conf.getOrElse("depth", 5)//路径深度 val pageName = conf.getOrElse("pageName", "")//页面名称 // val pageName = "A_C" val src = conf.getOrElse("src", "")//标示来源pc or wap val pageType = conf.getOrElse("pageType", "")//向前或者向后路径 val startDate = conf.getOrElse("startDate", "")//开始日期 val endDate = conf.getOrElse("endDate", "")//结束日期 // 保存log缓存以保证后续使用 val log = hiveContext.sql(s"select fpid,sessionid,path " + s"from specter.t_pagename_path_sparksource " + s"where day between ‘$startDate‘ and ‘$endDate‘ and path_type=$pageType and src=‘$src‘ ") .map(s => { (s.apply(0) + "_" + s.apply(1) + "_" + s.apply(2)) }).repartition(10).persist() val pages=hiveContext.sql("select page_id,page_name from specter.code_pagename").persist()//缓存页面字典表 // 本地测试数据 // val log = sc.parallelize(Seq("fpid1_sessionid1_A_B", // "fpid2_sessionid2_A_C_D_D_B_A_D_A_F_B", // "fpid1_sessionid1_A_F_A_C_D_A_B_A_V_A_N")) var root: Node = null /** * 递归将计算的节点放入树结构 * * @param pageName 页面名称 */ def compute(pageName: String): Unit = { val currenRegex = pageName.r //页面的正则表达式 val containsRdd = log.filter(_.contains(pageName)).persist() //包含页面名称的RDD,后续步骤用到 val currentpv = containsRdd.map(s => {//计算pv currenRegex findAllIn (s) }).map(_.mkString(",")) .flatMap(_.toString.split(",")) .filter(_.size > 0) .count() val tempRdd = containsRdd.map(_.split("_")).persist() //分解后的RDD val currentuv = tempRdd.map(_.apply(0)).distinct().count() //页面uv val currentvisit = tempRdd.map(_.apply(1)).distinct().count() //页面访次 // 初始化根节点或添加节点 if (root == null) { root = new Node(pageName,pageName.hashCode, currentvisit, currentpv, currentuv, new util.ArrayList[Node]()) } else { root.insertNode(new Node(pageName,pageName.hashCode, currentvisit, currentpv, currentuv, new util.ArrayList[Node]())) } if (pageName.split("_").size == 5||tempRdd.isEmpty()) {//递归出口 return } else { // 确定下个页面名称正则表达式 val nextRegex = s"""${pageName}_[0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}""".r // 本地测试 // val nextRegex =s"""${pageName}_[A-Z]""".r val nextpvMap = containsRdd.map(s => {//下一级路径的pv数top9 nextRegex findAllIn (s) }).map(_.mkString(",")) .flatMap(_.toString.split(",")) .filter(_.size > 0) .map(s => (s.split("_").last, 1)) .filter(!_._1.contains(pageName.split("_")(0))) .reduceByKey(_ + _).sortBy(_._2, false).take(9).toMap nextpvMap.keySet.foreach(key => {//递归计算 compute(pageName + "_" + key) }) } } //触发计算 compute(pageName) val gson: Gson = new Gson() root.traverse(pages) root.name=pages.filter("page_id=‘"+pageName+"‘").select("page_name").first().getString(0) println(gson.toJson(root))//转化成JSON并打印,Alibaba fsatjson不可用,还是google得厉害。 } override def stop(): Unit = { sc.stop() } } object Path { def main(args: Array[String]): Unit = { // println("ss".hashCode) var num=5 try { num=args(5).toInt }catch { case e:Exception => } val map = Map("pageName" -> args(0), "pageType" -> args(1), "startDate" -> args(2), "endDate" -> args(3), "src" -> args(4), "depth" -> num.toString) val path = new Path() path.handleData(map) } }
总结
Spark基本是解决了实时计算行为路径的问题,缺点就是延迟稍微有点高,因为提交Job之后要向集群申请资源,申请资源和启动就耗费将近30秒,后续这块可以优化。据说spark-jobserver提供一个restful接口,为Job预启动容器,博主没时间研究有兴趣的可以研究下啦。
fastjson在对复杂对象的转换中不如Google 的Gson。
使用递归要慎重,要特别注意出口条件,若出口不明确,很有可能导致死循环。