Spark调研笔记第7篇 - 应用实战: 如何利用Spark集群计算物品相似度

本文是Spark调研笔记的最后一篇,以代码实例说明如何借助Spark平台高效地实现推荐系统CF算法中的物品相似度计算。

在推荐系统中,最经典的推荐算法无疑是协同过滤(Collaborative Filtering, CF),而item-cf又是CF算法中一个实现简单且效果不错的算法。

在item-cf算法中,最关键的步骤是计算物品之间的相似度。本文以代码实例来说明如何利用Spark平台快速计算物品间的余弦相似度。

Cosine Similarity是相似度的一种常用度量,根据《推荐系统实践》一书第2.4.2节关于Item-CF算法部分的说明,其计算公式如下:

举个例子,若对item1有过行为的用户集合为{u1, u2, u3},对item2有过行为的用户集合为{u1, u3, u4, u5},则根据上面的式子,item1和item2间的相似度为2/(3*4),其中分子的2是因为item1的user_list与item2的user_list的交集长度为2,即item1和item2的共现(co-occurence)次数是2。

在工程实现上,根据论文"Empirical Analysis of Predictive Algorithms for Collaborative Filtering"的分析,为对活跃用户做惩罚,引入了IUF (Inverse User Frequency)的概念(与TF-IDF算法引入IDF的思路类似:活跃用户对物品相似度的贡献应该小于不活跃的用户),因此,对余弦相似度做改进后相似度计算公式如下:

可以看到,上式分子部分的1/log(1 + N(u))体现了对活跃用户的惩罚。

此外,通常认为用户在相隔很短的时间内喜欢的物品具有更高相似度。因此,工程实现上,还会考虑时间衰减效应。一种典型的时间衰减函数如下所示:

最终,时间上下文相关的Item-CF算法中的相似度计算公式如下:

上式中,分母部分与标准的相似度公式分母保持一致;分子部分参与运算的是item_i和item_j的共现用户集合,其中,f(t)是时间衰减效应的体现,N(u)对活跃用户做了惩罚。

下面的Python代码是计算物品相似度的Spark任务的代码片段(从HDFS加载用户历史行为日志,计算物品相似度,相似列表取TopN,将相似度计算结果写会HDFS),供大家参考:

#!/bin/env/python

import pyspark as ps
import math
import datetime as dt
import util

def generate_item_pair(x):
    """
        Find co-occurence items of every given user
        Return a tuple in the format of ((item_0, item_1), cooccurrence_factor).
    """
    items = x[1]
    item_cnt = len(items)
    alpha = 1
    for i in items:
        item1 = i[0]
        ts1   = i[1]
        for j in items:
            item2 = j[0]
            ts2   = j[1]
            if item1 != item2:
                ## introduce time decay and penalize active users
                ft = 1.0 / (1 + alpha * abs(ts1 - ts2))
                yield ((item1, item2), (ft / math.log(1 + item_cnt)))

def compute_item_similarity(x):
    items = x[0]
    cooccurrence = float(x[1])
    item_dict = g_item_freq_d
    norm_factor = 5
    if items[0] in item_dict and items[1] in item_dict:
        freq_0 = item_dict[items[0]]
        freq_1 = item_dict[items[1]]
        ## calculate similarity between the item pair
        sim = cooccurrence / math.sqrt(freq_0 * freq_1)
        ## normalize similarity
        norm_sim = (cooccurrence / (cooccurrence + norm_factor)) * sim
        yield (items[0], (items[1], norm_sim))

def sort_items(x):
    """
        For a given item, sort all items similar to it as descent (using similarity scores), take topN similar items, and return as the following format:
        given_item \t sorted_item_0$sorted_score_0,sorted_item_1$sorted_score_1,...
    """
    similar_items = list(x[1])
    if len(similar_items) > 0:
        ## sort list of (item, score) tuple by score from high to low
        similar_items.sort(key=lambda x: x[1], reverse=True)
        ## format the list of sorted items as a string
        similar_items_str = ",".join(["$".join(map(str,item)) for item in similar_items[0:50]])
        yield "\t".join([str(x[0]), similar_items_str])

def main():
    base_hdfs_uri = "hdfs://to/user/behavior/log"
    today = dt.date.today()
    knn_similarity_file = '%s/%s/knn_sim' % (base_hdfs_uri, today.strftime('%Y%m%d'))

    sc = ps.SparkContext()

    ## load user behavior from hdfs log
    ## each element in user_item is a tuple: (user, (item, timestamp))
    history_s = (today - dt.timedelta(8)).strftime('%Y%m%d')
    history_e = (today - dt.timedelta(2)).strftime('%Y%m%d')
    input_files = util.get_input_files(action='play', start=history_s, end=history_e)
    user_item = sc.textFile(",".join(input_files))        .mapPartitions(util.parse_user_item)         .map(lambda x: (x[0], (x[1], x[2])))         .distinct()         .cache()

    ## compute item frequency and store as a global dict
    item_freq = user_item.map(lambda x: (x[1][0], 1))         .reduceByKey(lambda x, y: x + y)         .collect()
    global g_item_freq_d
    g_item_freq_d = dict()
    for x in item_freq:
        g_item_freq_d[x[0]] = x[1]

    ## compute item similarity and find top n most similar items
    item_pair_sim = user_item.groupByKey()         .flatMap(generate_item_pair)         .reduceByKey(lambda x, y: x + y)         .flatMap(compute_item_similarity)         .groupByKey()         .flatMap(sort_items)         .cache()

    ## dump to hdfs
    item_pair_sim.repartition(1).saveAsTextFile(knn_similarity_file)

if __name__ == '__main__':
    main()

