[转] - spark推荐 - 从50多分钟到3分钟的优化

原文地址

从50多分钟到3分钟的优化

某推荐系统需要基于Spark用ALS算法对近一天的数据进行实时训练, 然后进行推荐. 输入的数据有114G, 但训练时间加上预测的时间需要50多分钟, 而业务的要求是在15分钟左右, 远远达不到实时推荐的要求, 因此, 我们与业务侧一起对Spark应用进行了优化.

另外提一下, 该文最好与之前我写的另一篇blog < Spark + Kafka 流计算优化 > 一起看, 因为一些细节我不会再在该文中描述.

优化分析

从数据分析, 虽然数据有114G, 但ALS的模型训练时间并不长, 反而是数据加载和ALS预测所占用的时间较长. 因此我们把重点放在这两点的优化中.

从Spark的Web UI可以看到, 数据加载的job中task的数量较多, 较小. 模型预测的job也是有这样的问题, 因此, 可以猜测并行度过大造成了集群的协调负荷过重.

仅降低并行度和优化JVM 参数

我们用常见的”rdd.repartitionBy”把并行度降低后, 作业计算耗时减少并不明显, 且在模型预测的job中会有executor死掉的现象. 进而查看日志, 发现是内存占用过多, yarn把Spark应用给kill了.

为解决该现象, 加入这些JVM参数: “ -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:MaxTenuringThreshold=72 -XX:NewRatio=2 -XX:SurvivorRatio=6 -XX:+CMSParallelRemarkEnabled -XX:+ParallelRefProcEnabled -XX:+CMSPermGenSweepingEnabled -XX:+CMSClassUnloadingEnabled -XX:MaxTenuringThreshold=31 -XX:SurvivorRatio=8 -XX:+ExplicitGCInvokesConcurrent -XX:+ExplicitGCInvokesConcurrentAndUnloadsClasses -XX:+AlwaysPreTouch -XX:-OmitStackTraceInFastThrow -XX:+UseCompressedStrings -XX:+UseStringCache -XX:+OptimizeStringConcat -XX:+UseCompressedOops -XX:+CMSScavengeBeforeRemark -XX:+UseBiasedLocking -XX:+AggressiveOpts " 具体说明和原因请参考本人的另一篇blog < Spark + Kafka 流计算优化 >.

笛卡尔积操作中的预先分块

作了上述例行优化后,  ALS预测步骤的耗时减少依然不明显, 为发掘原因, 我们看了下源码, 惊奇的发现在ALS预测中竟然是用了笛卡尔积操作, 114G的数据少说也有几千万行记录, 几千万行记录进行笛卡尔积, 不慢才怪吧.

还好, 我们有扩展版的ALS预测方法, 可以将数据预先分块, 而不必一行行的进行笛卡尔积, 加快了笛卡尔积的速度.

val recommendations = ExtMatrixFactorizationModelHelper.recommendProductsForUsers( model.get, numRecommendations, 420000, StorageLevel.MEMORY_AND_DISK_SER )

该方法会对model中的userFeatures和itemFeatures矩阵进行预先分块, 减少网络包的传输量和笛卡尔积的计算量.

这里的第三个参数是每一个块所包含的行数, 此处的420000表示当我们对userFeatures或itemFeatures进行分块时, 每一个块包含了矩阵的420000行.

如何计算这里的一个块要包含多少行呢, 举例如下:

由于ALS算法中设置的rank是10, 因此生成的userFeatures和itemFeatures的个数是10, 它们的每一行是(Int,Array[Double]), 其中Array.size是10.

因此可根据如下计算每行所占的空间大小:

空Array[10]的大小=16+8*10=96Byte. 数组中的元素是Double, 十个Double对象的大小是 16*10=160Byte. 作为key的Integer的大小是16Byte. 因此每行占空间96+160+16=212Byte.

另外,要计算带宽: 由于是千兆网卡,因此带宽为1Gbit,转换为Byte也就是128MByte的带宽.

考虑到” spark.akka.frameSize=100”以及网络包包头需要占的空间, 和Java的各种封装要占的空间, 我们计划让1个block就几乎占满带宽, 也就是一个block会在100MByte左右.

因此, 一个block要占 60*1024*1024/212=296766行, 因此blocksize=494611, 考虑到各个object也占内存, 因此行数定为420000左右.

