【转载】Apache Spark Jobs 性能调优(二)

调试资源分配

 

Spark 的用户邮件邮件列表中经常会出现 “我有一个500个节点的集群,为什么但是我的应用一次只有两个 task 在执行”,鉴于 Spark 控制资源使用的参数的数量,这些问题不应该出现。但是在本章中,你将学会压榨出你集群的每一分资源。推荐的配置将根据不同的集群管理系统(YARN、Mesos、Spark Standalone)而有所不同,我们将主要集中在YARN 上,因为这个Cloudera 推荐的方式。

Spark(以及YARN) 需要关心的两项主要的资源是 CPU 和 内存, 磁盘 和 IO 当然也影响着 Spark 的性能,但是不管是 Spark 还是 Yarn 目前都没法对他们做实时有效的管理。

在一个 Spark 应用中,每个 Spark executor 拥有固定个数的 core 以及固定大小的堆大小。core 的个数可以在执行 spark-submit 或者 pyspark 或者 spark-shell 时,通过参数 --executor-cores 指定,或者在 spark-defaults.conf 配置文件或者 SparkConf 对象中设置 spark.executor.cores 参数。同样地,堆的大小可以通过 --executor-memory 参数或者 spark.executor.memory 配置项。core 配置项控制一个 executor 中task的并发数。 --executor-cores 5 意味着每个 executor 中最多同时可以有5个 task 运行。memory 参数影响 Spark 可以缓存的数据的大小,也就是在 group aggregate 以及 join 操作时 shuffle 的数据结构的最大值。

--num-executors 命令行参数或者spark.executor.instances 配置项控制需要的 executor 个数。从 CDH 5.4/Spark 1.3 开始,你可以避免使用这个参数,只要你通过设置 spark.dynamicAllocation.enabled 参数打开 动态分配 。动态分配可以使的 Spark 的应用在有后续积压的在等待的 task 时请求 executor,并且在空闲时释放这些 executor。

同时 Spark 需求的资源如何跟 YARN 中可用的资源配合也是需要着重考虑的,YARN 相关的参数有:

  • yarn.nodemanager.resource.memory-mb 控制在每个节点上 container 能够使用的最大内存;
  • yarn.nodemanager.resource.cpu-vcores 控制在每个节点上 container 能够使用的最大core个数;

请求5个 core 会生成向YARN 要5个虚拟core的请求。从YARN 请求内存相对比较复杂因为以下的一些原因:

--executor-memory/spark.executor.memory 控制 executor 的堆的大小,但是 JVM 本身也会占用一定的堆空间,比如内部的 String 或者直接 byte buffer,executor memory 的 spark.yarn.executor.memoryOverhead 属性决定向YARN 请求的每个 executor 的内存大小,默认值为max(384, 0.7 * spark.executor.memory);
YARN 可能会比请求的内存高一点,YARN 的 yarn.scheduler.minimum-allocation-mb 和 yarn.scheduler.increment-allocation-mb 属性控制请求的最小值和增加量。
下面展示的是 Spark on YARN 内存结构:

如果这些还不够决定Spark executor 个数,还有一些概念还需要考虑的:

  • 应用的master,是一个非 executor 的容器,它拥有特殊的从YARN 请求资源的能力,它自己本身所占的资源也需要被计算在内。在 yarn-client 模式下,它默认请求 1024MB 和 1个core。在 yarn-cluster 模式中,应用的 master 运行 driver,所以使用参数 --driver-memory 和 --driver-cores 配置它的资源常常很有用。
  • 在 executor 执行的时候配置过大的 memory 经常会导致过长的GC延时,64G是推荐的一个 executor 内存大小的上限。
  • 我们注意到 HDFS client 在大量并发线程是时性能问题。大概的估计是每个 executor 中最多5个并行的 task 就可以占满的写入带宽。
  • 在运行微型 executor 时(比如只有一个core而且只有够执行一个task的内存)扔掉在一个JVM上同时运行多个task的好处。比如 broadcast 变量需要为每个 executor 复制一遍,这么多小executor会导致更多的数据拷贝。

