基于Spark的用户行为路径分析

研究背景

互联网行业越来越重视自家客户的一些行为偏好了,无论是电商行业还是金融行业,基于用户行为可以做出很多东西,电商行业可以归纳出用户偏好为用户推荐商品,金融行业可以把用户行为作为反欺诈的一个点,本文主要介绍其中一个重要的功能点,基于行为日志统计用户行为路径,为运营人员提供更好的运营决策。可以实现和成熟产品如adobe analysis类似的用户行为路径分析。最终效果如图。使用的是开源大数据可视化工具。如图所示,用户行为路径的数据非常巨大,uv指标又不能提前计算好(时间段未定),如果展示5级,一个页面的数据量就是10的5次方,如果有几千个页面,数据量是无法估量的,所以只能进行实时计算,而Spark非常适合迭代计算,基于这样的考虑,Spark是不错的选择。

解决方案

流程描述

客户搜索某一起始页面的行为路径明细数据时,RPC请求到后台,Spark程序实时计算返回数据,Java解析数据并展现。

准备工作

1.首先要有行为数据啦,用户行为日志数据必须包含必须包含以下四个字段,访问时间、设备指纹、会话id、页面名称,其中页面名称可以自行定义,用来标示一种或者一类页面,此页面名称最好不要有重复。

2.然后对行为日志进行一级清洗(基于Hive),将数据统一清洗成如下格式

