第三篇:一个Spark推荐系统引擎的实现

前言

经过2节对MovieLens数据集的学习,想必读者对MovieLens数据集认识的不错了;同时也顺带回顾了些Spark编程技巧,Python数据分析技巧。

而本节将是让人兴奋的一节,它将实现一个基于Spark的推荐系统引擎。

关于推荐算法相关的知识,请读者先自行学习,本文仅仅介绍基于ALS矩阵分解推荐引擎的Spark实现。

PS: 全文示例将采用Scala语言。

第一步:提取有效特征

1. 首先,启动spark-shell并分配足够内存:

2. 载入用户对影片的评级数据:

1 // 载入评级数据
2 val rawData = sc.textFile("/home/kylin/ml-100k/u.data")
3 // 展示一条记录
4 rawData.first()

结果为:

3. 切分记录并返回新的RDD:

1 // 格式化数据集
2 val rawRatings = rawData.map(_.split("\t").take(3))
3 // 展示一条记录
4 rawRatings.first()

4. 接下来需要将评分矩阵RDD转化为Rating格式的RDD:

1 // 导入rating类
2 import org.apache.spark.mllib.recommendation.Rating
3 // 将评分矩阵RDD中每行记录转换为Rating类型
4 val ratings = rawRatings.map { case Array(user, movie, rating) => Rating(user.toInt, movie.toInt, rating.toDouble) }

这是因为MLlib的ALS推荐系统算法包只支持Rating格式的数据集。

第二步:训练推荐模型

接下来可以进行ALS推荐系统模型训练了。MLlib中的ALS算法接收三个参数:

a) rank:对应的是隐因子的个数,这个值设置越高越准,但是也会产生更多的计算量。一般将这个值设置为10-200;
b) iterations:对应迭代次数,一般设置个10就够了;
c) lambda:该参数控制正则化过程,其值越高,正则化程度就越深。一般设置为0.01。

1. 首先,执行以下代码,启动ALS训练:

1 // 导入ALS推荐系统算法包
2 import org.apache.spark.mllib.recommendation.ALS
3 // 启动ALS矩阵分解
4 val model = ALS.train(ratings, 50, 10, 0.01)

这步将会使用ALS矩阵分解算法,对评分矩阵进行分解,且隐特征个数设置为50,迭代10次,正则化参数设为了0.01。

相对其他步骤,训练耗费的时间最多。运行结果如下:

2. 返回类型为MatrixFactorizationModel对象,它将结果分别保存到两个(id,factor)RDD里面,分别名为userFeatures和productFeatures。

也就是评分矩阵分解后的两个子矩阵:

上面展示了id为4的用户的“隐因子向量”。请注意ALS实现的操作都是延迟性的转换操作。

第三步:使用ALS推荐模型

1. 预测用户789对物品123的评分:

2. 为用户789推荐前10个物品:

1 val userId = 789
2 val K = 10
3
4 // 获取推荐列表
5 val topKRecs = model.recommendProducts(userId, K)
6 // 打印推荐列表
7 println(topKRecs.mkString("\n"))

结果为:

3. 初步检验推荐效果

获取到各个用户的推荐列表后,想必大家都想先看看用户评分最高的电影,和给他推荐的电影是不是有相似。

3.1 创建电影id - 电影名字典:

1 // 导入电影数据集
2 val movies = sc.textFile("/home/kylin/ml-100k/u.item")
3 // 建立电影id - 电影名字典
4 val titles = movies.map(line => line.split("\\|").take(2)).map(array => (array(0).toInt, array(1))).collectAsMap()
5 // 查看id为123的电影名
6 titles(123)

结果为:

这样后面就可以根据电影的id找到电影名了。

3.2 获取某用户的所有观影记录并打印:

1 // 建立用户名-其他RDD,并仅获取用户789的记录
2 val moviesForUser = ratings.keyBy(_.user).lookup(789)
3 // 获取用户评分最高的10部电影,并打印电影名和评分值
4 moviesForUser.sortBy(-_.rating).take(10).map(rating => (titles(rating.product), rating.rating)).foreach(println)

