spark优化

优化一般考虑资源优化

一、资源优化

I 集群方面的:driver的内存,worker内存,核数

方法

1.配置文件:spark-env.sh(配置worker的信息)

SPARK_WORKER_CORE 每个worker的使用总核数

SPARK_WORKER_MEMORY 每个worker所使用的内存数(shuffer阶段利用netty

传输文件还会使用到的executor堆外内存也在其中)

SPARK_WORKER_INSTANCE 每台节点上启动的worker数量(standalone集群上

默认为1个,运行在yarn上默认是2个)

2.提交job的时候进行设置(配置excutor的信息)

--executor-cores 默认情况下每一个Executor使用这台Worker所有的core

--executor-memory 1

--driver-cores 1

--driver-memory 1

--total-executor-cores

堆外内存设置(传输过程中netty框架使用,较小可能到导致excutor挂掉)

--conf spark.yarn.executor.memoryOverhead=2048(yarn)

--conf spark.executor.memoryOverhead=2048 (stabdalone)

excutor间通信超时时间(防止在gc发生期间连接导致传输失败)

--conf spark.core.connection.ack.wait.timeout=60

II excutor的内存分布

III 根据配置的资源情况调节并行度,也就是task的个数

合理设置并行度,就可以完全充分利用你的集群计算资源,合理设置并行度,就可以完全充分利用你的集群计算资源,让每个task处理的数据量尽量减少来提高速度.

task的数量:至少设置成与集群cpu core数量相同(理想化的状态:每个核分配到任务之后,并行处理,结束的时间基本相同)

官方推荐task的数量为总的cpu core的数量的2~3倍

但是实际情况与理想情况会产生偏差(由于每个核的处理性能和每个核处理的数据量(例如一

条数据包含的量比较大)),所以将task的数量设置为2~3倍,可以有效地使处理速度快的核处理

更多的数据

设置task的数量方法(主要由stage中的finalRDD中的partition数来决定的--因为管道模式执行)

1.job执行过程中shuffle 之后reducer的分区数,默认是跟上一个rdd相同

--conf spark.default.parallelism(sparkcore)

--conf spark.sql.shuffle.partitions(sparksql) 默认200

2.数据源是hdfs,可以增加block块的大小

3.读取文件的方法:textFile(filePath,numPartitions)

4.shuffer类的算子 可以传入numPartitions

5.自定义的分区器

6.reparttion,coalesce

sparkStreaming+kafka情况下:

recerver(接收模式): block.interval 默认200ms

dirct(直联模式): 读取的topics数 KafkaUtils.createDirectStream()

二、代码优化

代码优化主要通过调节框架执行的参数与代码使用的算子层面进行优化

1.避免创建重复的RDD:性能没有影响

2.尽可能的复用RDD

3.对多次使用的RDD进行持久化

cache MEMORY_ONLY

persist 选择的优先顺序

I MEMORY_ONLY

II MEMORY_ONLY_SER

III MEMORY_AND_DISK_SER

IV 不建议使用DISK_ONLY

checkpoint(主要是为了防止数据丢失,最好在调用之前cache一下)

I 持久化数据到 HDFS

II 切断RDD之间的依赖关系

执行流程:

1、我们的job 的 job执行完成后,会从final RDD 从后往前回溯

2、在回溯的过程,哪一个RDD调用了checkpoint就对这个RDD

做一个标记

3、框架会自动重新启动一个新的job,重新计算这些RDD,然

后将结果持久化到HDFS上

4、切断这些RDD的依赖关系,统一改名为checkpointRDD

4.避免使用shuffle类的算子 主要指join

可以利用广播变量改变 用在一个RDD数量较大,一个较小的情况

join = 广播变量 + filter 获取的数据只与其中一个RDD有关

join = 广播变量 + mapToPair(或者map) 获取的结果用到了两个RDD中的数据

注意: 广播变量大小不能超过executor内存的54%

5.使用map-side预处理shuffer的操作,也就是多使用有combine操作的shuffle算子

reduceByKey 代替 groupByKey

aggregateByKey

combineByKey

6.使用高性能的算子

mapPartition <- map

foreachPartition <- foreach

reduceByKey

aggreateByKey

reparttion(增加分区)

filter+coalesce(减少分区,也就是task的数量)

7.广播变量

如果不适用广播变量情况下,算子的方法中用到了driver中的变量(其实是常量),那么每个task都会有一个变量副本,使用广播变量会是副本数量减小到每个executor一份

8.序列化的时候 采用Kyro序列化方式

conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer").registerKryoClasses(new Class[]{XXXXX.class}))

在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输

将自定义的类型作为RDD的泛型类型时(比如JavaRDD<>,SXT是自定义类

型),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必

须实现Serializable接口。

使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中

的每个partition都序列化成一个大的字节数组。

9.优化数据结构

10. 使用高性能的库fastutil 例如IntValue来代替List<Integer>

三、shuffle调优

1.选择shuffle的类型

spark.shuffle.manager 1.2以及以后版本默认为sort

hash (开启合并机制) spark.shuffle.consolidateFiles false默认未开启

sort (开启byPass机制) spark.shuffle.sort.bypassMergeThreshold > shuffle read task

2.buffer 大小 默认32k spark.shuffle.file.buffer

3.reduce task 拉取数据的失败等待时间,失败的重试次数

spark.shuffle.io.maxRetries spark.shuffle.io.retryWait

4.reduce task 拉取数据 一次拉取的量 spark.reducer.maxSizeInFlight 默认48m

四.JVM调优

通过woker的4040UI页面查看task的GC时间 如果GC时间过大 考虑调节GC时间

调节方式

1.降低RDD缓存所占比例

2.降低shuffle聚合数据所占内存比例

3.增大executor的内存

五.task的数据本地化调优

task需要的数据与task最好位于同一个excutor中,可以充分的减少网络io开销

本地化级别分为

PROCESS_LOCAL

task执行所须得数据位于同一excutor的内存中

NODE_LOCAL

task执行所需要的数据位于同一台机器的内存中,可以是其他excutor的内存中

NO_PREF

ROCK_LOCAL

task执行所需要的数据位于同一个机架上的机器的内存中或者磁盘中

ANY

task与数据在集群的任意地方

taskscheduler发送task的时候首先选择process_local,但是由于excutor中可能存在task执行

那么会造成任务的失败,taskscheduler会等待3秒后进行重试,重试5次之后会降低本地化级别

进行发送task

通过UI页面查看每个任务的本地化级别,如果级别都是过低,那么就应该调节本地化级别

调节等待时间:

spark.locality.wait

spark.locality.wait.process

spark.locality.wait.node

spark.locality.wait.rack

六.数据倾斜的处理(一个stage的执行速度取决于最慢的task执行的速度)

主要针对join进行调优

1.让hive解决 治标不治本

2.过滤掉导致倾斜的key 此操作可能是好的作用 可以去除机械点击或者爬虫产生的数据

3.提高shuffer的并行度 当数据比较均匀的条件下使用

4.双重聚合 当数据量比较大,并进行聚合操作的时候

先将key打上随机的reduce task以下的数作为前缀,进行一次聚合

去掉前缀,再次进行聚合

5.将reduce join 装化为 map join 使用与join的其中一个RDD数量比较小(几百兆到一两个G)

利用广播变量 来替代join

6.拆分其中导致数据倾斜的RDD 一个RDD,少量key对应很多数据量,另一个比较均匀

I 抽样查询其中key对应数据量较多的键值对

II 将两个RDD中的TopN的key过滤出来变为四个RDD,两个包含导致倾斜的key的RDD,

两个都是不会倾斜key的RDD

III 包含导致倾斜key的两个RDD 其中导致倾斜的RDD进行拼接reduce task 以内随机数的

前缀,另外一个进行扩充reduce task倍并拼接从0到reduce task 前缀的操作,利用

reduce task个task进行join操作,将join后的结果key去除前缀

IV 将不会倾斜的RDD进行join计算

V 将两个得到的结果进行union 拼接得到最终的join的结果

7.大量的key导致数据倾斜 一个RDD,少量key对应很多数据量,另一个比较均匀

直接将导致倾斜的RDD打上N以内的随机数前缀

另一个扩充N倍并打上从0到N的的前缀

利用N个task来进行join

shuffle file 找不到问题分析:

问题分析:

Executor挂掉了,BlockManager对象就没了

Executor没有挂掉,而是在建立通信或者在数据传输的环节出现了问题

解决:

如果是Executor挂掉了,

堆内内存不足

1、检查代码 2、提高Executor的内存 --executor-memory

堆外内存不足导致

--conf spark.yarn.executor.memoryOverhead=2048(yarn)

--conf spark.executor.memoryOverhead=2048 (stabdalone)

注意:是在提交application的时候设置

如果Executor没有挂掉

建立通信出现了问题(可能是遇到map的excutor正处于gc状态)

增加建立通信的超时时间-conf spark.core.connection.ack.wait.timeout=60

