spark item-based collaborative filtering

mahout已经提供了item-based cf 算法,但是要想在mahout 算法上修改item-based cf相对来说比较繁琐,比如加入流行度因子降权(降低流行用户与其它用户的相似度)等因素,另一方面mahout执行速度比较慢。目前spark日趋流行,同时spark性能方面的优势渐渐显现出来。但是在spark官方没有提供基于item或者user的协同过滤算法,本文参考了Movie Recommendations and More With Spark文章,详情见http://mlnick.github.io/blog/2013/04/01/movie-recommendations-and-more-with-spark/,书写了基于spark的item-based cf算法,鉴于scala的简练性,代码可读性较高,可以根据自己需求对协同过滤算法进行修改,加入各种修正因子等。

下面是spark item-based cf代码,供大家参考:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._

object CollaborativeFilter
{
  def main(args: Array[String]) {
    /**
     * Parameters to regularize correlation.
     */
    val PRIOR_COUNT = 10
    val PRIOR_CORRELATION = 0

    val sparkConf = new SparkConf().setAppName("item-based cf")
    val sc = new SparkContext(sparkConf)

    // extract (userid, movieid, rating) from ratings data
    val ratings = sc.textFile("/path/to/input").map(line => {
        val fields = line.split("\t")
        (fields(0).toInt, fields(1).toInt, fields(2).toInt)
    })

    // get num raters per movie, keyed on item id
    val item2manyUser = ratings.groupBy(tup => tup._2)

    val numRatersPerItem = item2manyUser.map(grouped => (grouped._1, grouped._2.size))

    // join ratings with num raters on item id
    val ratingsWithSize = item2manyUser.join(numRatersPerItem).
      flatMap(joined => {
        joined._2._1.map(f => (f._1, f._2, f._3, joined._2._2))
    })
   // ratingsWithSize now contains the following fields: (user, item, rating, numRaters).

    // dummy copy of ratings for self join
    val ratings2 = ratingsWithSize.keyBy(tup => tup._1)

    // join on userid and filter item pairs such that we don‘t double-count and exclude self-pairs

    val ratingPairs =ratings2.join(ratings2).filter(f => f._2._1._2 < f._2._2._2)

    // compute raw inputs to similarity metrics for each item pair
    val vectorCalcs =
      ratingPairs.map(data => {
        val key = (data._2._1._2, data._2._2._2)
        val stats =
          (data._2._1._3 * data._2._2._3, // rating 1 * rating 2
            data._2._1._3,                // rating item 1
            data._2._2._3,                // rating item 2
            math.pow(data._2._1._3, 2),   // square of rating item 1
            math.pow(data._2._2._3, 2),   // square of rating item 2
            data._2._1._4,                // number of raters item 1
            data._2._2._4)                // number of raters item 2
        (key, stats)
      }).groupByKey().map(data => {
        val key = data._1
        val vals = data._2
        val size = vals.size
        val dotProduct = vals.map(f => f._1).sum
        val ratingSum = vals.map(f => f._2).sum
        val rating2Sum = vals.map(f => f._3).sum
        val ratingSq = vals.map(f => f._4).sum
        val rating2Sq = vals.map(f => f._5).sum
        val numRaters = vals.map(f => f._6).max
        val numRaters2 = vals.map(f => f._7).max
        (key, (size, dotProduct, ratingSum, rating2Sum, ratingSq, rating2Sq, numRaters, numRaters2))
      })

    // compute similarity metrics for each item pair
    val similarities =
      vectorCalcs.map(fields => {
        val key = fields._1
        val (size, dotProduct, ratingSum, rating2Sum, ratingNormSq, rating2NormSq, numRaters, numRaters2) = fields._2
//        val corr = correlation(size, dotProduct, ratingSum, rating2Sum, ratingNormSq, rating2NormSq)
//        val regCorr = regularizedCorrelation(size, dotProduct, ratingSum, rating2Sum,
//          ratingNormSq, rating2NormSq, PRIOR_COUNT, PRIOR_CORRELATION)
        val cosSim = cosineSimilarity(dotProduct, scala.math.sqrt(ratingNormSq), scala.math.sqrt(rating2NormSq))
//        val jaccard = jaccardSimilarity(size, numRaters, numRaters2)

        (key._1,(key._2, cosSim))
      })

      val inverseSim = similarities.map(ori=>(ori._2._1,(ori._1,ori._2._2)))
      val simTotal = inverseSim ++ similarities
      val cutNumSim = simTotal.groupByKey().map(sim=>(sim._1,sim._2.toList.sortBy(x=>x._2).take(50)))

      val ratingsInverse = ratings.map(rating=>(rating._2,(rating._1,rating._3)))
      val userRecommend = ratingsInverse.join(cutNumSim).flatMap(obj=>obj._2._2.map(x=>((obj._2._1._1,x._1),obj._2._1._2*x._2)))
      val filterItem = ratings.map(x=>((x._1,x._2),Double.NaN))
      val totalScore = userRecommend ++ filterItem

      val finalResult = totalScore.reduceByKey(_+_).filter(x=> !(x._2 equals(Double.NaN))).
        map(x=>(x._1._1,x._1._2,x._2)).groupBy(x=>x._1).flatMap(x=>(x._2.toList.sortBy(o=>o._3).take(50)))
      finalResult.saveAsTextFile("/path/to/savefile")
  }