结果为:

3.3 获取某用户推荐列表并打印:

读者可以自行对比这两组列表是否有相似性。

第四步:物品推荐

很多时候还有另一种需求:就是给定一个物品,找到它的所有相似物品。

遗憾的是MLlib里面竟然没有包含内置的函数,需要自己用jblas库来实现 = =#。

1. 导入jblas库矩阵类,并创建一个余弦相似度计量函数:

1 // 导入jblas库中的矩阵类
2 import org.jblas.DoubleMatrix
3 // 定义相似度函数
4 def cosineSimilarity(vec1: DoubleMatrix, vec2: DoubleMatrix): Double = {
5     vec1.dot(vec2) / (vec1.norm2() * vec2.norm2())
6 }

2. 接下来获取物品(本例以物品567为例)的因子特征向量,并将它转换为jblas的矩阵格式:

1 // 选定id为567的电影
2 val itemId = 567
3 // 获取该物品的隐因子向量
4 val itemFactor = model.productFeatures.lookup(itemId).head
5 // 将该向量转换为jblas矩阵类型
6 val itemVector = new DoubleMatrix(itemFactor)

3. 计算物品567和所有其他物品的相似度:

 1 // 计算电影567与其他电影的相似度
 2 val sims = model.productFeatures.map{ case (id, factor) =>
 3     val factorVector = new DoubleMatrix(factor)
 4     val sim = cosineSimilarity(factorVector, itemVector)
 5     (id, sim)
 6 }
 7 // 获取与电影567最相似的10部电影
 8 val sortedSims = sims.top(K)(Ordering.by[(Int, Double), Double] { case (id, similarity) => similarity })
 9 // 打印结果
10 println(sortedSims.mkString("\n"))

结果为:

其中0.999999当然就是自己跟自己的相似度了。

4. 查看推荐结果:

1 // 打印电影567的影片名
2 println(titles(567))
3 // 获取和电影567最相似的11部电影(含567自己)
4 val sortedSims2 = sims.top(K + 1)(Ordering.by[(Int, Double), Double] { case (id, similarity) => similarity })
5 // 再打印和电影567最相似的10部电影
6 sortedSims2.slice(1, 11).map{ case (id, sim) => (titles(id), sim) }.mkString("\n")

结果为:

看看,这些电影是不是和567相似?

第五步:推荐效果评估

在Spark的ALS推荐系统中,最常用到的两个推荐指标分别为MSE和MAPK。其中MSE就是均方误差,是基于评分矩阵的推荐系统的必用指标。那么MAPK又是什么呢?

它称为K值平均准确率,最多用于TopN推荐中,它表示数据集范围内K个推荐物品与实际用户购买物品的吻合度。具体公式请读者自行参考有关文档。

本文推荐系统就是一个[基于用户-物品评分矩阵的TopN推荐系统],下面步骤分别用来获取本文推荐系统中的这两个指标。

PS:记得先要导入jblas库。

1. 首先计算MSE和RMSE:

 1 // 创建用户id-影片id RDD
 2 val usersProducts = ratings.map{ case Rating(user, product, rating)  => (user, product)}
 3 // 创建(用户id,影片id) - 预测评分RDD
 4 val predictions = model.predict(usersProducts).map{
 5     case Rating(user, product, rating) => ((user, product), rating)
 6 }
 7 // 创建用户-影片实际评分RDD,并将其与上面创建的预测评分RDD join起来
 8 val ratingsAndPredictions = ratings.map{
 9     case Rating(user, product, rating) => ((user, product), rating)
10 }.join(predictions)
11
12 // 导入RegressionMetrics类
13 import org.apache.spark.mllib.evaluation.RegressionMetrics
14 // 创建预测评分-实际评分RDD
15 val predictedAndTrue = ratingsAndPredictions.map { case ((user, product), (actual, predicted)) => (actual, predicted) }
16 // 创建RegressionMetrics对象
17 val regressionMetrics = new RegressionMetrics(predictedAndTrue)
18
19 // 打印MSE和RMSE
20 println("Mean Squared Error = " + regressionMetrics.meanSquaredError)
21 println("Root Mean Squared Error = " + regressionMetrics.rootMeanSquaredError)

