基于Spark ALS在线推荐系统

所用技术:

Bootstrap、flat-ui 、 Servlet、Spark1.4.1、Hadoop2.6.0、JDK

说明:本系统不涉及ssh相关内容,只有简单的Servlet和JSP、HTML页面,系统架构相对简单。

系统部署:

1. 拷贝spark-assembly-1.4.1-hadoop2.6.0.jar到WebContent/WEB-INF/lib目录;
(spark-assembly-1.4.1-hadoop2.6.0.jar文件由原生spark-assembly-1.4.1-hadoop2.6.0.jar删除javax/servlet包获得,由于太大,所以就没有上传了);
2. 拷贝原生spark-assembly-1.4.1-hadoop2.6.0.jar文件到HDFS(目录和代码中一致);
3. 拷贝WebContent/WEB-INF/lib目录中的Spark141-als.jar到HDFS(目录和代码中保持一致);
4. 拷贝Hadoop集群(调用所使用的集群,每个人不一样)配置文件yarn-site.xml到HDFS(目录和代码中保持一致);

5. 修改相关配置文件,由于hadoop相关配置、系统的一些属性需要修改为实际的配置及属性,所以针对这些需要进行修改(后面版本中会对此单独一个配置文件),例如:

系统使用数据为movielens上面的数据,下载地址为:http://grouplens.org/datasets/movielens/ ,本测试使用的数据是:

可以根据自己集群的实际情况选择下载数据集的大小。

代码下载地址:https://github.com/fansy1990/movie_recommend ;

系统界面及相关功能实现

1. 系统首页

系统首页如下图所示:

首页直接使用bootstrap的tab界面,分为三栏,分别对应:首页介绍、算法调用和推荐;

2. 初始化后台任务

在启动tomcat的时候,后台会打印相关日志:

信息: Starting Servlet Engine: Apache Tomcat/7.0.52
initial begin...
2016-08-23 12:33:28,189 WARN [org.apache.hadoop.util.NativeCodeLoader] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2016-08-23 12:33:29,836 INFO [util.Utils] - Movies data size:3883
2016-08-23 12:33:33,638 INFO [util.Utils] - Users data size:6040
initial end!
八月 23, 2016 12:33:33 下午 org.apache.coyote.AbstractProtocol start

这里是初始化的相关打印,初始化使用InitServlet,在里面调用了Utils的init方法,init方法主要初始化了movies变量和userWithRatedMovies变量和allMovieIds变量,各个变量表示意思如下:

  • movies:所有的电影ID和电影所有相关信息的Map映射;
  • userWithRatedMovies:用户ID和当前用户所有评分过的电影ID集合的Map映射;
  • allMovieIds:所有电影ID的Set集合;

这里看到初始化的电影有3883个,而用户数有6040个;

3. 建模前台与后台功能实现

建模界面如下所示:

用户输入或选择对应的参数,即可点击“建模”,提交Spark ALS任务到YARN(Hadoop集群),进行算法调用。

建模流程:

  1. 用户输入相关算法参数后,点击建模;
  2. 后台RunALS Servlet获取提交的算法参数,封装Spark ALS算法,然后提交给YARN;
  3. YARN在分配了相关资源后,会返回一个任务ID:applicationID,这时启动一个线程,专门获取该applicationId的任务进度,更新全局allAppStatus变量(Map变量<applicationId,任务状态>),后台返回前台此applicationId;
  4. 前台获取到此applicationId后,如果获取的applicationId为Null,那么就会弹出一个模态框提示建模提交任务失败;否则,会弹出一个进度条模态框(此进度条模态框下面会有详细介绍);
  5. 前台启动定时任务,去后台获取全局allAppStatus变量对应applicationId的状态,返回前台,更新进度条模态框对应进度;
  6. 一直到任务成功或失败,给出对应的提示;

3.1进度条模态框实现

1. 进度条模态框div定义如下:

<div class="modal fade" id="myModal1" tabindex="-1" role="dialog"
							aria-labelledby="myModalLabel">
							<div class="modal-dialog" role="document">
								<div class="modal-content">
									<div class="progress progress-striped active"
										style="margin-bottom: 0px; height: 25px; border-radius: 5px;">
										<div id="progressId" class="progress-bar"
											style="width: 1%; height: 100%;">0%</div>
									</div>
								</div>
							</div>
						</div>

