Movie Recommendation with MLlib

In this chapter, we will use MLlib to make personalized movie recommendations for you. We will work with 10 million ratings from 72,000 users on 10,000 movies, collected by MovieLens. This dataset is pre-loaded into HDFS on your cluster at /movielens/large. For quick testing of your code, you may want to use the smaller dataset under /movielens/medium, which contains 1 million ratings from 6,000 users on 4,000 movies.

1. Data set

We will use two files from this MovieLens dataset: “ratings.dat” and “movies.dat”. All ratings are contained in the file “ratings.dat” and are in the following format:

UserID::MovieID::Rating::Timestamp

Movie information is in the file “movies.dat” and is in the following format:

MovieID::Title::Genres
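For example, the first lines of each file look similar to the following (illustrative rows in the style of the MovieLens data; the exact values on your cluster may differ):

1::1193::5::978300760
1::661::3::978302109

1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy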

2. Collaborative filtering

Collaborative filtering is commonly used for recommender systems. These techniques aim to fill in the missing entries of a user-item association matrix, in our case, the user-movie rating matrix. MLlib currently supports model-based collaborative filtering, in which users and products are described by a small set of latent factors that can be used to predict missing entries. In particular, we implement the alternating least squares (ALS) algorithm to learn these latent factors.
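Concretely, ALS alternates between fixing the movie factors while solving a least-squares problem for each user factor, and vice versa. In the standard formulation (MLlib's exact regularization weighting may differ slightly), the objective being minimized is

\min_{x,y} \sum_{(u,i)\in K} \left( r_{ui} - x_u^\top y_i \right)^2 + \lambda \left( \sum_u \|x_u\|^2 + \sum_i \|y_i\|^2 \right)

where K is the set of observed (user, movie) pairs, r_{ui} is user u's rating of movie i, x_u and y_i are the rank-k latent factor vectors for user u and movie i, and lambda is the regularization constant.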

3. Setup

We will be using a standalone project template for this exercise. On your AMI, it has been set up in /root/machine-learning/scala/. You should find the following items in the directory:

  • sbt: Directory containing the SBT tool
  • build.sbt: SBT project file (a sketch of its typical contents follows this list)
  • MovieLensALS.scala: Main Scala program that you are going to edit, compile and run
  • solution: Directory containing the solution code
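
Since the generated JAR is named movielens-als_2.10-0.0.jar, build.sbt presumably sets the project name to movielens-als, the version to 0.0, and Scala to a 2.10.x release. A minimal sketch of what it might contain (the Spark version numbers below are assumptions; check the actual file on your AMI):

name := "movielens-als"

version := "0.0"

scalaVersion := "2.10.4"

// Spark is provided by the cluster at runtime; the version here is illustrative.
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0" % "provided"

libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.1.0" % "provided"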

The main file you are going to edit, compile, and run for the exercises is MovieLensALS.scala. It should look as follows:



import java.util.Random

import org.apache.log4j.Logger
import org.apache.log4j.Level

import scala.io.Source

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import org.apache.spark.mllib.recommendation.{ALS, Rating, MatrixFactorizationModel}

object MovieLensALS {

  def main(args: Array[String]) {

    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    if (args.length != 1) {
      println("Usage: sbt/sbt package \"run movieLensHomeDir\"")
      sys.exit(1)
    }

    // set up environment

    val jarFile = "target/scala-2.10/movielens-als_2.10-0.0.jar"
    val sparkHome = "/root/spark"
    val master = Source.fromFile("/root/spark-ec2/cluster-url").mkString.trim
    val masterHostname = Source.fromFile("/root/spark-ec2/masters").mkString.trim
    val conf = new SparkConf()
      .setMaster(master)
      .setSparkHome(sparkHome)
      .setAppName("MovieLensALS")
      .set("spark.executor.memory", "8g")
      .setJars(Seq(jarFile))
    val sc = new SparkContext(conf)

    // load ratings and movie titles

    val movieLensHomeDir = "hdfs://" + masterHostname + ":9000" + args(0)

    val ratings = sc.textFile(movieLensHomeDir + "/ratings.dat").map { line =>
      val fields = line.split("::")
      // format: (timestamp % 10, Rating(userId, movieId, rating))
      (fields(3).toLong % 10, Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble))
    }

    val movies = sc.textFile(movieLensHomeDir + "/movies.dat").map { line =>
      val fields = line.split("::")
      // format: (movieId, movieName)
      (fields(0).toInt, fields(1))
    }.collect.toMap

    // your code here

    // clean up
    sc.stop()
  }

  /** Compute RMSE (Root Mean Squared Error). */
  def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], n: Long) = {
    // ...
  }

  /** Elicitate ratings from command-line. */
  def elicitateRatings(movies: Seq[(Int, String)]) = {
    // ...
  }
}


Let’s first take a closer look at our template code in a text editor on the cluster itself, then we’ll start adding code to the template. Locate the MovieLensALS class and open it with a text editor.

cd /root/machine-learning/scala
vim MovieLensALS.scala  # If you don't know vim, you can use emacs or nano

The cluster machines have vim, emacs, and nano installed for editing. Alternatively, you can edit the file in your favorite text editor locally and then copy-paste the content into an editor on the cluster.

For any Spark computation, we first create a SparkConf object and use it to create a Spark context object. For Scala or Java programs, we do that by providing the Spark cluster URL, the Spark home directory, and the JAR file that will be generated when we compile our program. For Python programs, we only need to provide the Spark cluster URL. Finally, we also name our program “MovieLensALS” to identify it in Spark’s web UI.

This is what it looks like in our template code:



val conf = new SparkConf()
  .setMaster(master)
  .setSparkHome(sparkHome)
  .setAppName("MovieLensALS")
  .set("spark.executor.memory", "8g")
  .setJars(Seq(jarFile))
val sc = new SparkContext(conf)


Next, the code uses the SparkContext to read in the ratings. Recall that the ratings file is a text file with "::" as the delimiter. The code parses each line to create an RDD of (Int, Rating) pairs, where we keep only the last digit of the timestamp as a pseudo-random key. The Rating class is a wrapper around the tuple (user: Int, product: Int, rating: Double) and is defined in the org.apache.spark.mllib.recommendation package.



val movieLensHomeDir = "hdfs://" + masterHostname + ":9000" + args(0)

val ratings = sc.textFile(movieLensHomeDir + "/ratings.dat").map { line =>
  val fields = line.split("::")
  // format: (timestamp % 10, Rating(userId, movieId, rating))
  (fields(3).toLong % 10, Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble))
}


Next, the code reads in the movie ids and titles and collects them into a map from movie id to title.



val movies = sc.textFile(movieLensHomeDir + "/movies.dat").map { line =>
  val fields = line.split("::")
  // format: (movieId, movieName)
  (fields(0).toInt, fields(1))
}.collect.toMap


Now, let’s make our first edit to add code to get a summary of the ratings.



val numRatings = ratings.count
val numUsers = ratings.map(_._2.user).distinct.count
val numMovies = ratings.map(_._2.product).distinct.count
println("Got " + numRatings + " ratings from "
  + numUsers + " users on " + numMovies + " movies.")


4. Running the program

Before we compute movie recommendations, here is a quick reminder on how you can run the program at any point during this exercise. Save the MovieLensALS.scala file, then run the following commands:

cd /root/machine-learning/scala
# change the folder name from "medium" to "large" to run on the large data set
sbt/sbt package "run /movielens/medium"

This command will compile the MovieLensALS class and create a JAR file in /root/machine-learning/scala/target/scala-2.10/. Finally, it will run the program. You should see output similar to the following on your screen:

Got 1000209 ratings from 6040 users on 3706 movies.

5. Rating elicitation

To make recommendations for you, we are going to learn your taste by asking you to rate a few movies. These should be popular movies, to increase the chance that you have seen (and can rate) them. To do this, we count the number of ratings each movie received, sort the movies by rating count, take the top, e.g., 50, most-rated movies, and sample a small subset of them for rating elicitation.

Solution:



val mostRatedMovieIds = ratings.map(_._2.product) // extract movie ids
  .countByValue      // count ratings per movie
  .toSeq             // convert map to Seq
  .sortBy(- _._2)    // sort by rating count
  .take(50)          // take 50 most rated
  .map(_._1)         // get their ids
val random = new Random(0)
val selectedMovies = mostRatedMovieIds.filter(x => random.nextDouble() < 0.2)
  .map(x => (x, movies(x)))
  .toSeq


Then, for each of the selected movies, we will ask you to give a rating (1-5), or 0 if you have never watched the movie. The method elicitateRatings returns your ratings, which are assigned the special user id 0 (a sketch of one possible implementation appears below). The ratings are converted to an RDD[Rating] instance via sc.parallelize.



val myRatings = elicitateRatings(selectedMovies)

val myRatingsRDD = sc.parallelize(myRatings)


When you run the application, you should see a prompt similar to the following:

Please rate the following movie (1-5 (best), or 0 if not seen):
Raiders of the Lost Ark (1981):
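
This prompt is produced by elicitateRatings, which the template leaves stubbed out. A minimal sketch, assuming ratings are read from standard input and invalid input triggers a re-prompt (the control flow here is an illustration, not the exact solution code):

/** Elicitate ratings from the command line. */
def elicitateRatings(movies: Seq[(Int, String)]): Seq[Rating] = {
  val prompt = "Please rate the following movie (1-5 (best), or 0 if not seen):"
  println(prompt)
  movies.flatMap { case (id, title) =>
    var rating: Option[Rating] = None
    var valid = false
    while (!valid) {
      print(title + ": ")
      try {
        val r = Console.readLine.trim.toInt
        if (r < 0 || r > 5) {
          println(prompt)
        } else {
          valid = true
          // A rating of 0 means "not seen", so we record nothing for it.
          if (r > 0) rating = Some(Rating(0, id, r))
        }
      } catch {
        case e: NumberFormatException => println(prompt)
      }
    }
    rating
  }
}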

6. Splitting training data

We will use MLlib's ALS to train a MatrixFactorizationModel, which takes an RDD[Rating] as input. ALS has training parameters such as the rank of the matrix factors and the regularization constant. To determine a good combination of training parameters, we split the data into three non-overlapping subsets (training, validation, and test) based on the last digit of the timestamp. We will train multiple models on the training set, select the model with the best RMSE (Root Mean Squared Error) on the validation set, and finally evaluate that model on the test set. We also add your ratings to the training set in order to make recommendations for you. Because we need to visit these sets multiple times, we hold them in memory by calling persist.



val numPartitions = 20
val training = ratings.filter(x => x._1 < 6)
  .values
  .union(myRatingsRDD)
  .repartition(numPartitions)
  .persist
val validation = ratings.filter(x => x._1 >= 6 && x._1 < 8)
  .values
  .repartition(numPartitions)
  .persist
val test = ratings.filter(x => x._1 >= 8).values.persist

val numTraining = training.count
val numValidation = validation.count
val numTest = test.count

println("Training: " + numTraining + ", validation: " + numValidation + ", test: " + numTest)


After the split, you should see

Training: 602251, validation: 198919, test: 199049.

7. Training using ALS

In this section, we will use ALS.train to train a number of models and select and evaluate the best one. Among ALS's training parameters, the most important are the rank, lambda (the regularization constant), and the number of iterations. The train method of ALS we are going to use is defined as follows:



object ALS {
  def train(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double)
    : MatrixFactorizationModel = {
    // ...
  }
}


Ideally, we would try a large number of combinations of these parameters in order to find the best one. Due to time constraints, we will test only 8 combinations resulting from the cross product of 2 different ranks (8 and 12), 2 different lambdas (0.1 and 10.0), and 2 different numbers of iterations (10 and 20). We use the provided method computeRmse to compute the RMSE on the validation set for each model. The model with the smallest RMSE on the validation set is selected, and its RMSE on the test set is used as the final metric.
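
Note that the template also leaves computeRmse stubbed out. A minimal sketch, assuming we join the model's predictions with the true ratings on the (user, product) key:

/** Compute RMSE (Root Mean Squared Error). */
def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], n: Long) = {
  // Predict a rating for every (user, product) pair present in the data.
  val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))
  // Pair each prediction with the true rating, then average the squared errors.
  val predictionsAndRatings = predictions.map(x => ((x.user, x.product), x.rating))
    .join(data.map(x => ((x.user, x.product), x.rating)))
    .values
  math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).reduce(_ + _) / n)
}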

Solution:



val ranks = List(8, 12)
val lambdas = List(0.1, 10.0)
val numIters = List(10, 20)
var bestModel: Option[MatrixFactorizationModel] = None
var bestValidationRmse = Double.MaxValue
var bestRank = 0
var bestLambda = -1.0
var bestNumIter = -1
for (rank <- ranks; lambda <- lambdas; numIter <- numIters) {
  val model = ALS.train(training, rank, numIter, lambda)
  val validationRmse = computeRmse(model, validation, numValidation)
  println("RMSE (validation) = " + validationRmse + " for the model trained with rank = "
    + rank + ", lambda = " + lambda + ", and numIter = " + numIter + ".")
  if (validationRmse < bestValidationRmse) {
    bestModel = Some(model)
    bestValidationRmse = validationRmse
    bestRank = rank
    bestLambda = lambda
    bestNumIter = numIter
  }
}
val testRmse = computeRmse(bestModel.get, test, numTest)
println("The best model was trained with rank = " + bestRank + " and lambda = " + bestLambda
  + ", and numIter = " + bestNumIter + ", and its RMSE on the test set is " + testRmse + ".")


Spark might take a minute or two to train the models. You should see the following on the screen:

The best model was trained using rank 8 and lambda 10.0, and its RMSE on test is 0.8808492431998702.

8. Recommending movies for you

As the last part of our tutorial, let’s take a look at what movies our model recommends for you. This is done by generating (0, movieId) pairs for all movies you haven’t rated and calling the model’s predict method to get predictions. Recall that 0 is the special user id assigned to you.



class MatrixFactorizationModel {
  def predict(userProducts: RDD[(Int, Int)]): RDD[Rating] = {
    // ...
  }
}


After we get all predictions, let us list the top 50 recommendations and see whether they look good to you.

Solution:



val myRatedMovieIds = myRatings.map(_.product).toSet
val candidates = sc.parallelize(movies.keys.filter(!myRatedMovieIds.contains(_)).toSeq)
val recommendations = bestModel.get
  .predict(candidates.map((0, _)))
  .collect
  .sortBy(-_.rating)
  .take(50)

var i = 1
println("Movies recommended for you:")
recommendations.foreach { r =>
  println("%2d".format(i) + ": " + movies(r.product))
  i += 1
}


The output should be similar to

Movies recommended for you:
 1: Silence of the Lambs, The (1991)
 2: Saving Private Ryan (1998)
 3: Godfather, The (1972)
 4: Star Wars: Episode IV - A New Hope (1977)
 5: Braveheart (1995)
 6: Schindler's List (1993)
 7: Shawshank Redemption, The (1994)
 8: Star Wars: Episode V - The Empire Strikes Back (1980)
 9: Pulp Fiction (1994)
10: Alien (1979)
...

YMMV, and don't expect to see movies from this decade, because the data set is old.

9. Exercises

9.1. Comparing to a naive baseline

Does ALS output a non-trivial model? We can compare its evaluation result with that of a naive baseline model that always outputs the average rating (or you may try one that outputs the average rating per movie; a sketch of that variant appears at the end of this subsection). Computing the baseline's RMSE is straightforward:

Solution:



val meanRating = training.union(validation).map(_.rating).mean
val baselineRmse = math.sqrt(test.map(x => (meanRating - x.rating) * (meanRating - x.rating))
  .reduce(_ + _) / numTest)
val improvement = (baselineRmse - testRmse) / baselineRmse * 100
println("The best model improves the baseline by " + "%1.2f".format(improvement) + "%.")


The output should be similar to

The best model improves the baseline by 20.96%.

It seems obvious that the trained model would outperform the naive baseline, but a bad combination of training parameters can produce a model that is worse. Choosing the right set of parameters is quite important for this task.
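
If you want to try the per-movie variant mentioned above, a possible sketch is the following; movieMeans and perMovieRmse are hypothetical names, and falling back to the global meanRating for movies absent from the training data is one choice among several:

// Per-movie average baseline: predict each movie's mean training rating,
// falling back to the global mean for movies never seen in training.
val movieMeans = training.union(validation)
  .map(x => (x.product, (x.rating, 1)))
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
  .mapValues { case (sum, cnt) => sum / cnt }
  .collectAsMap
val perMovieRmse = math.sqrt(test.map { x =>
  val pred = movieMeans.getOrElse(x.product, meanRating)
  (pred - x.rating) * (pred - x.rating)
}.reduce(_ + _) / numTest)
println("Per-movie average baseline RMSE = " + perMovieRmse + ".")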

9.2. Augmenting matrix factors

In this tutorial, we add your ratings to the training set. A better way to get recommendations for you is to train a matrix factorization model first and then augment the model using your ratings. If this sounds interesting, take a look at the implementation of MatrixFactorizationModel and see how to update the model for new users and new movies. A rough sketch of the fold-in idea follows.
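
As an illustration, a new user's latent factor can be obtained by solving a single regularized least-squares problem against the learned movie factors. The sketch below uses Breeze (a linear-algebra library that MLlib depends on) and assumes the variables from this tutorial (bestModel, bestLambda, myRatings); it is an assumption-laden outline of the technique, not an MLlib API:

import breeze.linalg.{DenseMatrix, DenseVector, diag}

// Ratings you provided, keyed by movie id.
val rated = myRatings.map(r => (r.product, r.rating)).toMap
// Learned factors for the movies you rated, collected in one consistent order.
val feats = bestModel.get.productFeatures
  .filter { case (id, _) => rated.contains(id) }
  .collect
val rank = feats.head._2.length
val Y = DenseMatrix(feats.map(_._2): _*)                      // numRated x rank
val r = DenseVector(feats.map { case (id, _) => rated(id) })  // aligned with Y's rows
// Fold-in: solve (Y^T Y + lambda * I) x = Y^T r for your latent factor x.
val x = (Y.t * Y + diag(DenseVector.fill(rank)(bestLambda))) \ (Y.t * r)
// Your predicted rating for movie i is then the dot product of x with movie i's factor.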

10. Solution code

In case you want to see your recommendations first or consult the complete source code, we have put the solution under /root/machine-learning/scala/solution.
