Spark 官网提到的几点调优

1. 数据序列化

默认使用的是Java自带的序列化机制。优点是可以处理所有实现了java.io.Serializable 的类。但是Java 序列化比较慢。

可以使用Kryo序列化机制,通常比Java 序列化机制性能高10倍。但是并不支持所有实现了java.io.Serializable 的类。使用 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 开启Kryo序列化。不使用Kryo做为默认值的原因是:需要注册自定义的类。例如:

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)

注意:如果Object很大,需要在配置中增加 spark.kryoserializer.buffer 的值。如果没有在Kryo中注册自定义的类,Kryo也能正常工作,这些类会完全地保存下来(等于没有序列化就进行传输或保存了),会造成资源浪费。

2. 内存调优

可以考虑3个方面:(1)对象需要的总内存 (2)指向这些对象的指针 (3)GC

通常情况下,指针占用的空间将是原始数据的2~5倍。有以下几个原因:

(1)Java对象的“object header”(对象头),包含了指向它的类的指针,占用16bytes。对于一些只有很少数据的object,16bytes要比对象本身占用的空间要多。

(2)Java String 中在原始String数据的基础上有另外40bytes的开销(String的保存形式是Char的数组,并且有length的额外数据)。因为String内部使用UTF-16编码,每个char 占用2个byte。因此10个字符的String,将会很轻易地占用60个bytes

(3)诸如HashMap,LinkedList 的集合类,使用链式结构,每个entry(Map.Entry)都有一个包装类。这些类不仅有“object header”,还有指向下一个对象的指针(通常是8个bytes)。

(4)基本类型的集合,通常会被包装成对象类型。

3. 内存管理

Spark中的内存使用主要有两类:执行内存和存储内存。执行内存是指shuffles, joins, sorts and aggregations计算时用到的内存。存储内存主要是指cache和集群间传播的内部数据用到的内存。执行内存和存储内存使用的是同一块区域。当没有计算执行时, 存储将获得所有这块区域的可用内存,反之亦然。执行比存储具有更高的获取内存的优先级,也就是说,如果内存不够时,存储会释放一部分内存给执行用,直到存储需要的最低的阀值。

有两个相关的配置,但是通常来说,用户不需要改变其默认值。

(1) spark.memory.fraction  表示使用的Java 堆内存的比例。默认值0.6. 剩下的40%的内存用于:(a)存储用户数据、Spark内部元数据 (b)防止OOM

(2)spark.memory.storageFraction 表示上面所说的存储内存最少占用的比例。默认值 是0.5

4. 确定内存消耗

最好的方式是生成一个RDD并cache,在web UI 中的 Storage 中查看占用了多少内存。

确定一个指定object 占用内存的大小,可以使用 SizeEstimator.estimate(obj) 方法。

5. 调整数据结构

减少内存消耗,首先应该避免使用基于指针的数据结构和包装对象等诸如此类的Java特性。有以下几种途径:

(1)数据结构优先使用对象数组和基本类型,尽量不使用Java和scala里的集合类(如:HashMap)。可以使用 fastutil (http://fastutil.di.unimi.it/) 提供的集合类和基本类型。

(2)尽量避免使用有很多小对象和指针的内嵌结构

(3)考虑使用数字ID 和枚举类代替作为key的String

(4)如果内存小于32GB,在Spark-env.sh 里设置  -XX:+UseCompressedOops,这样指针使用4bytes 而不是8bytes

6. 序列化RDD 存储

当你的 object 仍然很大,简单的降低内存消耗的方法是使用序列化的存储方法。强烈建议使用kyro序列化机制。

7. 垃圾回收调优

垃圾回收的时间主要是花费在寻找那些不再被引用的对象。因此它跟Java Object 的数量有关。我们应该使用具有较少object的数据结构(如:使用array代替linkedList)。一种较好的方法是用序列化的形式持久化Object,这样每个RDD partition 只有一个字节数组。

测量GC的影响:在Java option 中加入 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps 后,可在worker 的 stdout 中找到GC的日志。

(1) 在任务完成之前,如果有多次full GC,说明执行任务的内存不够

(2) 如果有多次minor GC,但是 full GC 并不多,可以增大 Eden 区的大小

(3) 在GC的日志中,如果老年代快满了,减少 spark.memory.fraction 以降低cache所用的内存

(4) 尝试使用 G1 垃圾回收器(-XX:+UseG1GC)。如果堆比较大,应该增加 G1 区的大小(通过 -XX:G1HeapRegionSize 设置

(5) 如果任务是从HDFS上读数据,HDFS 块的大小为 128M,块解压后的大小一般为原始大小的2~3倍,如果要运行4个task,可以估算Eden区需要 4*3*128M。

8. 其它

(1)并行度。除非你手动每步都设置较高的并行度,否则,集群不会被最大化地利用。Spark会自动根据每个文件的大小设置相应的task数量。对于诸如groupByKey,reduceByKey 等 reduce 操作,并行度为最大的父 RDD 的 partition 的数量。可以配置 spark.default.parallelism 设置默认的并行度。一般来讲,建议一个CPU 运行 2~3个task。

(2)Reduce Task 的内存使用。有时候,发生OOM并不是因为内存中放不下RDD,而是因为某个或几个task 分配的内存不够。例如:某个groupByKey 操作处理很大的数据集(因为数据倾斜的缘故)。 简单的解决方法是:设置较高的并行度。

(3)广播大的变量。 使用广播的功能能有效地减少序列化的 task 的大小和集群加载job的花消。如果你的task中需要使用一个来自driver的大的object(如:静态查询表),应该把它转化成广播变量。 Master端会打印序列化后的 task 的大小,通常如果大于20KB 的话,就值得去优化。

(4)数据本地性。数据本地性可分为以下几类:

(a) PROCESS_LOCAL  数据在运行代码的JVM中。

(b) NODE_LOCAL 数据和运行的代码在同一台机器上。如:当前节点上正好有HDFS的数据块。

(c) NO_PREF 数据可以较快获取,但是不在本地

(d) RACK_LOCAL 数据在同一 机架内,需要通过network获取

(e) Any 除上述外的数据

最好的情况就是 task 都运行在最好的数据本地性的环境,但通常不太可能。很多时候,某个executor 上的任务都完成了,而其它忙碌的机器上尚有未处理的data。Spark通常会等一段时间,以等待忙碌的机器空闲下来去处理数据(因为具有较高的本地性)。当超过这个等待时间后,空间的executor会把这些数据拉过来进行处理。每个数据本地性级别对应的等待时间可以查看配置中的 spark.locality 部分。通常默认的配置工作得蛮好的。如果你的task运行时间较长,可以增加这些值。

原文地址:https://www.cnblogs.com/langfanyun/p/8073665.html

时间: 2024-10-16 15:29:46

Spark 官网提到的几点调优的相关文章

Spark Streaming 官网上提到的几点调优

总的来说,需要考虑以下两点: 1. 有效地运用集群资源去减少每个批次处理的时间 2. 正确的设置batch size,以使得处理速度能跟上接收速度 一.  为了减少处理时间,主要有以下几个优化点: 1. 接收数据的并行度. 每个InputDStream只创建一个Receiver用于接收数据,如果接收数据是系统的瓶颈,可以创建多个InputDStream.配置不同的InputDStream读取数据源的不同分区.比如原先用一个InputDStream读取Kafka的两个topic的数据,可以拆分成两

Spark-->combineByKey【请阅读Apache spark官网文档】

这篇文章,很有必要看,写的不错.但是看过后,不要忘记查看Apache spark官网.因为这篇文章理解还是和源码.官网文档 不一致.有一点错误![cnblogs的代码编辑器 不支持Scala,所以 语言的关键字 没有高亮显示] 在数据分析中,处理Key,Value的Pair数据是极为常见的场景,例如我们可以针对这样的数据进行分组.聚合或者将两个包含Pair数据的RDD根据key进行join.从函数的抽象层面看,这些操作具有共同的特征,都是将类型为RDD[(K,V)]的数据处理为RDD[(K,C)

spark记录(6)SparkCore的调优之开发调优

摘抄自:https://www.cnblogs.com/qingyunzong/p/8946637.html 前言 在大数据计算领域,Spark已经成为了越来越流行.越来越受欢迎的计算平台之一.Spark的功能涵盖了大数据领域的离线批处理.SQL类处理.流式/实时计算.机器学习.图计算等各种不同类型的计算操作,应用范围与前景非常广泛.在美团•大众点评,已经有很多同学在各种项目中尝试使用Spark.大多数同学(包括笔者在内),最初开始尝试使用Spark的原因很简单,主要就是为了让大数据计算作业的执

spark记录(9)SparkCore的调优之Spark内存模型

摘抄自:https://www.cnblogs.com/qingyunzong/p/8946637.html 一.概述 Spark 作为一个基于内存的分布式计算引擎,其内存管理模块在整个系统中扮演着非常重要的角色.理解 Spark 内存管理的基本原理,有助于更好地开发 Spark 应用程序和进行性能调优.本文旨在梳理出 Spark 内存管理的脉络,抛砖引玉,引出读者对这个话题的深入探讨.本文中阐述的原理基于 Spark 2.1 版本,阅读本文需要读者有一定的 Spark 和 Java 基础,了解

本地开发spark代码上传spark集群服务并运行(基于spark官网文档)

打开IDEA 在src下的main下的scala下右击创建一个scala类 名字为SimpleApp ,内容如下 import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf object SimpleApp { def main(args: Array[String]) { val logFile = "/home/spark/opt/s

spark官网学习

1.foreachRDD设计模式 正确使用foreachRDD的方式,每一个分区创建一个单例connection对象. dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => //创建单例对象 val connection = createNewConnection() partitionOfRecords.foreach(record => connection.send(record)) con

贝叶斯、朴素贝叶斯及调用spark官网 mllib NavieBayes示例

贝叶斯法则 机器学习的任务:在给定训练数据D时,确定假设空间H中的最佳假设. 最佳假设:一种方法是把它定义为在给定数据D以及H中不同假设的先验概率的有关知识下的最可能假设 贝叶斯理论提供了一种计算假设概率的方法,基于假设的先验概率.给定假设下观察到不同数据的概率以及观察到的数据本身 先验概率和后验概率 用P(A)表示在没有训练数据前假设A拥有的初始概率.P(A)被称为A的先验概率. 先验概率反映了关于A是一正确假设的机会的背景知识 如果没有这一先验知识,可以简单地将每一候选假设赋予相同的先验概率

Spark技术内幕:Shuffle的性能调优

通过上面的架构和源码实现的分析,不难得出Shuffle是Spark Core比较复杂的模块的结论.它也是非常影响性能的操作之一.因此,在这里整理了会影响Shuffle性能的各项配置.尽管大部分的配置项在前文已经解释过它的含义,由于这些参数的确是非常重要,这里算是做一个详细的总结. 1.1.1  spark.shuffle.manager 前文也多次提到过,Spark1.2.0官方支持两种方式的Shuffle,即Hash Based Shuffle和Sort Based Shuffle.其中在Sp

spark性能调优之资源调优

转https://tech.meituan.com/spark-tuning-basic.html spark作业原理 使用spark-submit提交一个Spark作业之后,这个作业就会启动一个对应的Driver进程.根据你使用的部署模式(deploy-mode)不同,Driver进程可能在本地启动,也可能在集群中某个工作节点上启动.Driver进程本身会根据我们设置的参数,占有一定数量的内存和CPU core.而Driver进程要做的第一件事情,就是向集群管理器(可以是Spark Stand