[转] - Spark排错与优化

Spark排错与优化

http://blog.csdn.net/lsshlsw/article/details/49155087

一. 运维

1. Master挂掉,standby重启也失效

Master默认使用512M内存,当集群中运行的任务特别多时,就会挂掉,原因是master会读取每个task的event log日志去生成Sparkui,内存不足自然会OOM,可以在master的运行日志中看到,通过HA启动的master自然也会因为这个原因失败。

解决

  1. 增加Master的内存占用,在Master节点spark-env.sh 中设置:

    export SPARK_DAEMON_MEMORY 10g # 根据你的实际情况
    
  2. 减少保存在Master内存中的作业信息
    spark.ui.retainedJobs 500   # 默认都是1000
    spark.ui.retainedStages 500
    

2. worker挂掉或假死

有时候我们还会在web ui中看到worker节点消失或处于dead状态,在该节点运行的任务则会报各种 lost worker 的错误,引发原因和上述大体相同,worker内存中保存了大量的ui信息导致gc时失去和master之间的心跳。

解决

  1. 增加Master的内存占用,在Worker节点spark-env.sh 中设置:

    export SPARK_DAEMON_MEMORY 2g # 根据你的实际情况
    
  2. 减少保存在Worker内存中的Driver,Executor信息
    spark.worker.ui.retainedExecutors 200   # 默认都是1000
    spark.worker.ui.retainedDrivers 200
    

二. 运行错误

1.shuffle FetchFailedException

Spark Shuffle FetchFailedException解决方案

错误提示

  1. missing output location

    org.apache.spark.shuffle.MetadataFetchFailedException:
    Missing an output location for shuffle 0
    

  2. shuffle fetch faild
    org.apache.spark.shuffle.FetchFailedException:
    Failed to connect to spark047215/192.168.47.215:50268
    

    当前的配置为每个executor使用1core,5GRAM,启动了20个executor

解决

这种问题一般发生在有大量shuffle操作的时候,task不断的failed,然后又重执行,一直循环下去,直到application失败。

一般遇到这种问题提高executor内存即可,同时增加每个executor的cpu,这样不会减少task并行度。

  • spark.executor.memory 15G
  • spark.executor.cores 3
  • spark.cores.max 21

启动的execuote数量为:7个

execuoterNum = spark.cores.max/spark.executor.cores

每个executor的配置:

3core,15G RAM

消耗的内存资源为:105G RAM

15G*7=105G

可以发现使用的资源并没有提升,但是同样的任务原来的配置跑几个小时还在卡着,改了配置后几分钟就能完成。

2.Executor&Task Lost

错误提示

  1. executor lost

    WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, aa.local):
    ExecutorLostFailure (executor lost)
    
  2. task lost
    WARN TaskSetManager: Lost task 69.2 in stage 7.0 (TID 1145, 192.168.47.217):
    java.io.IOException: Connection from /192.168.47.217:55483 closed
    
  3. 各种timeout
    java.util.concurrent.TimeoutException: Futures timed out after [120 second]
    
    ERROR TransportChannelHandler: Connection to /192.168.47.212:35409
    has been quiet for 120000 ms while there are outstanding requests.
    Assuming connection is dead; please adjust spark.network.
    timeout if this is wrong
    

解决

由网络或者gc引起,worker或executor没有接收到executor或task的心跳反馈。 
提高 spark.network.timeout 的值,根据情况改成300(5min)或更高。 
默认为 120(120s),配置所有网络传输的延时,如果没有主动设置以下参数,默认覆盖其属性

  • spark.core.connection.ack.wait.timeout
  • spark.akka.timeout
  • spark.storage.blockManagerSlaveTimeoutMs
  • spark.shuffle.io.connectionTimeout
  • spark.rpc.askTimeout or spark.rpc.lookupTimeout

3.倾斜

错误提示

  1. 数据倾斜

  2. 任务倾斜 
    差距不大的几个task,有的运行速度特别慢。

解决