为了让以上的这些更加具体一点,这里有一个实际使用过的配置的例子,可以完全用满整个集群的资源。假设一个集群有6个节点有NodeManager在上面运行,每个节点有16个core以及64GB的内存。那么 NodeManager的容量:yarn.nodemanager.resource.memory-mb 和 yarn.nodemanager.resource.cpu-vcores 可以设为 63 * 1024 = 64512 (MB) 和 15。我们避免使用 100% 的 YARN container 资源因为还要为 OS 和 hadoop 的 Daemon 留一部分资源。在上面的场景中,我们预留了1个core和1G的内存给这些进程。Cloudera Manager 会自动计算并且配置。

所以看起来我们最先想到的配置会是这样的:--num-executors 6 --executor-cores 15 --executor-memory 63G。但是这个配置可能无法达到我们的需求,因为: 
- 63GB+ 的 executor memory 塞不进只有 63GB 容量的 NodeManager; 
- 应用的 master 也需要占用一个core,意味着在某个节点上,没有15个core给 executor 使用; 
- 15个core会影响 HDFS IO的吞吐量。 
配置成 --num-executors 17 --executor-cores 5 --executor-memory 19G 可能会效果更好,因为: 
- 这个配置会在每个节点上生成3个 executor,除了应用的master运行的机器,这台机器上只会运行2个 executor

- --executor-memory 被分成3份(63G/每个节点3个executor)=21。 21 * (1 - 0.07) ~ 19。

调试并发

 

我们知道 Spark 是一套数据并行处理的引擎。但是 Spark 并不是神奇得能够将所有计算并行化,它没办法从所有的并行化方案中找出最优的那个。每个 Spark stage 中包含若干个 task,每个 task 串行地处理数据。在调试 Spark 的job时,task 的个数可能是决定程序性能的最重要的参数。

那么这个数字是由什么决定的呢?在之前的博文中介绍了 Spark 如何将 RDD 转换成一组 stage。task 的个数与 stage 中上一个 RDD 的 partition 个数相同。而一个 RDD 的 partition 个数与被它依赖的 RDD 的 partition 个数相同,除了以下的情况:coalesce transformation 可以创建一个具有更少 partition 个数的 RDD,union transformation 产出的 RDD 的 partition 个数是它父 RDD 的 partition 个数之和,cartesian 返回的 RDD 的 partition 个数是它们的积。

如果一个 RDD 没有父 RDD 呢? 由 textFile 或者 hadoopFile 生成的 RDD 的 partition 个数由它们底层使用的 MapReduce InputFormat 决定的。一般情况下,每读到的一个 HDFS block 会生成一个 partition。通过parallelize 接口生成的 RDD 的 partition 个数由用户指定,如果用户没有指定则由参数 spark.default.parallelism 决定。

要想知道 partition 的个数,可以通过接口 rdd.partitions().size() 获得。

这里最需要关心的问题在于 task 的个数太小。如果运行时 task 的个数比实际可用的 slot 还少,那么程序解没法使用到所有的 CPU 资源。

过少的 task 个数可能会导致在一些聚集操作时, 每个 task 的内存压力会很大。任何 joincogroup*ByKey 操作都会在内存生成一个 hash-map或者 buffer 用于分组或者排序。joincogroup ,groupByKey 会在 shuffle 时在 fetching 端使用这些数据结构,reduceByKey ,aggregateByKey 会在 shuffle 时在两端都会使用这些数据结构。

当需要进行这个聚集操作的 record 不能完全轻易塞进内存中时,一些问题会暴露出来。首先,在内存 hold 大量这些数据结构的 record 会增加 GC的压力,可能会导致流程停顿下来。其次,如果数据不能完全载入内存,Spark 会将这些数据写到磁盘,这会引起磁盘 IO和排序。在 Cloudera 的用户中,这可能是导致 Spark Job 慢的首要原因。

那么如何增加你的 partition 的个数呢?如果你的问题 stage 是从 Hadoop 读取数据,你可以做以下的选项: 
- 使用 repartition 选项,会引发 shuffle; 
- 配置 InputFormat 用户将文件分得更小; 
- 写入 HDFS 文件时使用更小的block。

如果问题 stage 从其他 stage 中获得输入,引发 stage 边界的操作会接受一个 numPartitions 的参数,比如

<span style="font-family:Microsoft YaHei;"><span style="font-family:Microsoft YaHei;font-size:14px;">val rdd2 = rdd1.reduceByKey(_ + _, numPartitions = X)</span></span>

  