  // *************************
  // * SIMILARITY MEASURES
  // *************************

  /**
   * The correlation between two vectors A, B is
   *   cov(A, B) / (stdDev(A) * stdDev(B))
   *
   * This is equivalent to
   *   [n * dotProduct(A, B) - sum(A) * sum(B)] /
   *     sqrt{ [n * norm(A)^2 - sum(A)^2] [n * norm(B)^2 - sum(B)^2] }
   */
  def correlation(size : Double, dotProduct : Double, ratingSum : Double,
                  rating2Sum : Double, ratingNormSq : Double, rating2NormSq : Double) = {

    val numerator = size * dotProduct - ratingSum * rating2Sum
    val denominator = scala.math.sqrt(size * ratingNormSq - ratingSum * ratingSum) *
      scala.math.sqrt(size * rating2NormSq - rating2Sum * rating2Sum)

    numerator / denominator
  }

  /**
   * Regularize correlation by adding virtual pseudocounts over a prior:
   *   RegularizedCorrelation = w * ActualCorrelation + (1 - w) * PriorCorrelation
   * where w = # actualPairs / (# actualPairs + # virtualPairs).
   */
  def regularizedCorrelation(size : Double, dotProduct : Double, ratingSum : Double,
                             rating2Sum : Double, ratingNormSq : Double, rating2NormSq : Double,
                             virtualCount : Double, priorCorrelation : Double) = {

    val unregularizedCorrelation = correlation(size, dotProduct, ratingSum, rating2Sum, ratingNormSq, rating2NormSq)
    val w = size / (size + virtualCount)

    w * unregularizedCorrelation + (1 - w) * priorCorrelation
  }

  /**
   * The cosine similarity between two vectors A, B is
   *   dotProduct(A, B) / (norm(A) * norm(B))
   */
  def cosineSimilarity(dotProduct : Double, ratingNorm : Double, rating2Norm : Double) = {
    dotProduct / (ratingNorm * rating2Norm)
  }

  /**
   * The Jaccard Similarity between two sets A, B is
   *   |Intersection(A, B)| / |Union(A, B)|
   */
  def jaccardSimilarity(usersInCommon : Double, totalUsers1 : Double, totalUsers2 : Double) = {
    val union = totalUsers1 + totalUsers2 - usersInCommon
    usersInCommon / union
  }
}
时间: 2024-09-20 23:38:01

spark item-based collaborative filtering的相关文章

【转载】协同过滤(Collaborative Filtering)

Collaborative Filtering 协同过滤的主要目标:由于网络信息量的增多,用户往往被淹没在信息的海洋里,很难很轻易的找到自己感兴趣的topic.协同过滤就是为了把用户最可能感兴趣的信息推送给用户(Recommer system). 协同过滤的方法: model-base,user-base,item-base,content-base. user-based:搜集用户profile.对于一个active user,找到跟其比较接近(或者相似)的几个neighbour.使用这些ne

推荐系统(recommender systems):预测电影评分--构造推荐系统的一种方法:协同过滤(collaborative filtering )

协同过滤(collaborative filtering )能自行学习所要使用的特征 如我们有某一个数据集,我们并不知道特征的值是多少,我们有一些用户对电影的评分,但是我们并不知道每部电影的特征(即每部电影到底有多少浪漫成份,有多少动作成份) 假设我们通过采访用户得到每个用户的喜好,如上图中的Alice喜欢爱情电影,不喜欢动作电影,则我们将θ(1)设为[0,5,0],如此设置θ(2),θ(3),θ(4)的值,这样我们有了每个用户的θ的值以及他们对电影的打分,就可以推断出每部电影的x(特征)的值.

collaborative filtering协同过滤