基本原理是将实际评分-预测评分扔到RegressionMetrics类里,该类提供了mse和rmse成员,可直接输出获取。

结果为:

2. 计算MAPK:

// 创建电影隐因子RDD,并将它广播出去
val itemFactors = model.productFeatures.map { case (id, factor) => factor }.collect()
val itemMatrix = new DoubleMatrix(itemFactors)
val imBroadcast = sc.broadcast(itemMatrix)

// 创建用户id - 推荐列表RDD
val allRecs = model.userFeatures.map{ case (userId, array) =>
  val userVector = new DoubleMatrix(array)
  val scores = imBroadcast.value.mmul(userVector)
  val sortedWithId = scores.data.zipWithIndex.sortBy(-_._1)
  val recommendedIds = sortedWithId.map(_._2 + 1).toSeq
  (userId, recommendedIds)
}

// 创建用户 - 电影评分ID集RDD
val userMovies = ratings.map{ case Rating(user, product, rating) => (user, product) }.groupBy(_._1)

// 导入RankingMetrics类
import org.apache.spark.mllib.evaluation.RankingMetrics
// 创建实际评分ID集-预测评分ID集 RDD
val predictedAndTrueForRanking = allRecs.join(userMovies).map{ case (userId, (predicted, actualWithIds)) =>
  val actual = actualWithIds.map(_._2)
  (predicted.toArray, actual.toArray)
}
// 创建RankingMetrics对象
val rankingMetrics = new RankingMetrics(predictedAndTrueForRanking)
// 打印MAPK
println("Mean Average Precision = " + rankingMetrics.meanAveragePrecision)

结果为:

比较坑的是不能设置K,也就是说,计算的实际是MAP...... 正如属性名:meanAveragePrecision。

小结

感觉MLlib的推荐系统真的很一般,一方面支持的类型少 - 只支持ALS;另一方面支持的推荐系统算子也少,连输出个RMSE指标都要写好几行代码,太不方便了。

唯一的好处是因为接近底层,所以可以让使用者看到些实现的细节,对原理更加清晰。

时间: 2024-10-11 05:46:50

第三篇:一个Spark推荐系统引擎的实现的相关文章

第三篇:Spark SQL Catalyst源码分析之Analyzer

/** Spark SQL源码分析系列文章*/ 前面几篇文章讲解了Spark SQL的核心执行流程和Spark SQL的Catalyst框架的Sql Parser是怎样接受用户输入sql,经过解析生成Unresolved Logical Plan的.我们记得Spark SQL的执行流程中另一个核心的组件式Analyzer,本文将会介绍Analyzer在Spark SQL里起到了什么作用. Analyzer位于Catalyst的analysis package下,主要职责是将Sql Parser

jquery jtemplates.js模板渲染引擎的详细用法第三篇

jquery jtemplates.js模板渲染引擎的详细用法第三篇 <span style="font-family:Microsoft YaHei;font-size:14px;"><!doctype html> <html lang="zh-CN"> <head> <meta http-equiv="Content-Type" content="text/html; chars

【Spark深入学习 -13】Spark计算引擎剖析

----本节内容------- 1.遗留问题解答 2.Spark核心概念 2.1 RDD及RDD操作 2.2 Transformation和Action 2.3 Spark程序架构 2.4 Spark on Yarn运行流程 2.5 WordCount执行原理 3.Spark计算引擎原理 3.1 Spark内部原理 3.2 生成逻辑执行图 3.3 生成物理执行图 4.Spark Shuffle解析 4.1 Shuffle 简史 4.2  Spark Shuffle ·Shuffle Write