大多数任务都完成了,还有那么一两个任务怎么都跑不完或者跑的很慢,分为数据倾斜和task倾斜两种。

  1. 数据倾斜 
    数据倾斜大多数情况是由于大量的无效数据引起,比如null或者”“,也有可能是一些异常数据,比如统计用户登录情况时,出现某用户登录过千万次的情况,无效数据在计算前需要过滤掉。 
    数据处理有一个原则,多使用filter,这样你真正需要分析的数据量就越少,处理速度就越快。

    sqlContext.sql("...where col is not null and col != ‘‘")
    

    具体可参考: 
    解决spark中遇到的数据倾斜问题

  2. 任务倾斜 
    task倾斜原因比较多,网络io,cpu,mem都有可能造成这个节点上的任务执行缓慢,可以去看该节点的性能监控来分析原因。以前遇到过同事在spark的一台worker上跑R的任务导致该节点spark task运行缓慢。 
    或者可以开启spark的推测机制,开启推测机制后如果某一台机器的几个task特别慢,推测机制会将任务分配到其他机器执行,最后Spark会选取最快的作为最终结果。
    • spark.speculation true
    • spark.speculation.interval 100 - 检测周期,单位毫秒;
    • spark.speculation.quantile 0.75 - 完成task的百分比时启动推测
    • spark.speculation.multiplier 1.5 - 比其他的慢多少倍时启动推测。

4.OOM

错误提示

堆内存溢出

java.lang.OutOfMemoryError: Java heap space

解决

内存不够,数据太多就会抛出OOM的Exeception,主要有driver OOM和executor OOM两种

  1. driver OOM 
    一般是使用了collect操作将所有executor的数据聚合到driver导致。尽量不要使用collect操作即可。
  2. executor OOM 
    可以按下面的内存优化的方法增加code使用内存空间
    • 增加executor内存总量,也就是说增加spark.executor.memory的值
    • 增加任务并行度(大任务就被分成小任务了),参考下面优化并行度的方法

5.task not serializable

错误提示

org.apache.spark.SparkException: Job aborted due to stage failure:
Task not serializable: java.io.NotSerializableException: ...

解决

如果你在worker中调用了driver中定义的一些变量,Spark就会将这些变量传递给Worker,这些变量并没有被序列化,所以就会看到如上提示的错误了。

val x = new X()  //在driver中定义的变量
dd.map{r => x.doSomething(r) }.collect  //map中的代码在worker(executor)中执行
  • 1
  • 2
  • 1
  • 2

除了上文的map,还有filter,foreach,foreachPartition等操作,还有一个典型例子就是在foreachPartition中使用数据库创建连接方法。这些变量没有序列化导致的任务报错。

下面提供三种解决方法:

  1. 将所有调用到的外部变量直接放入到以上所说的这些算子中,这种情况最好使用foreachPartition减少创建变量的消耗。
  2. 将需要使用的外部变量包括sparkConf,SparkContext,都用 @transent进行注解,表示这些变量不需要被序列化
  3. 将外部变量放到某个class中对类进行序列化。

6.driver.maxResultSize太小

错误提示

Caused by: org.apache.spark.SparkException:
 Job aborted due to stage failure: Total size of serialized
 results of 374 tasks (1026.0 MB) is bigger than
  spark.driver.maxResultSize (1024.0 MB)

解决

spark.driver.maxResultSize默认大小为1G 每个Spark action(如collect)所有分区的序列化结果的总大小限制,简而言之就是executor给driver返回的结果过大,报这个错说明需要提高这个值或者避免使用类似的方法,比如countByValue,countByKey等。

将值调大即可

spark.driver.maxResultSize 2g

7.taskSet too large

错误提示

WARN TaskSetManager: Stage 198 contains a task of very large size (5953 KB). The maximum recommended task size is 100 KB.
  • 1
  • 1

这个WARN可能还会导致ERROR

Caused by: java.lang.RuntimeException: Failed to commit task

Caused by: org.apache.spark.executor.CommitDeniedException: attempt_201603251514_0218_m_000245_0: Not committed because the driver did not authorize commit
  • 1
  • 2
  • 3
  • 1
  • 2
  • 3

解决

如果你比较了解spark中的stage是如何划分的,这个问题就比较简单了。 
一个Stage中包含的task过大,一般由于你的transform过程太长,因此driver给executor分发的task就会变的很大。 
所以解决这个问题我们可以通过拆分stage解决。也就是在执行过程中调用cache.count缓存一些中间数据从而切断过长的stage。

8. driver did not authorize commit

driver did not authorize commit