在分块后ExtMatrixFactorizationModelHelper.recommendProductsForUsers中会对块进行重新分区, 以达到基于块的均匀分布.

提高文件加载速度

以前都是加载小文件, 每个文件才几M大小, 远远低于Hadoop的块大小, 使得IO频繁, 文件也频繁打开关闭, 加载速度自然就慢. 为解决该问题, 我们使用sc.wholeTextFiles(dirstr,inputSplitNum)来加载HDFS的文件到Spark中. 该方法使用Hadoop的CombineFileInputFormat把多个小文件合并成一个Split再加载到Spark中.

但其实, 加载速度对整个Job的运行效率影响不大, 效果有限.

上图,是val inputData = sc.wholeTextFiles(dirstr,80) 和RangePartition.

貌似加载速度也好不到哪儿去.第一个job用了11min

上图, 用的是val inputData = sc.textFile(dirstr)和RangeRepartition,同等情况下,加载也需要11min. (见job6)

上图,是val inputData = sc.wholeTextFiles(dirstr,120) 和RangePartition.

貌似提高了wholeTextFiles()的split数量可以提升性能, 从8.4min多(此时split为80左右)到现在的8.1min

上图,是val inputData = sc.wholeTextFiles(dirstr,220) 和RangePartition. 第一次加载提升到6.0min.

其它job由于loclity的问题,时间有所拉长, 影响不大.

上图,是val inputData = sc.wholeTextFiles(dirstr,512) 和RangePartition. 第一次加载要7.1min. 估计220个split应该是个比较好的值了. 按比例就是 220(file split数) / 27000(小文件数) = 0.8% , 该场景中每个小文件10M左右. 也就是每个file split包含123个小文件­­­­, 每个file split 1230M, 也就是约1G左右.

减少笛卡尔积计算量

回到对笛卡尔积计算的优化, 因为50分钟的计算量基本上都是耗在笛卡尔积的计算上的.

我们先看一下task的分布图, 由图看到, 数据并不是十分散列:

    猜测原因肯是通过Array.mkString.hashCode作为key并不能保证数据的均匀散列.因此, 我们disable掉了在笛卡尔积计算前的预分块时的再分区, 而是把再分区提到分块之前.

这样一来, task分布稍微均衡了一些, 但依然不甚理想. 为了合理的降低task数量和均匀task的分布, 我们进一步使用了Spark扩展版本的自动分区功能.

val userFactors = ExtSparkHelper.repartionPairRDDBySize(oldUsrFact, ThreeM)

这个方法有两个参数, 第一个是需要分区的RDD, 第二个是我们期望的每一个task的input data的大小. 一般来说, input data与计算产生的data的大小相差不大, 但笛卡尔积却不同, 有可能产生上百倍的中间数据量. 因此, 我这里设置的每个task的input data是3M, 计算产生的中间数据刚好在1G左右, 一个executor可以同时跑3个task, 也算是比较理想的.

优化后的task分布图也比较理想, 十分均匀且没有任何浪费, 如下:

因此, 这一步优化后, 原笛卡尔积的运行速度从几十分钟变为十几秒.

整个ALS应用原来跑114G数据需要20多分钟,如下图现在只需要不到4min:

试了一下跑一天的全量数据, 共231.9G, 则原来需要1个多小时,现在只要不到6分钟, 如下图:

时间: 2024-11-05 20:26:21

[转] - spark推荐 - 从50多分钟到3分钟的优化的相关文章

香港大学推荐的50本经典书籍

香港大学推荐的50本经典书籍(图文版) 发布:2014.01.13 ┊ 分类: 阅读 ┊ 标签: 人生管理 ┊ 19344 人浏览 关注一下我们的微信吧,扫描二维码,微信号:vikilife 前几天微奇生活跟大家分享过香港大学推荐的50本经典书籍,有人做出了图文版.带简介的版本可以戳这里

2016最新 wamp2.5+windows 10安装CoedSgniffer代码格式检查:5分钟安装 30分钟入门和浏览常用命令

14:59 2016/1/112016最新 wamp2.5+windows 10安装CoedSgniffer代码格式检查:注意问题:1.手动安装2.5.0和pear安装方式都成功但是执行时无任何反映,最终发现问题是版本问题,最后手动安装2.4.0就成功了!下载地址:http://pear.php.net/package/PHP_CodeSniffer/download/2.4.02.加入环境变量:d:\dev\tools\PHP_CodeSniffer-2.4.03.下面的 @[email pr

