spark job运行参数优化

一、问题

使用spark join两张表(5000w*500w)总是出错,报的异常显示是在shuffle阶段。

14/11/27 12:05:49 ERROR storage.DiskBlockObjectWriter: Uncaught exception while reverting partial writes to file /hadoop/application_1415632483774_448143/spark-local-20141127115224-9ca8/04/shuffle_1_1562_27
java.io.FileNotFoundException: /hadoop/application_1415632483774_448143/spark-local-20141127115224-9ca8/04/shuffle_1_1562_27 (No such file or directory)
        at java.io.FileOutputStream.open(Native Method)
        at java.io.FileOutputStream.<init>(FileOutputStream.java:212)
        at org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(BlockObjectWriter.scala:178)
        at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$revertWrites$1.apply(HashShuffleWriter.scala:118)
        at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$revertWrites$1.apply(HashShuffleWriter.scala:117)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at org.apache.spark.shuffle.hash.HashShuffleWriter.revertWrites(HashShuffleWriter.scala:117)
        at org.apache.spark.shuffle.hash.HashShuffleWriter.stop(HashShuffleWriter.scala:89)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:54)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)

出问题的代码块(scala)

1 val cRdd = iRdd.leftOuterJoin(label).map {
2      case (id, (iMap, Some(set))) => (id, (iMap, set))
3      case (id, (iMap, None)) => (id, (iMap, new HashSet[Int]()))
4    }.persist(StorageLevel.MEMORY_AND_DISK)

二、问题分析与解决

一般spark job很多问题都是来源于系统资源不够用,通过监控日志等判断是内存资源占用过高导致的问题,因此尝试通过配置参数的方法来解决。

1)--conf spark.akka.frameSize=100

此参数控制Spark中通信消息的最大容量 (如task的输出结果),默认为10M。当处理大数据时,task的输出可能会大于这个值,需要根据实际数据设置一个更高的值。尝试将此参数设置成100M后,问题未能解决。

2)--conf spark.shuffle.manager=SORT

Spark默认的shuffle采用Hash模式,在HASH模式下,每一次shuffle会生成M*R的数量的文件(M指的是Map的数目,R指的是Reduce的数目),而当Map和Reduce的数目开得较大时,会产生相当规模的文件,与此同时带来了大量的内存开销。

为了降低系统资源,可以采用Sort模式,Sort模式只产生M数量的文件。具体可以参考:Sort-based Shuffle之初体验

在我们的应用场景下,采用Sort模式后,shuffle时间比之前增大了1/3,但是问题依旧未解决。

3)--conf spark.yarn.executor.memoryOverhead=4096

executor堆外内存设置。起初是1024M,未能跑过,后改为4096M,Job就能跑通,原因是程序使用了大量的堆外内存。

时间: 2024-07-29 23:18:13

spark job运行参数优化的相关文章

JVM运行参数优化详细教程

获取设置的参数str的值:  常用的-X参数有以下这些: 手动调用GC执行垃圾回收操作:(-XX:+DisableExplicitGC 手动调用将会失效) 查看tomcat的进程ID: 或者: 原文地址:https://www.cnblogs.com/niwotaxuexiba/p/11221276.html

MySQL优化:mysql服务运行参数的设置

可以用管理员登录在mysql>命令行下设置,设置是临时的. 如果希望永久生效,则要修改/etc/my.cnf文件中的[mysqld]下相关参数: #vim /ect/my.cnf [mysqld] -- 下面详细讲解相关运行参数的设置: 1.并发连接数设置    max_connections 最大并发连接数的设置公式:曾经有过的最大连接数/要设置的最大连接数*100%约等于85%时是合适的,15%应付突发访问量 mysql> show  variables like  "max_c

Spark Streaming实践和优化

发表于:<程序员>杂志2016年2月刊.链接:http://geek.csdn.net/news/detail/54500 作者:徐鑫,董西成 在流式计算领域,Spark Streaming和Storm时下应用最广泛的两个计算引擎.其中,Spark Streaming是Spark生态系统中的重要组成部分,在实现上复用Spark计算引擎.如图1所示,Spark Streaming支持的数据源有很多,如Kafka.Flume.TCP等.Spark Streaming的内部数据表示形式为DStrea

【Spark 深入学习 04】再说Spark底层运行机制

本节内容 · spark底层执行机制 · 细说RDD构建过程 · Job Stage的划分算法 · Task最佳计算位置算法 一.spark底层执行机制 对于Spark底层的运行原理,找到了一副很好的图,先贴上 客户端提交应用后,spark是如何执行的要有一个整体的概念,做到心中有数,先整体把握,才能更好的分模块开垦细节,废话不多说,先来看该图如何更好的理解. 1)提交前的联系 Worker向Master或则ResourceManager汇报自己有哪些资源(内存.CPU.磁盘空间.网络等),Ma

Spark 性能相关参数配置详解-shuffle篇

作者:刘旭晖 Raymond 转载请注明出处 Email:colorant at 163.com BLOG:http://blog.csdn.net/colorant/ 随着Spark的逐渐成熟完善, 越来越多的可配置参数被添加到Spark中来, 在Spark的官方文档http://spark.apache.org/docs/latest/configuration.html 中提供了这些可配置参数中相当大一部分的说明. 但是文档的更新总是落后于代码的开发的, 还有一些配置参数没有来得及被添加到

spark-一些参数优化

Spark程序优化所需要关注的几个关键点--最主要的是数据序列化和内存优化 spark 设置相关参数问题1:reduce task数目不合适解决方法:需根据实际情况调节默认配置,调整方式是修改参数spark.default.parallelism.通常,reduce数目设置为core数目的2到3倍.数量太大,造成很多小任务,增加启动任务的开销:数目太少,任务运行缓慢.如果你想了解大数据的学习路线,想学习大数据知识以及需要免费的学习资料可以加群:784789432.欢迎你的加入.每天下午三点开直播

OpenCV中的SVM参数优化

OpenCV中的SVM参数优化 标签: svm参数优化opencv SVMSVR参数优化CvSVMopencv CvSVM 2014-08-19 10:31 2995人阅读 评论(8) 收藏 举报  分类: 机器学习(11)  opencv(18)  版权声明:本文为博主原创文章,未经博主允许不得转载. SVM(支持向量机)是机器学习算法里用得最多的一种算法.SVM最常用的是用于分类,不过SVM也可以用于回归,我的实验中就是用SVM来实现SVR(支持向量回归). 对于功能这么强的算法,OpenC

Spark修炼之道(进阶篇)——Spark入门到精通:第九节 Spark SQL运行流程解析

1.整体运行流程 使用下列代码对SparkSQL流程进行分析,让大家明白LogicalPlan的几种状态,理解SparkSQL整体执行流程 // sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) // this is used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits

视频编码器评估与参数优化

视频编码器评估与参数优化是个比较麻烦的问题,主要原因是:编码器种类多,输入的参数繁多,参数的相互影响非线性,深入理解其实现需要投入巨大的精力或者不可能(对于商业产品而言).而另一方面,评估和选择编码器.针对特定的目标选择最优的编码参数组合对于视频服务商--比如视频网站来说,是一个现实的需要.有感于或者苦恼于如何选择最优的方案,笔者通过对多目标优化方法的尝试进行了一定的探索,之所以选择这种方法,主要原因是它比较"暴力"和"傻瓜",并不需要过多的纠缠参数的涵义f'(*∩