9. 环境报错

  1. driver节点内存不足 
    driver内存不足导致无法启动application,将driver分配到内存足够的机器上或减少driver-memory

    Java HotSpot(TM) 64-Bit Server VM warning: INFO:
    

    os::commit_memory(0x0000000680000000, 4294967296, 0) failed; 
    error=’Cannot allocate memory’ (errno=12)

  2. hdfs空间不够 
    hdfs空间不足,event_log无法写入,所以 ListenerBus会报错 ,增加hdfs空间(删除无用数据或增加节点)
    Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException):
     File /tmp/spark-history/app-20151228095652-0072.inprogress
     could only be replicated to 0 nodes instead of minReplication (=1)
    
    ERROR LiveListenerBus: Listener EventLoggingListener threw an exception
    java.lang.reflect.InvocationTargetException
    
  3. spark编译包与Hadoop版本不一致 
    下载对应hadoop版本的spark包或自己编译。
    java.io.InvalidClassException: org.apache.spark.rdd.RDD;
     local class incompatible: stream classdesc serialVersionUID
    
  4. driver机器端口使用过多 
    在一台机器上没有指定端口的情况下,提交了超过15个任务。
    16/03/16 16:03:17 ERROR SparkUI: Failed to bind SparkUI
    java.net.BindException: 地址已在使用: Service ‘SparkUI‘ failed after 16 retries!
    

    提交任务时指定app web ui端口号解决:

    --conf spark.ui.port=xxxx
    
  5. 中文乱码

    使用write.csv等方法写出到hdfs的文件,中文乱码。JVM使用的字符集如果没有指定,默认会使用系统的字符集,因为各个节点系统字符集并不都是UTF8导致,所以会出现这个问题。直接给JVM指定字符集即可。

    spark-defaults.conf

    spark.executor.extraJavaOptions -Dfile.encoding=UTF-8
    

三. 一些python错误

1.python版本过低

java.io.UIException: Cannot run program "python2.7": error=2,没有那个文件或目录

spark使用的Python版本为2.7,centOS默认python版本为2.6,升级即可。

2.python权限不够

错误提示

部分节点上有错误提示

java.io.IOExeception: Cannot run program "python2.7": error=13, 权限不够

解决

新加的节点运维装2.7版本的python,python命令是正确的,python2.7却无法调用,只要改改环境变量就好了。

3.pickle使用失败

错误提示

TypeError: (‘__cinit__() takes exactly 8 positional arguments (11 given)‘,
 <type ‘sklearn.tree._tree.Tree‘>, (10, array([1], dtype=int32), 1,
  <sklearn.tree._tree.RegressionCriterion object at 0x100077480>,
   50.0, 2, 1, 0.1, 10, 1, <mtrand.RandomState object at 0x10a55da08>))

解决

该pickle文件是在0.17版本的scikit-learn下训练出来的,有些机器装的是0.14版本,版本不一致导致,升级可解决,记得将老版本数据清理干净,否则会报各种Cannot import xxx的错误。

4.python编码错误

错误提示

UnicodeEncodeError: ‘ascii‘ codec can‘t encode characters in position 0-1: ordinal not in range(128)

解决

方法1:

import sys
reload(sys)
sys.setdefaultencoding(‘utf-8‘)
  • 1
  • 2
  • 3
  • 1
  • 2
  • 3

方法2:

//报错
str(u‘中国‘)
//不报错
str(u‘中国‘.encode(‘utf-8‘))
  • 1
  • 2
  • 3
  • 4
  • 1
  • 2
  • 3
  • 4

四. 一些优化

1. 部分Executor不执行任务

有时候会发现部分executor并没有在执行任务,为什么呢?

(1) 任务partition数过少, 
要知道每个partition只会在一个task上执行任务。改变分区数,可以通过 repartition 方法,即使这样,在 repartition 前还是要从数据源读取数据,此时(读入数据时)的并发度根据不同的数据源受到不同限制,常用的大概有以下几种:

hdfs - block数就是partition数
mysql - 按读入时的分区规则分partition
es - 分区数即为 es 的 分片数(shard)

(2) 数据本地性的副作用

taskSetManager在分发任务之前会先计算数据本地性,优先级依次是:

process(同一个executor) -> node_local(同一个节点) -> rack_local(同一个机架) -> any(任何节点)

Spark会优先执行高优先级的任务,任务完成的速度很快(小于设置的spark.locality.wait时间),则数据本地性下一级别的任务则一直不会启动,这就是Spark的延时调度机制。