注意:是在提交application的时候设置

数据传输的环节出现了问题:

提高拉数据的重试次数以及等待时间

时间: 2024-10-13 16:10:25

spark优化的相关文章

Spark优化一则 - 减少Shuffle

Spark优化一则 - 减少Shuffle 看了Spark Summit 2014的A Deeper Understanding of Spark Internals,视频(要科学上网)详细讲解了Spark的工作原理,Slides的45页给原始算法和优化算法. 破砂锅用自己3节点的Spark集群试验了这个优化算法,并进一步找到更快的算法.测试数据是Sogou实验室的日志文件前10000000条数据.目标是对日志第2列数据,按照第一个字母合并,得到每个首字母有几条记录. 所有的方案都重新启动Spa

【总结】Spark优化-多Job并发执行

Spark程序中一个Job的触发是通过一个Action算子,比如count(), saveAsTextFile()等 在这次Spark优化测试中,从Hive中读取数据,将其另外保存四份,其中两个Job采用串行方式,另外两个Job采用并行方式.将任务提交到Yarn中执行.能够明显看出串行与兵线处理的性能. 每个Job执行时间: JobID 开始时间 结束时间 耗时 Job 0 16:59:45 17:00:34 49s Job 1 17:00:34 17:01:13 39s Job 2 17:01

Spark优化之gc

对于官方Programming Guides的GC优化一节做了阅读. 在这里记录一下我的理解,可能记录的比较混乱没有条理: 我理解其实GC优化的主要目的就是在你的任务执行中使用更少的内存,进行更少的gc回收,因为GC回收会使你的任务执行的更慢. 使用-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps可以在日志中观察你的程序执行过程中GC回收的情况.根据日志进行优化.例如你发现回收的过于频繁,那么就要看一下是不是你的程序和内存使用是否有问题

spark优化(一):合理分配资源

分配更多资源:性能调优的王道,就是增加和分配更多的资源,性能和速度上的提升,是显而易见的:基本上,在一定范围之内,增加资源与性能的提升,是成正比的:写完了一个复杂的spark作业之后,进行性能调优的时候,首先第一步,我觉得,就是要来调节最优的资源配置:在这个基础之上,如果说你的spark作业,能够分配的资源达到了你的能力范围的顶端之后,无法再分配更多的资源了,公司资源有限:那么才是考虑去做后面的这些性能调优的点. 问题:1.分配哪些资源?2.在哪里分配这些资源?3.为什么多分配了这些资源以后,性

spark优化之临时目录

官方是这样说的: Directory to use for "scratch" space in Spark, including map output files and RDDs that get stored on disk. This should be on a fast, local disk in your system. It can also be a comma-separated list of multiple directories on different

spark优化之数据结构(减少内存)

官网是这么说的: The first way to reduce memory consumption is to avoid the Java features that add overhead, such as pointer-based data structures and wrapper objects. There are several ways to do this:1.Design your data structures to prefer arrays of object

spark优化之并行度

这个其实我前面已经记录过了,这里在记录一下. 我可以通过参数人为的来控制分区大小,增加分区中即可增加任务的并行度,并行度高自然运行的就快了嘛. 官方推荐集群中每个cpu并行的任务是2-3个(也就是2-3个partition),这样对于资源使用是最充分的 那么如何调整并行度呢. 在类似 sc.textFile 的方法中的第二个参数即可调整分区滴

spark性能优化

一:Spark的性能优化,主要手段包括:1.使用高性能序列化类库2.优化数据结构3.对多次使用的RDD进行持久化 / Checkpoint4.使用序列化的持久化级别5.Java虚拟机垃圾回收调优6.提高并行度7.广播共享数据8.数据本地化9.reduceByKey和groupByKey的合理使用10.Shuffle调优(核心中的核心,重中之重) 二:spark诊断内存消耗 java主要的内存消耗 1.每个Java对象,都有一个对象头,会占用16个字节,主要是包括了一些对象的元信息,比如指向它的类

Spark调优

转载:http://www.oschina.net/translate/spark-tuning 因为大部分Spark程序都具有“内存计算”的天性,所以集群中的所有资源:CPU.网络带宽或者是内存都有可能成为Spark程序的瓶颈.通常情况下,如果数据完全加载到内存那么网络带宽就会成为瓶颈,但是你仍然需要对程序进行优化,例如采用序列化的方式保存RDD数据(Resilient Distributed Datasets),以便减少内存使用.该文章主要包含两个议题:数据序列化和内存优化,数据序列化不但能