短信开发系列(三):短信接收引擎

短信开发系列目录: 短信开发系列(一):GSM手机短信开发初探短信开发系列(二):GSM手机短信开发之短信解码短信开发系列(三):短信接收引擎 之前写了短信接收处理的一些内容,今年事情实在太多了,就停顿了这么一大段的时间.接下来会继续完成相关的内容. 今天先写用之前写的短信类库的一个应用,短信接收引擎.可以用在处理一些短信的提醒:作为前面两篇文章的一个实战运用,可以作为一个多线程.委托和事件.串口等方面知识的一个综合运用. 先来分析一下整个程序的流程: - 启动线程 - 定时运行线程主函数 -

【第三篇】ASP.NET MVC快速入门之安全策略(MVC5+EF6)

[第一篇]ASP.NET MVC快速入门之数据库操作(MVC5+EF6) [第二篇]ASP.NET MVC快速入门之数据注解(MVC5+EF6) [第三篇]ASP.NET MVC快速入门之安全策略(MVC5+EF6) [第四篇]ASP.NET MVC快速入门之完整示例(MVC5+EF6) [番外篇]ASP.NET MVC快速入门之免费jQuery控件库(MVC5+EF6) 请关注三石的博客:http://cnblogs.com/sanshi 表单身份验证(Forms Authentication

使用Windows GDI 做一个3D”软引擎“-Part1

前: 最近几天一个很虎比的教程吸引了我的视线,原作者使用c# / JavaScript逐步实现了一个基本的3D软引擎. 我不懂上面提到的语言,所以,准备用我熟悉的C++和Win32实现重造这个轮子.:) 注意: 这不是一篇关于DirectX / OpenGL (GPU)的文章,本系列文章将实现一个软件(CPU)驱动的“DirectX”,很有趣吧,啊哈. 本文假设读者有一定的计算机图形学的基础,使用OpenGL / DirectX 写过程序. 本文假设读者有一定的Win32基础(不是MFC),最起

cocos2dx基础篇(4)——浅析cocos2dx引擎目录

通过前面几节的学习,相信大家都已经配置好了VS+cocos2dx2.2.3的环境,并且成功运行了官方的案例HelloWorld. 一.窥探文件目录 要想学好cocos2dx,首先就需要对引擎目录下的各个文件有所了解.接下来,就让我们先来分析一下cocos2dx2.2.3引擎的文件目录吧. 从目录中我们主要了解一下一下几个文件: cocos2dx:cocos2d-x引擎的核心部分,存放了引擎的大部分源文件. CocosDenshion:声音模块相关源文件. Debug.win32:在Windows

(转) 从0开始搭建SQL Server AlwaysOn 第三篇(配置AlwaysOn)

原文地址: http://www.cnblogs.com/lyhabc/p/4682986.html 这一篇是从0开始搭建SQL Server AlwaysOn 的第三篇,这一篇才真正开始搭建AlwaysOn,前两篇是为搭建AlwaysOn 做准备的 步骤 这一篇依然使用step by step的方式介绍怎麽搭建AlwaysOn 请先使用本地用户Administrator登录这两个集群节点并执行下面的操作,先不要用域用户DCADMIN登录 1.两个集群节点都需先安装.NET Framework

Mahout推荐系统引擎UserCF中的IRStats部分源码解析

Mahout提供推荐系统引擎是模块化的,分为5个主要部分组成: 1. 数据模型 2. 相似度算法 3. 近邻算法 4. 推荐算法 5. 算法评分器 今天好好看了看关于推荐算法以及算法评分部分的源码. 以http://blog.csdn.net/jianjian1992/article/details/46582713 里边数据的为例进行实验. 整体流程的代码如下,依照上面的5个模块,看起来倒是很简单呀. public static RecommenderBuilder userRecommend