上面的代码中,import util中引入的util只是负责从HDFS获取用户历史日志的文件名列表,非常简单,实现细节这里不赘述。

【参考资料】

1. wikipedia: Collaborative filtering

2. 推荐系统实践(项亮著)第2.4.2节: 基于物品的协同过滤算法

3. Paper: Empirical Analysis of Predictive Algorithms for Collaborative Filtering

4. 推荐系统实践(项亮著)第5.1.6节: 时间上下文相关的ItemCF算法

5. Spark Programming Guide

========================== EOF ===========================

时间: 2024-10-25 14:38:10

Spark调研笔记第7篇 - 应用实战: 如何利用Spark集群计算物品相似度的相关文章

Spark调研笔记第6篇 - Spark编程实战FAQ

本文主要记录我使用Spark以来遇到的一些典型问题及其解决办法,希望对遇到同样问题的同学们有所帮助. 1. Spark环境或配置相关 Q: Spark客户端配置文件spark-defaults.conf中,spark.executor.memory和spark.cores.max应该如何合理配置? A: 配置前,需要对spark集群中每个节点机器的core和memory的配置有基本了解.比如由100台机器搭建的spark集群中,每个节点的配置是core=32且memory=128GB,那么,向该

Spark调研笔记第1篇 - Spark简介

在公司线上项目中引入Spark已经将近1年时间了,从效果来看,Spark确实是能提高生产力的优秀分布式计算平台. 从本篇笔记开始,会把之前调研Spark时的调研报告分享出来(限于篇幅,会分成几篇文章),以便帮助刚接触Spark的朋友们尽快入门. 下面开始正文. 1. 项目背景 Spark项目于2009年诞生于UC Berkeley AMP Lab并于2010年正式提交Apache Software Foundation成为开源项目.目前已经成为Apache下的明星项目,其代码提交活跃度在整个社区

Spark调研笔记第5篇 - Spark API简单介绍

因为Spark是用Scala实现的,所以Spark天生支持Scala API.此外,还支持Java和Python API. 以Spark 1.3版本号的Python API为例.其模块层级关系例如以下图所看到的: 从上图可知,pyspark是Python API的顶层package,它包括了几个重要的subpackages.当中: 1) pyspark.SparkContext 它抽象了指向spark集群的一条连接,可用来创建RDD对象,它是API的主入口. 2) pyspark.SparkCo

Spark调研笔记第4篇 - PySpark Internals

其实,有两个名为PySpark的概念,一个是指Spark客户端内置的pyspark脚本,而另一个是指Spark Python API中的名为pyspark的package. 本文只对第1个pyspark概念做介绍. 1. Spark客户端内置的pyspark"命令" Spark客户端支持交互模式以方便应用调试,通过调用pyspark可以进入交互环境: cd /path/to/spark/ && ./bin/pyspark 用编辑器查看可知,pyspark其实是个shel

Spark调研笔记第3篇 - Spark集群相应用的调度策略简单介绍

Spark集群的调度分应用间调度和应用内调度两种情况,下文分别进行说明. 1. 应用间调度 1) 调度策略1: 资源静态分区 资源静态分区是指整个集群的资源被预先划分为多个partitions,资源分配时的最小粒度是一个静态的partition. 依据应用对资源的申请需求为其分配静态的partition(s)是Spark支持的最简单的调度策略. 我们已经知道,不同的应用有各自的Spark Context且占用各自的JVM和executor(s).依据Spark Job Scheduling文档的

Spark调研笔记第5篇 - Spark API简介

由于Spark是用Scala实现的,所以Spark天生支持Scala API,此外,还支持Java和Python API.以Spark 1.3版本的Python API为例,其模块层级关系如下图所示: 从上图可知,pyspark是Python API的顶层package,它包含了几个重要的subpackages,其中: 1) pyspark.SparkContext 它抽象了指向spark集群的一条连接,可用来创建RDD对象,它是API的主入口. 2) pyspark.SparkConf 通过它

Spark调研笔记第3篇 - Spark集群对应用的调度策略简介

Spark集群的调度分应用间调度和应用内调度两种情况,下文分别进行说明. 1. 应用间调度 1) 调度策略1: 资源静态分区 资源静态分区是指整个集群的资源被预先划分为多个partitions,资源分配时的最小粒度是一个静态的partition.根据应用对资源的申请需求为其分配静态的partition(s)是Spark支持的最简单的调度策略. 我们已经知道,不同的应用有各自的Spark Context且占用各自的JVM和executor(s).根据Spark Job Scheduling文档的说

日志收集系统Flume调研笔记第1篇 - Flume简介

用户行为数据的收集无疑是构建推荐系统的先决条件,而Apache基金会下的Flume项目正是为分布式的日志收集量身打造的,本文是Flume调研笔记的第1篇,主要介绍Flume的基本架构,下篇笔记将会以实例说明Flume的部署和使用步骤. 本文所用的Flume版本为目前最新版的ver1.5.2,它属于Flume-NG,在系统架构上与Flume-OG有所区别,二者的不同可以参考FlumeWiki文档的说明. 1. Flume是什么 Flume是Apache基金会下的一个开源项目,它实现了一套分布式的.

Spark Streaming性能优化系列-如何获得和持续使用足够的集群计算资源?

一:数据峰值的巨大影响 1. 数据确实不稳定,例如晚上的时候访问流量特别大 2. 在处理的时候例如GC的时候耽误时间会产生delay延迟 二:Backpressure:数据的反压机制 基本思想:根据上一次计算的Job的一些信息评估来决定下一个Job数据接收的速度. 如何限制Spark接收数据的速度? Spark Streaming在接收数据的时候必须把当前的数据接收完毕才能接收下一条数据. 源码解析 RateController: 1. RateController是监听器,继承自Streami