全部使用bootstrap的基本样式;

2. 弹出该模态框以及更新进度相关代码:

// 弹出窗提示程序正在运行
				setProgress("progressId", "0%");

				// 开启进度条模态框
				openModal("myModal1");

				// 定时请求任务进度
				t=setTimeout("queryTaskProgress(‘"+ret+"‘)",1000);

相关函数:

/**
 * 设置进度条
 * @param id
 * @param value
 */
function setProgress(id,value){
	$("#"+id).css("width",value);
	$("#"+id).html(value);
}

/**
 * 开启模态框
 * @param id
 */
function openModal(id){
	$(‘#‘+id).on(‘show.bs.modal‘, function(){
        var $this = $(this);
        var $modal_dialog = $this.find(‘.modal-dialog‘);
        // 关键代码,如没将modal设置为 block,则$modala_dialog.height() 为零
        $this.css(‘display‘, ‘block‘);
        $modal_dialog.css({‘margin-top‘: Math.max(0, ($(window).height() - $modal_dialog.height()) / 2) });
   });
	$(‘#‘+id).modal({backdrop: ‘static‘, keyboard: false});
}

定时函数,查看进度:

/**
	 * 请求任务进度
	 */
function queryTaskProgress(appId){
	// ajax 发送请求获取任务运行状态,如果返回运行失败或成功则关闭弹框
	$.ajax({
		type : "POST",
		url : "Monitor",
//			dataType : "json",
		async:false,// 同步执行
		data:{APPID:appId},
		success : function(data) {
//			console.info("success:"+data);
			if(data.indexOf("%")==-1){// 不包含 ,任务运行完成(失败或成功)
				clearTimeout(t);// 关闭计时器
				// 关闭弹窗进度条
				$(‘#myModal1‘).modal("hide");
				// 开启提示条模态框

				$(‘#tipId‘).html(data=="FINISHED"?"模型训练完成!":
					(data=="FAILED"?"调用建模失败!":"模型训练被杀死!"));

				openModal("myModal2");
				console.info("closed!");
				return ;
			}

			setProgress("progressId", data);
			// 进度查询每次间隔1500ms
			t=setTimeout("queryTaskProgress(‘"+appId+"‘)",1500);
		},
		error: function(data){
			console.info("error"+data);

		}
	});
}

3.2进度条模态框效果

3.2 Eclipse提交Spark任务到YARN后台实现

提交任务参考了部分Spark源码实现,下面是代码:

1. 封装Spark ALS算法程序,准备提交任务到YARN;

String[] runArgs=new String[]{
                "--name","ALS Model Train ",
                "--class","als.ALSModelTrainer",
                "--driver-memory","512m",
                "--num-executors", "2",
                "--executor-memory", "512m",
                "--jar","hdfs://master:8020/user/root/Spark141-als.jar",//
                "--files","hdfs://master:8020/user/root/yarn-site.xml",
                "--arg",input,
                "--arg",output,
                "--arg",train_percent,
                "--arg",ranks,
                "--arg",lambda,
                "--arg",iteration
        };
		FileSystem.get(Utils.getConf()).delete(new Path(output), true);
		return Utils.runSpark(runArgs);

(注意:1. 这里的部分参数应该是需要隔离到配置文件里面的,比如--class 或--driver-memory的值等;2. 本来在allAppStatus中设置的是一个全局变量,所以我本意是可以多用户提交任务,进而监控也是分开的,但是这里会有个问题,就是模型的输出目录,这个应该是需要和用户挂钩,同时在建模的时候,每个用户的推荐也需要采用各自对应的模型,但是目前来说,这个功能有点复杂,暂时就考虑一个用户,一个模型;)

2. 提交Spark任务到YARN,同时开启对应监控,更新任务状态

/**
	 * 调用Spark 加入监控模块
	 *
	 * @param args
	 * @return Application ID字符串
	 */
	public static String runSpark(String[] args) {
		try {
			System.setProperty("SPARK_YARN_MODE", "true");
			SparkConf sparkConf = new SparkConf();
			sparkConf.set("spark.yarn.jar", "hdfs://master:8020/user/root/spark-assembly-1.4.1-hadoop2.6.0.jar");
			sparkConf.set("spark.yarn.scheduler.heartbeat.interval-ms", "1000");

			ClientArguments cArgs = new ClientArguments(args, sparkConf);

			Client client = new Client(cArgs, getConf(), sparkConf);
			// client.run(); // 去掉此种调用方式,改为有监控的调用方式

			/**
			 * 调用Spark ,含有监控
			 */
			ApplicationId appId = null;
			try{
				appId = client.submitApplication();
			}catch(Throwable e){
				e.printStackTrace();
				//  返回null
				return null;
			}
			// 开启监控线程
			updateAppStatus(appId.toString(),"2%" );// 提交任务完成,返回2%
			log.info(allAppStatus.toString());
			new Thread(new MonitorThread(appId,client)).start();
			return appId.toString();
		} catch (Exception e) {
			e.printStackTrace();
			return null;
		}
	}

之前直接使用Client的run方法,提交任务,但是这样就获取不到applicationId,如下图

所以就去掉这种方式,参考Client中的run方法的具体实现,编写对应代码来进行任务提交;(需要注意这种提交方式,当任务失败或完成后,需要删除相关临时文件);

后台监控:

相关代码,在更新任务状态时进行:

// 完成/ 失败/杀死
			if (state == YarnApplicationState.FINISHED || state == YarnApplicationState.FAILED
					|| state == YarnApplicationState.KILLED) {
				Utils.cleanupStagingDir(appId);
				// return (state, report.getFinalApplicationStatus);
				//  更新 app状态
				log.info("Thread:"+Thread.currentThread().getName()+
						appId.toString()+"完成,任务状态是:"+state.name());
				Utils.updateAppStatus(appId.toString(), state.name());
				return;
			}

该代码在MonitorThread中;(但是,需要注意的是,如果Spark任务正在运行,那么这时关闭Tomcat,就会导致相关临时文件删除不了,为什么?请大家自己思考)

4. 推荐前台与后台功能实现

4.1 推荐页面前台

前台界面如下:

前台有两个功能,一个功能是输入用户ID,查询出当前用户ID评分过的电影信息;一个功能是根据用户ID和推荐个数,对用户进行电影推荐;

查询功能结果:

这里需要注意,评分全部为零,这个是因为在userRatedMovieIds这个变量中存储的只是用户的评分过的电影ID,并没有附加评分,所以可以在这个地方进行修改,以显示正确的电影评分(同时,这里的查询,也可以把所有信息存储在HBase中,进行查询);

推荐功能结果:

推荐功能展示的结果,是按照推荐分降序排列的;

不管是查询还是推荐,前台直接使用一个div来接收这些信息:

<div class="col-md-10 col-md-offset-1" id="movieResultId">

接着使用AJax获取后台对应的数据进行拼接,在赋值给div:

// 绑定推荐button
	$("#recommendId").click(function(){
		var userId = $(‘#userId‘).val();
		var recommendNum = $(‘#recommendNumId‘).val();
		var ret =null;
		$.ajax({
			type : "POST",
			url : "Recommend",
			async:false,// 同步执行
			data : {userId:userId,flag:"recommend",recommendNum:recommendNum},
//			dataType : "json",
			success : function(data) {// data 返回appId
				ret = data;
			},
			error: function(data){
				console.info("error"+data);
				ret = data=="null"?"null":data;
			}
		});

		var showResultHtml = ‘<br>‘+
							‘<p>数据如下:</p>‘+
							‘<div class="table-responsive">‘ +
								‘<table class="table table-striped">‘ +
									‘<thead>‘+
										‘<tr>‘+
											‘<th>MovieId</th>‘+
											‘<th>电影名</th>‘+
											‘<th>标签</th>‘+
											‘<th>推荐分</th>‘+
										‘</tr>‘+
									‘</thead>‘+
									‘<tbody>‘+
									ret +
									‘</tbody>‘+
								‘</table>‘+
							‘</div>‘;
		$(‘#movieResultId‘).html(showResultHtml);
	});

4.2 推荐页面后台

推荐页面的查询,只是简单的Map的数据获取而已;重点是推荐功能。

推荐功能最开始我想的是直接保存Spark ALS的模型,然后调用Spark ALS模型的predict(user,product),即可直接得到用户的推荐分,但是这样是不行的:

参考:http://stackoverflow.com/questions/34288435/using-java-for-running-mllib-model-with-streaming ;Spark里面的模型有些是本地的有些是分布式的,如果是分布式的,那么是不能执行类似predict操作的,而Spark ALS的模型MatrixFactorizationModel 是分布式的,所以不能够直接执行predict操作。这里同样是参考Spark的源码,来进行的。

在建模完成后,把Spark ALS模型的两个参数userFeatures、productFeatures分别存入HDFS,然后在模型推荐的时候把其加载进内存,使用userFeatures和productFeatures两个变量即可完成推荐:

/**
	 * 预测 如果没有初始化,则进行初始化
	 *
	 * @param uid
	 * @param recNum
	 * @return
	 * @throws NoSuchMethodException
	 * @throws InvocationTargetException
	 * @throws InstantiationException
	 * @throws IllegalAccessException
	 */
	public static List<Movie> predict(int uid,int recNum) throws IllegalAccessException, InstantiationException, InvocationTargetException, NoSuchMethodException {
		if (userFeatures.size() <= 0 || productFeatures.size() <= 0) {
			try {
				userFeatures = getModelFeatures(userFeaturePath);
				productFeatures = getModelFeatures(productFeaturePath);
			} catch (IOException e) {
				return null;
			}
			if (userFeatures.size() <= 0 || productFeatures.size() <= 0) {
				System.err.println("模型加载失败!");
				return null;
			}
		}

		// 使用模型进行预测
		// 1. 找到uid没有评价过的movieIds
		Set<Integer> candidates = Sets.difference((Set<Integer>) allMovieIds, userWithRatedMovies.get(uid));

		// 2. 构造推荐排序堆栈
		FixSizePriorityQueue<Movie> recommend = new FixSizePriorityQueue<Movie>(recNum);
		Movie movie = null;
		double[] pFeature = null;
		double[] uFeature = userFeatures.get(uid);
		double score = 0.0;
		BLAS blas = BLAS.getInstance();
		for (int candidate : candidates) {
			movie = movies.get(candidate).deepCopy();
			pFeature = productFeatures.get(candidate);
			if (pFeature == null)
				continue;
			score = blas.ddot(pFeature.length, uFeature, 1, pFeature, 1);
			movie.setRated((float) score);
			recommend.add(movie);
		}

		return recommend.sortedList();
	}

中间的score= blas.ddot就是计算推荐分的;

总结

1. 基本完成相关推荐系统功能;

2. 相关参数需要额外添加配置文件,而不是直接硬编码到代码中;

3. 推荐只能针对已经存在的用,不能进行匿名推荐(同时使用SPark ALS模型推荐的结果基本一样,这个是Spark的bug?还是调用哪里有问题?);

4. 添加多用户调用支持;

5. 查询用户评分过的功能完善(对应评分获取);

分享,成长,快乐

脚踏实地,专注

转载请注明blog地址:http://blog.csdn.net/fansy1990

时间: 2024-11-03 04:01:39

基于Spark ALS在线推荐系统的相关文章

基于Spark ALS构建商品推荐引擎

基于Spark ALS构建商品推荐引擎 一般来讲,推荐引擎试图对用户与某类物品之间的联系建模,其想法是预测人们可能喜好的物品并通过探索物品之间的联系来辅助这个过程,让用户能更快速.更准确的获得所需要的信息,提升用户的体验.参与度以及物品对用户的吸引力. 在开始之前,先了解一下推荐模型的分类: 1.基于内容的过滤:利用物品的内容或是属性信息以及某些相似度定义,求出与该物品类似的物品 2.协同过滤:利用大量已有的用户偏好来估计用户对其未接触过的物品的喜好程度 3.矩阵分解(包括显示矩阵分解.隐式矩阵

基于Spark MLlib平台的协同过滤算法---电影推荐系统

基于Spark MLlib平台的协同过滤算法---电影推荐系统 又好一阵子没有写文章了,阿弥陀佛...最近项目中要做理财推荐,所以,回过头来回顾一下协同过滤算法在推荐系统中的应用. 说到推荐系统,大家可能立马会想到协同过滤算法.本文基于Spark MLlib平台实现一个向用户推荐电影的简单应用.其中,主要包括三部分内容: 协同过滤算法概述 基于模型的协同过滤应用---电影推荐 实时推荐架构分析     一.协同过滤算法概述 本人对算法的研究,目前还不是很深入,这里简单的介绍下其工作原理. 通常,

一个简单的在线推荐系统的实现

推荐系统,主要采用对历史数据的分析计算,得到某种模型,对未来的数据进行预测.说到底,还是分类问题. Mahout,是一个可扩展的机器学习库,可用于单机,也可用于Hadoop. Mahout的API非常简单,实现一个推荐功能只需要如下的几行代码: DataModel model = new FileDataModel(new File(file));//建立数据模型 UserSimilarity similarity = new PearsonCorrelationSimilarity(model

京东基于Spark的风控系统架构实践和技术细节

京东基于Spark的风控系统架构实践和技术细节 时间 2016-06-02 09:36:32  炼数成金 原文  http://www.dataguru.cn/article-9419-1.html 主题 Spark软件架构 1.背景 互联网的迅速发展,为电子商务兴起提供了肥沃的土壤.2014年,中国电子商务市场交易规模达到13.4万亿元,同比增长31.4%.其中,B2B电子商务市场交易额达到10万亿元,同比增长21.9%.这一连串高速增长的数字背后,不法分子对互联网资产的觊觎,针对电商行业的恶

基于Spark的异构分布式深度学习平台

导读:本文介绍百度基于Spark的异构分布式深度学习系统,把Spark与深度学习平台PADDLE结合起来解决PADDLE与业务逻辑间的数据通路问题,在此基础上使用GPU与FPGA异构计算提升每台机器的数据处理能力,使用YARN对异构资源做分配,支持Multi-Tenancy,让资源的使用更有效. 深层神经网络技术最近几年取得了巨大的突破,特别在语音和图像识别应用上有质的飞跃,已经被验证能够使用到许多业务上.如何大规模分布式地执行深度学习程序,使其更好地支持不同的业务线成为当务之急.在过去两年,百

走在大数据的边缘 基于Spark的机器学习-智能客户系统项目实战(项目实战)

38套大数据,云计算,架构,数据分析师,Hadoop,Spark,Storm,Kafka,人工智能,机器学习,深度学习,项目实战视频教程 视频课程包含: 38套大数据和人工智能精品高级课包含:大数据,云计算,架构,数据挖掘实战,实时推荐系统实战,电视收视率项目实战,实时流统计项目实战,离线电商分析项目实战,Spark大型项目实战用户分析,智能客户系统项目实战,Linux基础,Hadoop,Spark,Storm,Docker,Mapreduce,Kafka,Flume,OpenStack,Hiv

基于大数据技术推荐系统算法案例实战视频教程(项目实战)

38套大数据,云计算,架构,数据分析师,Hadoop,Spark,Storm,Kafka,人工智能,机器学习,深度学习,项目实战视频教程 视频课程包含: 38套大数据和人工智能精品高级课包含:大数据,云计算,架构,数据挖掘实战,实时推荐系统实战,电视收视率项目实战,实时流统计项目实战,离线电商分析项目实战,Spark大型项目实战用户分析,智能客户系统项目实战,Linux基础,Hadoop,Spark,Storm,Docker,Mapreduce,Kafka,Flume,OpenStack,Hiv

基于spark排序的一种更廉价的实现方案-附基于spark的性能测试

排序可以说是很多日志系统的硬指标(如按照时间逆序排序),如果一个大数据系统不能进行排序,基本上是这个系统属于不可用状态,排序算得上是大数据系统的一个"刚需",无论大数据采用的是hadoop,还是spark,还是impala,hive,总之排序是必不可少的,排序的性能测试也是必不可少的. 有着计算奥运会之称的Sort Benchmark全球排序每年都会举行一次,每年巨头都会在排序上进行巨大的投入,可见排序速度的高低有多么重要!但是对于大多数企业来说,动辄上亿的硬件投入,实在划不来.甚至远

UserView--第二种方式(避免第一种方式Set饱和),基于Spark算子的java代码实现

UserView--第二种方式(避免第一种方式Set饱和),基于Spark算子的java代码实现 测试数据 java代码 1 package com.hzf.spark.study; 2 3 import java.util.Map; 4 import java.util.Set; 5 6 import org.apache.spark.SparkConf; 7 import org.apache.spark.api.java.JavaPairRDD; 8 import org.apache.s