设备指纹 会话id 页面路径(按时间升序 时间
fpid1 sessionid1 A_B_C_D_E_F_G 2017-01-13

A、B、C代表页面名称,清洗过程采用row_number函数,concat_ws函数,具体用法可以百度。清洗完之后落地到hive表,后续会用到。T+1清洗此数据。

3.弄清楚递归的定义

Spark处理

流程概述:

1.构建一个多叉树的类,类主要属性描述,name全路径如A_B_C,childList儿子链表,多叉树的构建和递归参考了这里

2.按时间读取上一步的数据,递归计算每一级页面的属性指标,并根据页面路径插入到初始化的Node类根节点中。

3.递归遍历上一步初始化的根节点对象,并替换其中的name的id为名称,其中借助Spark DataFrame查询数据。

4.将root对象转化成json格式,返回前端。

附上代码如下。

import java.util

import com.google.gson.Gson
import org.apache.spark.SparkContext
import org.apache.log4j.{Level, Logger => LG}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.hive.HiveContext

/**
  * 用户行为路径实时计算实现
  * Created by chouyarn on 2016/12/12.
  */

/**
  * 树结构类
  *
  * @param name      页面路径
  * @param visit     访次
  * @param pv        pv
  * @param uv        uv
  * @param childList 儿子链表
  */
class Node(
            var name: String,
            var path:Any,
            var visit: Any,
            var pv: Any,
            var uv: Any,
            var childList: util.ArrayList[Node]) extends Serializable {
  /**
    * 添加子节点
    *
    * @param node 子节点对象
    * @return
    */
  def addNode(node: Node) = {
    childList.add(node)
  }

  /**
    * 遍历节点,深度优先
    */
  def traverse(): Unit = {
    if (childList.isEmpty)
      return
    //    node.
    val childNum = childList.size

    for (i <- 0 to childNum - 1) {
      val child: Node = childList.get(i)
      child.name = child.name.split("_").last//去除前边绝对路径
      child.traverse()
    }
  }

  /**
    * 遍历节点,深度优先
    */
  def traverse(pages:DataFrame): Unit = {
    if (childList.isEmpty||childList.size()==0)
      return
    //    node.
    val childNum = childList.size

    for (i <- 0 to childNum - 1) {
      val child: Node = childList.get(i)
      child.name = child.name.split("_").last
      val id =pages.filter("page_id=‘"+child.name+"‘").select("page_name").first().getString(0)//替换id为name
      child.name = id
      child.traverse(pages)
    }
  }

  /**
    * 动态插入节点
    *
    * @param node 节点对象
    * @return
    */
  def insertNode(node: Node): Boolean = {
    val insertName = node.name
    if (insertName.stripSuffix("_" + insertName.split("_").last).equals(name)) {
      //      node.name=node.name.split("_").last
      addNode(node)
      true
    } else {
      val childList1 = childList
      val childNum = childList1.size
      var insetFlag = false
      for (i <- 0 to childNum - 1) {
        val childNode = childList1.get(i)
        insetFlag = childNode.insertNode(node)
        if (insetFlag == true)
          true
      }
      false
    }
  }
}

/**
  * 处理类
  */
class Path extends CleanDataWithRDD {
  LG.getRootLogger.setLevel(Level.ERROR)//控制spark日志输出级别

  val sc: SparkContext = SparkUtil.createSparkContextYarn("path")
  val hiveContext = new HiveContext(sc)

  override def handleData(conf: Map[String, String]): Unit = {

    val num = conf.getOrElse("depth", 5)//路径深度
    val pageName = conf.getOrElse("pageName", "")//页面名称
    //    val pageName = "A_C"
    val src = conf.getOrElse("src", "")//标示来源pc or wap

    val pageType = conf.getOrElse("pageType", "")//向前或者向后路径
    val startDate = conf.getOrElse("startDate", "")//开始日期
    val endDate = conf.getOrElse("endDate", "")//结束日期
    //        保存log缓存以保证后续使用
    val log = hiveContext.sql(s"select fpid,sessionid,path " +
      s"from specter.t_pagename_path_sparksource " +
      s"where day between ‘$startDate‘ and ‘$endDate‘ and path_type=$pageType and src=‘$src‘ ")
      .map(s => {
        (s.apply(0) + "_" + s.apply(1) + "_" + s.apply(2))
      }).repartition(10).persist()

    val pages=hiveContext.sql("select page_id,page_name from specter.code_pagename").persist()//缓存页面字典表
    // 本地测试数据
    // val log = sc.parallelize(Seq("fpid1_sessionid1_A_B",
    //      "fpid2_sessionid2_A_C_D_D_B_A_D_A_F_B",
    //      "fpid1_sessionid1_A_F_A_C_D_A_B_A_V_A_N"))
    var root: Node = null
    /**
      * 递归将计算的节点放入树结构
      *
      * @param pageName 页面名称
      */
    def compute(pageName: String): Unit = {
      val currenRegex = pageName.r //页面的正则表达式
      val containsRdd = log.filter(_.contains(pageName)).persist() //包含页面名称的RDD,后续步骤用到
      val currentpv = containsRdd.map(s => {//计算pv
        currenRegex findAllIn (s)
      }).map(_.mkString(","))
        .flatMap(_.toString.split(","))
        .filter(_.size > 0)
        .count()

      val tempRdd = containsRdd.map(_.split("_")).persist() //分解后的RDD
      val currentuv = tempRdd.map(_.apply(0)).distinct().count() //页面uv
      val currentvisit = tempRdd.map(_.apply(1)).distinct().count() //页面访次

      //      初始化根节点或添加节点
      if (root == null) {
        root = new Node(pageName,pageName.hashCode, currentvisit, currentpv, currentuv, new util.ArrayList[Node]())
      } else {
        root.insertNode(new Node(pageName,pageName.hashCode, currentvisit, currentpv, currentuv, new util.ArrayList[Node]()))
      }

      if (pageName.split("_").size == 5||tempRdd.isEmpty()) {//递归出口
        return
      } else {
        //          确定下个页面名称正则表达式
        val nextRegex =
        s"""${pageName}_[0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}""".r
        // 本地测试
        // val nextRegex =s"""${pageName}_[A-Z]""".r
        val nextpvMap = containsRdd.map(s => {//下一级路径的pv数top9
          nextRegex findAllIn (s)
        }).map(_.mkString(","))
          .flatMap(_.toString.split(","))
          .filter(_.size > 0)
          .map(s => (s.split("_").last, 1))
          .filter(!_._1.contains(pageName.split("_")(0)))
          .reduceByKey(_ + _).sortBy(_._2, false).take(9).toMap

        nextpvMap.keySet.foreach(key => {//递归计算
          compute(pageName + "_" + key)
        })
      }
    }
    //触发计算
    compute(pageName)
    val gson: Gson = new Gson()

    root.traverse(pages)
    root.name=pages.filter("page_id=‘"+pageName+"‘").select("page_name").first().getString(0)
    println(gson.toJson(root))//转化成JSON并打印,Alibaba fsatjson不可用,还是google得厉害。
  }

  override def stop(): Unit = {
    sc.stop()
  }
}

object Path {
  def main(args: Array[String]): Unit = {
    //    println("ss".hashCode)
    var num=5
    try {
      num=args(5).toInt
    }catch {
      case e:Exception =>
    }

    val map = Map("pageName" -> args(0),
      "pageType" -> args(1),
      "startDate" -> args(2),
      "endDate" -> args(3),
      "src" -> args(4),
      "depth" -> num.toString)
    val path = new Path()
    path.handleData(map)
  }
}

总结

Spark基本是解决了实时计算行为路径的问题,缺点就是延迟稍微有点高,因为提交Job之后要向集群申请资源,申请资源和启动就耗费将近30秒,后续这块可以优化。据说spark-jobserver提供一个restful接口,为Job预启动容器,博主没时间研究有兴趣的可以研究下啦。

fastjson在对复杂对象的转换中不如Google 的Gson。

使用递归要慎重,要特别注意出口条件,若出口不明确,很有可能导致死循环。

时间: 2024-10-20 17:11:49

基于Spark的用户行为路径分析的相关文章

基于Spark MLlib平台的协同过滤算法---电影推荐系统

基于Spark MLlib平台的协同过滤算法---电影推荐系统 又好一阵子没有写文章了,阿弥陀佛...最近项目中要做理财推荐,所以,回过头来回顾一下协同过滤算法在推荐系统中的应用. 说到推荐系统,大家可能立马会想到协同过滤算法.本文基于Spark MLlib平台实现一个向用户推荐电影的简单应用.其中,主要包括三部分内容: 协同过滤算法概述 基于模型的协同过滤应用---电影推荐 实时推荐架构分析     一.协同过滤算法概述 本人对算法的研究,目前还不是很深入,这里简单的介绍下其工作原理. 通常,

基于spark排序的一种更廉价的实现方案-附基于spark的性能测试

排序可以说是很多日志系统的硬指标(如按照时间逆序排序),如果一个大数据系统不能进行排序,基本上是这个系统属于不可用状态,排序算得上是大数据系统的一个"刚需",无论大数据采用的是hadoop,还是spark,还是impala,hive,总之排序是必不可少的,排序的性能测试也是必不可少的. 有着计算奥运会之称的Sort Benchmark全球排序每年都会举行一次,每年巨头都会在排序上进行巨大的投入,可见排序速度的高低有多么重要!但是对于大多数企业来说,动辄上亿的硬件投入,实在划不来.甚至远

基于Spark ALS构建商品推荐引擎

基于Spark ALS构建商品推荐引擎 一般来讲,推荐引擎试图对用户与某类物品之间的联系建模,其想法是预测人们可能喜好的物品并通过探索物品之间的联系来辅助这个过程,让用户能更快速.更准确的获得所需要的信息,提升用户的体验.参与度以及物品对用户的吸引力. 在开始之前,先了解一下推荐模型的分类: 1.基于内容的过滤:利用物品的内容或是属性信息以及某些相似度定义,求出与该物品类似的物品 2.协同过滤:利用大量已有的用户偏好来估计用户对其未接触过的物品的喜好程度 3.矩阵分解(包括显示矩阵分解.隐式矩阵

京东基于Spark的风控系统架构实践和技术细节

京东基于Spark的风控系统架构实践和技术细节 时间 2016-06-02 09:36:32  炼数成金 原文  http://www.dataguru.cn/article-9419-1.html 主题 Spark软件架构 1.背景 互联网的迅速发展,为电子商务兴起提供了肥沃的土壤.2014年,中国电子商务市场交易规模达到13.4万亿元,同比增长31.4%.其中,B2B电子商务市场交易额达到10万亿元,同比增长21.9%.这一连串高速增长的数字背后,不法分子对互联网资产的觊觎,针对电商行业的恶

基于Spark的异构分布式深度学习平台

导读:本文介绍百度基于Spark的异构分布式深度学习系统,把Spark与深度学习平台PADDLE结合起来解决PADDLE与业务逻辑间的数据通路问题,在此基础上使用GPU与FPGA异构计算提升每台机器的数据处理能力,使用YARN对异构资源做分配,支持Multi-Tenancy,让资源的使用更有效. 深层神经网络技术最近几年取得了巨大的突破,特别在语音和图像识别应用上有质的飞跃,已经被验证能够使用到许多业务上.如何大规模分布式地执行深度学习程序,使其更好地支持不同的业务线成为当务之急.在过去两年,百

基于ArcGIS API for WPF路径分析源码实例

说明: 本实例主要演示网络分析数据集制作,服务发布,最后基于ArcGIS API for WPF做路径分析. 本实例参考ArcGIS官方文档,想了解GIS网络分析可查阅官方文档. 本实例数据为西藏道路数据,若无数据可新建简单的线要素. 本实例使用软件及版本:ArcGIS10.2,ArcGIS APIfor WPF24,VS2010. 最后为了增加效果叠加局部离线卫星地图,需要安装水经注万能地图下载器(相当方便的卫星地图下载器),如果没有安装本软件,可以百度"水经注软件"到官方网站下载.

走在大数据的边缘 基于Spark的机器学习-智能客户系统项目实战(项目实战)

38套大数据,云计算,架构,数据分析师,Hadoop,Spark,Storm,Kafka,人工智能,机器学习,深度学习,项目实战视频教程 视频课程包含: 38套大数据和人工智能精品高级课包含:大数据,云计算,架构,数据挖掘实战,实时推荐系统实战,电视收视率项目实战,实时流统计项目实战,离线电商分析项目实战,Spark大型项目实战用户分析,智能客户系统项目实战,Linux基础,Hadoop,Spark,Storm,Docker,Mapreduce,Kafka,Flume,OpenStack,Hiv

基于Spark的机器学习实践 (九) - 聚类算法

0 相关源码 1 k-平均算法(k-means clustering)概述 1.1 回顾无监督学习 ◆ 分类.回归都属于监督学习 ◆ 无监督学习是不需要用户去指定标签的 ◆ 而我们看到的分类.回归算法都需要用户输入的训练数据集中给定一个个明确的y值 1.2 k-平均算法与无监督学习 ◆ k-平均算法是无监督学习的一种 ◆ 它不需要人为指定一个因变量,即标签y ,而是由程序自己发现,给出类别y ◆ 除此之外,无监督算法还有PCA,GMM等 源于信号处理中的一种向量量化方法,现在则更多地作为一种聚类

SpringMVC+Apache Shiro+JPA(hibernate)案例教学(四)基于Shiro验证用户权限,且给用户授权

最新项目比较忙,写文章的精力就相对减少了,但看到邮箱里的几个催更,还是厚颜把剩下的文档补上. 一.修改ShiroDbRealm类,实现它的doGetAuthorizationInfo方法 package org.shiro.demo.service.realm; import java.util.ArrayList; import java.util.List; import javax.annotation.Resource; import org.apache.commons.lang.St