X 应该取什么值?最直接的方法就是做实验。不停的将 partition 的个数从上次实验的 partition 个数乘以1.5,直到性能不再提升为止。

同时也有一些原则用于计算 X,但是也不是非常的有效是因为有些参数是很难计算的。这里写到不是因为它们很实用,而是可以帮助理解。这里主要的目标是启动足够的 task 可以使得每个 task 接受的数据能够都塞进它所分配到的内存中。

每个 task 可用的内存通过这个公式计算:spark.executor.memory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction)/spark.executor.cores 。 memoryFraction 和 safetyFractio 默认值分别 0.2 和 0.8.

在内存中所有 shuffle 数据的大小很难确定。最可行的是找出一个 stage 运行的 Shuffle Spill(memory) 和 Shuffle Spill(Disk) 之间的比例。在用所有shuffle 写乘以这个比例。但是如果这个 stage 是 reduce 时,可能会有点复杂:

在往上增加一点因为大多数情况下 partition 的个数会比较多。

试试在,在有所疑虑的时候,使用更多的 task 数(也就是 partition 数)都会效果更好,这与 MapRecuce 中建议 task 数目选择尽量保守的建议相反。这个因为 MapReduce 在启动 task 时相比需要更大的代价。

压缩你的数据结构

 

Spark 的数据流由一组 record 构成。一个 record 有两种表达形式:一种是反序列化的 Java 对象另外一种是序列化的二进制形式。通常情况下,Spark 对内存中的 record 使用反序列化之后的形式,对要存到磁盘上或者需要通过网络传输的 record 使用序列化之后的形式。也有计划在内存中存储序列化之后的 record。

spark.serializer 控制这两种形式之间的转换的方式。Kryo serializer,org.apache.spark.serializer.KryoSerializer 是推荐的选择。但不幸的是它不是默认的配置,因为 KryoSerializer 在早期的 Spark 版本中不稳定,而 Spark 不想打破版本的兼容性,所以没有把 KryoSerializer 作为默认配置,但是 KryoSerializer 应该在任何情况下都是第一的选择。

你的 record 在这两种形式切换的频率对于 Spark 应用的运行效率具有很大的影响。去检查一下到处传递数据的类型,看看能否挤出一点水分是非常值得一试的。

过多的反序列化之后的 record 可能会导致数据到处到磁盘上更加频繁,也使得能够 Cache 在内存中的 record 个数减少。点击这里查看如何压缩这些数据。

过多的序列化之后的 record 导致更多的 磁盘和网络 IO,同样的也会使得能够 Cache 在内存中的 record 个数减少,这里主要的解决方案是把所有的用户自定义的 class 都通过 SparkConf#registerKryoClasses 的API定义和传递的。

数据格式

 

任何时候你都可以决定你的数据如何保持在磁盘上,使用可扩展的二进制格式比如:Avro,Parquet,Thrift或者Protobuf,从中选择一种。当人们在谈论在Hadoop上使用Avro,Thrift或者Protobuf时,都是认为每个 record 保持成一个 Avro/Thrift/Protobuf 结构保存成 sequence file。而不是JSON。

每次当时试图使用JSON存储大量数据时,还是先放弃吧...

【转载自:http://blog.csdn.net/u012102306/article/details/51700664】

时间: 2024-12-28 08:24:04

【转载】Apache Spark Jobs 性能调优(二)的相关文章

【转载】Apache Spark Jobs 性能调优(一)

当你开始编写 Apache Spark 代码或者浏览公开的 API 的时候,你会遇到各种各样术语,比如 transformation,action,RDD 等等. 了解到这些是编写 Spark 代码的基础. 同样,当你任务开始失败或者你需要透过web界面去了解自己的应用为何如此费时的时候,你需要去了解一些新的名词: job, stage, task.对于这些新术语的理解有助于编写良好 Spark 代码.这里的良好主要指更快的 Spark 程序.对于 Spark 底层的执行模型的了解对于写出效率更

Apache Spark Jobs 性能调优

当你开始编写 Apache Spark 代码或者浏览公开的 API 的时候,你会遇到各种各样术语,比如transformation,action,RDD 等等. 了解到这些是编写 Spark 代码的基础. 同样,当你任务开始失败或者你需要透过web界面去了解自己的应用为何如此费时的时候,你需要去了解一些新的名词: job, stage, task.对于这些新术语的理解有助于编写良好 Spark 代码.这里的良好主要指更快的 Spark 程序.对于 Spark 底层的执行模型的了解对于写出效率更高