Oracle 5分钟或30分钟分割方法

在最近项目中,有一个客户需求是针对每天所有时间点的数据,分割成每5分钟展示一个用户数总数. 数据情景是: 一个游戏中所有用户在线的时间数据(当然简单的求和,可能会有重复数据).但在这重点是Oracle  SQL 中用于按照一定时间间隔分割的方法,具体5分钟分割实例如下: SELECT tt.reasonContent,to_char(tt.day_id,'hh24:mi')daytime  ,tt.num FROM (   SELECT ll.day_id,ll.reasonContent,CO

谜题35:一分钟又一分钟

下面的程序在模仿一个简单的时钟.它的循环变量表示一个毫秒计数器,其计数值从0开始直至一小时中包含的毫秒数.循环体以定期的时间间隔对一个分钟计数器执行增量操作.最后,该程序将打印分钟计数器.那么它会打印出什么呢? public class Clock { public static void main(String[] args) { int minutes = 0; for (int ms = 0; ms < 60*60*1000; ms++) if (ms % 60*1000 == 0) mi

hadoop的mapReduce和Spark的shuffle过程的详解与对比及优化

https://blog.csdn.net/u010697988/article/details/70173104 大数据的分布式计算框架目前使用的最多的就是hadoop的mapReduce和Spark,mapReducehe和Spark之间的最大区别是前者较偏向于离线处理,而后者重视实现性,下面主要介绍mapReducehe和Spark两者的shuffle过程. MapReduce的Shuffle过程介绍 Shuffle的本义是洗牌.混洗,把一组有一定规则的数据尽量转换成一组无规则的数据,越随

30分钟带你熟练性能优化的那点儿事儿(案例说明)

前言 性能优化是数据库运维人员和中.高级软件开发人员的必备技能,很多时候老司机和新司机的区别就在写出的东西是否优化. 博主接触过近千家客户的系统,这些系统都存在着各种各样的性能问题.那么如何透彻的了解我们的数据库性能问题?今天就用一个案例来说明性能优化的那点儿事儿. PS:很多技术人员对优化有一套自己的理解,在阅读本文前请放下你自己的理解. 正所谓:跟着博主不迷路,博主带你上高速! 点开案例跟着博主的思路看看优化这些事儿 : 本文案例Demo 了解系统环境 优化首先要知道数据库在一个什么样的硬件

用Spark学习矩阵分解推荐算法

在矩阵分解在协同过滤推荐算法中的应用中,我们对矩阵分解在推荐算法中的应用原理做了总结,这里我们就从实践的角度来用Spark学习矩阵分解推荐算法. 1. Spark推荐算法概述 在Spark MLlib中,推荐算法这块只实现了基于矩阵分解的协同过滤推荐算法.而基于的算法是FunkSVD算法,即将m个用户和n个物品对应的评分矩阵M分解为两个低维的矩阵:$$M_{m \times n}=P_{m \times k}^TQ_{k \times n}$$ 其中k为分解成低维的维数,一般远比m和n小.如果大

Spark日志分析项目Demo(9)--常规性能调优

一 分配更多资源 分配更多资源:性能调优的王道,就是增加和分配更多的资源,性能和速度上的提升,是显而易见的:基本上,在一定范围之内,增加资源与性能的提升,是成正比的:写完了一个复杂的spark作业之后,进行性能调优的时候,首先第一步,我觉得,就是要来调节最优的资源配置:在这个基础之上,如果说你的spark作业,能够分配的资源达到了你的能力范围的顶端之后,无法再分配更多的资源了,公司资源有限:那么才是考虑去做后面的这些性能调优的点. 问题: 1.分配哪些资源? 2.在哪里分配这些资源? 3.为什么

Spark性能优化指南——基础篇

前言 在大数据计算领域,Spark已经成为了越来越流行.越来越受欢迎的计算平台之一.Spark的功能涵盖了大数据领域的离线批处理.SQL类处理.流式/实时计算.机器学习.图计算等各种不同类型的计算操作,应用范围与前景非常广泛.在美团•大众点评,已经有很多同学在各种项目中尝试使用Spark.大多数同学(包括笔者在内),最初开始尝试使用Spark的原因很简单,主要就是为了让大数据计算作业的执行速度更快.性能更高. 然而,通过Spark开发出高性能的大数据计算作业,并不是那么简单的.如果没有对Spar