每次我想看电影的时候,都会去问我的朋友,小健.一般他推荐的电影,我都比较喜欢.显然不是所有人都有小健这样的能力.因为我碰巧和小健有类似的品味. 这个生活中的经验,实际上有着广泛的用途. 当系统需要为某个人做出推荐时,一种机器学习的算法是这样工作的:就是在一大群人中找出一部分与他品味类似的人,把这些人的喜欢的东西排序,然后推荐给他. 自然引出两个问题: 谁是与他相近品味的人: 怎么对这些人喜欢的东西排序: 对于上述两个问题有多种不同的答案,不同的答案意味着不同的算法. --- 问题1的答案有两种:

亚马逊 协同过滤算法 Collaborative filtering

这节课时郭强的三维课.他讲的是MAYA和max .自己对这个也不怎么的感兴趣.而且这个课感觉属于数字媒体.自己对游戏,动画,这些东西一点都不兴趣,比如大一的时候刚开学的时候,张瑞的数字媒体的导论课.还有就是秀霞的动画课,自己记录一下自己的思想我在网上看见了这样一个说法,说的是跟着本科生导师做项目.就比如一个人说的,先找一个APP运行一遍,然后再这个基础上修改,各种的粘贴代码.是继续的做这个项目,还是学一点计算机的基础知识了.开始写算法,亚马逊的协同过滤算法 第一:初次印象,进入一个网站的时候,可

[论文阅读&amp;翻译]Item-to-Item Collaborative Filtering

Amazon.com Recommendations Item-to-Item Collaborative Filtering 个人感受: 这篇论文首先介绍了历史上的三种算法:传统协同过滤.聚类.基于搜索的算法.第一种方法在计算效率.少量数据上表现欠佳:第二种方法准确率欠佳:第三种方法比较"简单",因此提出了大量计算在线下的物品-物品的相似记录进行推荐. 揣测一下作者的思路,计算量大是因为数据量大,但是其中有价值的就是其中购买的物品,而且对于购买的物品来说,我们推荐的也是根据购买的物品

Item-to-Item Collaborative Filtering

Amazon.com Recommendations Item-to-Item Collaborative Filtering 个人感受: 这篇论文首先介绍了历史上的三种算法:传统协同过滤.聚类.基于搜索的算法.第一种方法在计算效率.少量数据上表现欠佳:第二种方法准确率欠佳:第三种方法比较"简单",因此提出了大量计算在线下的物品-物品的相似记录进行推荐. 揣测一下作者的思路,计算量大是因为数据量大,但是其中有价值的就是其中购买的物品,而且对于购买的物品来说,我们推荐的也是根据购买的物品

【RS】Factorization Meets the Neighborhood: a Multifaceted Collaborative Filtering Model - 当因式分解遇上邻域:多层面协同过滤模型

[论文标题]Factorization Meets the Neighborhood: a Multifaceted Collaborative Filtering Model   (35th-ICML,PMLR) [论文作者]Yehuda Koren [论文链接]Paper (9-pages // Double column) [摘要] 推荐系统为用户提供个性化的产品或服务建议.这些系统通常依赖于协同过滤(CF),通过分析过去的事务来建立用户和产品之间的关联.比较成功的CF方法有两种,一种是直

协同过滤 Collaborative Filtering

协同过滤 collaborative filtering 人以类聚,物以群分 相似度 1. Jaccard 相似度 定义为两个集合的交并比: Jaccard 距离,定义为 1 - J(A, B),衡量两个集合的区分度: 为什么 Jaccard 不适合协同过滤?—— 只考虑用户有没有看过,没考虑评分大小 2. 余弦相似度 根据两个向量夹角的余弦值来衡量相似度: 为什么余弦相似度不适合协同过滤?—— 不同用户各自评分总和不一样,导致评分占总比不一样,可能计算出和事实相反的结果. 3. Pearson

item Collaborative Filtering

算法步骤: 1.计算物品相似度 2.根据用户购买记录,推荐相似物品 物品相似度定义: A. 购买i的人里面,有多少比例购买了j 缺点(推荐系统需要能挖掘长尾信息,此处若j很热门,则w趋向于很大,则买了i的人都会被推荐j,热门商品更加热门) B. 在A的基础上,加入了对热门物品j的惩罚 C. 活跃用户的贡献度应该要低(例子:一个在当当上买书的人,是一个自己开书店的人) 相似度归一化:可提高准确率.召回率.覆盖率.新颖度 --分析: 假设用户喜欢看两类电影(科幻片,爱情片),而科幻片的相似度普遍比爱