Spark的性能调优

下面这些关于Spark的性能调优项,有的是来自官方的,有的是来自别的的工程师,有的则是我自己总结的. Data Serialization,默认使用的是Java Serialization,这个程序员最熟悉,但是性能.空间表现都比较差.还有一个选项是Kryo Serialization,更快,压缩率也更高,但是并非支持任意类的序列化. Memory Tuning,Java对象会占用原始数据2~5倍甚至更多的空间.最好的检测对象内存消耗的办法就是创建RDD,然后放到cache里面去,然后在UI 上

Spark Streaming性能调优详解(转)

原文链接:Spark Streaming性能调优详解 Spark Streaming提供了高效便捷的流式处理模式,但是在有些场景下,使用默认的配置达不到最优,甚至无法实时处理来自外部的数据,这时候我们就需要对默认的配置进行相关的修改.由于现实中场景和数据量不一样,所以我们无法设置一些通用的配置(要不然Spark Streaming开发者就不会弄那么多参数,直接写死不得了),我们需要根据数据量,场景的不同设置不一样的配置,这里只是给出建议,这些调优不一定试用于你的程序,一个好的配置是需要慢慢地尝试

Spark 常规性能调优

1. 常规性能调优 一:最优资源配置 Spark性能调优的第一步,就是为任务分配更多的资源,在一定范围内,增加资源的分配与性能的提升是成正比的,实现了最优的资源配置后,在此基础上再考虑进行后面论述的性能调优策略.  --driver-memory 配置Driver内存(影响不大) 内存大小影响不大 资源的分配在使用脚本提交Spark任务时进行指定,标准的Spark任务提交脚本如代码清单2-1所示: /usr/opt/modules/spark/bin/spark-submit --class c

揭秘Spark应用性能调优

引言:在多台机器上分布数据以及处理数据是Spark的核心能力,即我们所说的大规模的数据集处理.为了充分利用Spark特性,应该考虑一些调优技术.本文每一小节都是关于调优技术的,并给出了如何实现调优的必要步骤.本文选自<Spark GraphX实战>. 1 用缓存和持久化来加速 Spark 我们知道Spark 可以通过 RDD 实现计算链的原理 :转换函数包含在 RDD 链中,但仅在调用 action 函数后才会触发实际的求值过程,执行分布式运算,返回运算结果.要是在 同一 RDD 上重复调用

使用Apache Spark 对 mysql 调优 查询速度提升10倍以上

在这篇文章中我们将讨论如何利用 Apache Spark 来提升 MySQL 的查询性能. 介绍 在我的前一篇文章Apache Spark with MySQL 中介绍了如何利用 Apache Spark 实现数据分析以及如何对大量存放于文本文件的数据进行转换和分析.瓦迪姆还做了一个基准测试用来比较 MySQL 和 Spark with Parquet 柱状格式 (使用空中交通性能数据) 二者的性能. 这个测试非常棒,但如果我们不希望将数据从 MySQL 移到其他的存储系统中,而是继续在已有的

apache高负载性能调优

1 先阅读apache配置优化建议如下,再对相关参数进行调整,观察服务器状况. 2 Apache配置优化建议: 3 进入/usr/local/apache2/conf/extra 目录下 4 Apache优化, 5 经过上述操作后,Apache已经能够正常运行.但是,对于访问量稍大的站点,Apache的这些默认配置是无法满足需求的,我们仍需调整Apache的一些参数,使Apache能够在大访问量环境下发挥出更好的性能.以下我们对Apache配置文件httpd.conf中对性能影响较大的参数进行一

spark性能调优:开发调优

在大数据计算领域,Spark已经成为了越来越流行.越来越受欢迎的计算平台之一.Spark的功能涵盖了大数据领域的离线批处理.SQL类处理.流式/实时计算.机器学习.图计算等各种不同类型的计算操作,应用范围与前景非常广泛. 然而,通过Spark开发出高性能的大数据计算作业,并不是那么简单的.如果没有对Spark作业进行合理的调优,Spark作业的执行速度可能会很慢,这样就完全体现不出Spark作为一种快速大数据计算引擎的优势来.因此,想要用好Spark,就必须对其进行合理的性能优化. Spark的