举个极端例子:运行一个count任务,如果数据全都堆积在某一台节点上,那将只会有这台机器在长期计算任务,集群中的其他机器则会处于等待状态(等待本地性降级)而不执行任务,造成了大量的资源浪费。

判断的公式为:

curTime – lastLaunchTime >= localityWaits(currentLocalityIndex)

其中 curTime 为系统当前时间,lastLaunchTime 为在某优先级下最后一次启动task的时间

如果满足这个条件则会进入下一个优先级的时间判断,直到 any,不满足则分配当前优先级的任务。

数据本地性任务分配的源码在 taskSetManager.Scala 。

如果存在大量executor处于等待状态,可以降低以下参数的值(也可以设置为0),默认都是3s。

spark.locality.wait
spark.locality.wait.process
spark.locality.wait.node
spark.locality.wait.rack

当你数据本地性很差,可适当提高上述值,当然也可以直接在集群中对数据进行balance。

2. spark task 连续重试失败

有可能哪台worker节点出现了故障,task执行失败后会在该 executor 上不断重试,达到最大重试次数后会导致整个 application执行失败,我们可以设置失败黑名单(task在该节点运行失败后会换节点重试),可以看到在源码中默认设置的是 0,

private val EXECUTOR_TASK_BLACKLIST_TIMEOUT =
    conf.getLong("spark.scheduler.executorTaskBlacklistTime", 0L)  
  • 1
  • 2
  • 1
  • 2

在 spark-default.sh 中设置

spark.scheduler.executorTaskBlacklistTime 30000

当 task 在该 executor 运行失败后会在其它 executor 中启动,同时此 executor 会进入黑名单30s(不会分发任务到该executor)。

3. 内存

如果你的任务shuffle量特别大,同时rdd缓存比较少可以更改下面的参数进一步提高任务运行速度。

spark.storage.memoryFraction - 分配给rdd缓存的比例,默认为0.6(60%),如果缓存的数据较少可以降低该值。 
spark.shuffle.memoryFraction - 分配给shuffle数据的内存比例,默认为0.2(20%) 
剩下的20%内存空间则是分配给代码生成对象等。

如果任务运行缓慢,jvm进行频繁gc或者内存空间不足,或者可以降低上述的两个值。 
"spark.rdd.compress","true" - 默认为false,压缩序列化的RDD分区,消耗一些cpu减少空间的使用

4. 并发

mysql读取并发度优化

spark.default.parallelism 
发生shuffle时的并行度,在standalone模式下的数量默认为core的个数,也可手动调整,数量设置太大会造成很多小任务,增加启动任务的开销,太小,运行大数据量的任务时速度缓慢。

spark.sql.shuffle.partitions 
sql聚合操作(发生shuffle)时的并行度,默认为200,如果该值太小会导致OOM,executor丢失,任务执行时间过长的问题

相同的两个任务: 
spark.sql.shuffle.partitions=300:

spark.sql.shuffle.partitions=500:

速度变快主要是大量的减少了gc的时间。

但是设置过大会造成性能恶化,过多的碎片task会造成大量无谓的启动关闭task开销,还有可能导致某些task hang住无法执行。

修改map阶段并行度主要是在代码中使用rdd.repartition(partitionNum)来操作。

5. shuffle

spark-sql join优化 
map-side-join 关联优化

6. 磁盘

磁盘IO优化

7.序列化

kryo Serialization

8.数据本地性

Spark不同Cluster Manager下的数据本地性表现 
spark读取hdfs数据本地性异常

9.代码

编写Spark程序的几个优化点

时间: 2024-08-04 23:10:56

[转] - Spark排错与优化的相关文章

Spark 读取 Hbase 优化 --手动划分 region 提高并行数

一. Hbase 的 region 我们先简单介绍下 Hbase 的 架构和 region : 从物理集群的角度看,Hbase 集群中,由一个 Hmaster 管理多个 HRegionServer,其中每个 HRegionServer 都对应一台物理机器,一台 HRegionServer 服务器上又可以有多个 Hregion(以下简称 region).要读取一个数据的时候,首先要先找到存放这个数据的 region.而 Spark 在读取 Hbase 的时候,读取的 Rdd 会根据 Hbase 的

Spark SQL性能优化

1.设置Shuffle过程中的并行度:spark.sql.shuffle.partitions(SQLContext.setConf()) 2.在Hive数据仓库建设过程中,合理设置数据类型,比如能设置为INT的,就不要设置为BIGINT.减少数据类型导致的不必要的内存开销. 3.编写SQL时,尽量给出明确的列名,比如select name from students.不要写select *的方式. 4.并行处理查询结果:对于Spark SQL查询的结果,如果数据量比较大,比如超过1000条,那

spark实时计算性能优化

1.  计算提供两种模式,一种是jar包本地计算.一种是JSF服务. 2.  第一步是引入spark,因与netty.JDQ均有冲突,解决netty冲突后,隔离计算为单独服务.已在线上,因storm也与spark存 在运行时冲突,storm也在用服务. 3.  第二步是召回集扩量,发现当召回集由200扩到500后性能下降过快到70ms,利用多线程多核计算,性能到6ms.已在线上 4.  第三步在此扩量到1000,采用增加线程方式,性能达到25ms左右.已在预发 5.  第四步召回集在扩量,如性能

spark新能优化之多次使用RDD的持久化或checkPoint

如果程序中,对某一个RDD,基于它进行了多次transformation或者action操作.那么就非常有必要对其进行持久化操作,以避免对一个RDD反复进行计算. 此外,如果要保证在RDD的持久化数据可能丢失的情况下,还要保证高性能,那么可以对RDD进行Checkpoint操作.(也就是多次用到中间RDD的生成值时可以持久化再checkPoint(当持久化数据没的时候会去checkPoint中寻找,详细见spark源码.))

Spark Streaming性能优化: 如何在生产环境下应对流数据峰值巨变

1.为什么引入Backpressure 默认情况下,Spark Streaming通过Receiver以生产者生产数据的速率接收数据,计算过程中会出现batch processing time > batch interval的情况,其中batch processing time 为实际计算一个批次花费时间, batch interval为Streaming应用设置的批处理间隔.这意味着Spark Streaming的数据接收速率高于Spark从队列中移除数据的速率,也就是数据处理能力低,在设置

spark新能优化之shuffle新能调优

shuffle调优参数 new SparkConf().set("spark.shuffle.consolidateFiles", "true") spark.shuffle.consolidateFiles:是否开启shuffle block file的合并,默认为false//设置从maPartitionRDD上面到到下个stage的resultTask时数据的传输快可以聚合(具体原理可以看下shuffle的原理设置和没设置的区别)spark.reducer.m

spark新能优化之数据本地化

数据本地化的背景: 数据本地化对于Spark Job性能有着巨大的影响.如果数据以及要计算它的代码是在一起的,那么性能当然会非常高.但是,如果数据和计算它的代码是分开的,那么其中之一必须到另外一方的机器上.通常来说,移动代码到其他节点,会比移动数据到代码所在的节点上去,速度要快得多,因为代码比较小.Spark也正是基于这个数据本地化的原则来构建task调度算法的. 数据本地化,指的是,数据离计算它的代码有多近.基于数据距离代码的距离,有几种数据本地化级别:1.PROCESS_LOCAL:数据和计

spark新能优化之序列化的持久化级别

除了对多次使用的RDD进行持久化操作之外,还可以进一步优化其性能.因为很有可能,RDD的数据是持久化到内存,或者磁盘中的.那么,此时,如果内存大小不是特别充足,完全可以使用序列化的持久化级别,比如MEMORY_ONLY_SER.MEMORY_AND_DISK_SER等.使用RDD.persist(StorageLevel.MEMORY_ONLY_SER)这样的语法即可. 这样的话,将数据序列化之后,再持久化,可以大大减小对内存的消耗.此外,数据量小了之后,如果要写入磁盘,那么磁盘io性能消耗也比

Spark Streaming性能优化系列-如何获得和持续使用足够的集群计算资源?

一:数据峰值的巨大影响 1. 数据确实不稳定,例如晚上的时候访问流量特别大 2. 在处理的时候例如GC的时候耽误时间会产生delay延迟 二:Backpressure:数据的反压机制 基本思想:根据上一次计算的Job的一些信息评估来决定下一个Job数据接收的速度. 如何限制Spark接收数据的速度? Spark Streaming在接收数据的时候必须把当前的数据接收完毕才能接收下一条数据. 源码解析 RateController: 1. RateController是监听器,继承自Streami