spark使用性能优化记录

性能调优:

总则:加资源加并行度 简单直接,调节最优的资源配置 RDD架构和持久化

当可分配的资源无法达到更多的时候在考虑性能调优

从 重剑无锋 到 花拳绣腿

1.分配资源 并行度 RDD架构和缓存

2.shuffle调优

3.spark算子调优

4.JVM调优 、 广播大变量

分配哪些资源:executor(task--worker任务数)  cpu per  executor(每个作业的cpu核心数)、memory (可以使用的内存)、driver memory(影响不大)

在spark submit shell脚本里面调整对应的参数

yarn资源队列

standalone  熟悉硬件配置

原则:你能使用多大资源就调节多大

可有并行执行的task ,executor * cpu cor,减少磁盘IO 提高性能

如果有RDD.cache(),分配更多的内存,不用写入磁盘 ,如果内存较小,jvm 垃圾回收也会成为较大的问题,shuffle阶段reduce端拉取数据聚合,也是消耗内存,溢出会写入磁盘。

Job: 多个stage ,stage0结束到reduceBykey的时候,会在stage1阶段的reduce端task建立一份文件(可以有多个key,但是相同key的一定在同一个文件里) 去stage0 端拉取自己的数据

并行度:  各个stage阶段的task数量,官方推荐 task数量设置成cpu核心数的 2-3倍  这样CPU core 不至于那样的空闲 提升性能conf.set("spark.default.parallelism","500")

RDD持久化后,内存、磁盘 各有一部分数据,当内存不够支撑 可以将RDD序列化成字节数组,就一个对象大大减少内存空间占用,在使用需要反序列化 还是OOM 存入磁盘,内存很大时,可以双副本机制两份持久化数据

纯内存 persist(StorageLevel.MEMORY_ONLY())   可以用.cache()代替

persist(StorageLevel.MEMORY_ONLY_SER())

persist(StorageLevel.MEMORY_AND_DISK())

persist(StorageLevel.MEMORY_AND_DISK_SER())

persist(StorageLevel.DISK_ONLY())

persist(StorageLevel.MEMORY_ONLY_2()) 双副本机制

广播变量:   不用的话map副本消耗网络资源,传输到各个task 每个都占用内存,堆区消耗增大,大变量广播出去,不是直接使用,开始driver上面一份 task去BlockManager负责管理的某个Executoor对应的内存磁盘上的数据上面拿 ,BLM没有的话会去driver或者另一个BLM上面拿,不是每个task一个副本,而是每个executor上一个随机抽取的时候可以用到广播变量broadcast

Kryo序列化:

使用:算子中用到的外部变量、持久化RDD进行序列化时、shuffle

Kryo序列化 比java默认的序列化速度快 内存占用小 是java序列化机制的1/10

在sparkconf中设置属性 .set(spark.serializer,org.apache.spark.serializer.KryoSreializer)

注册你需要Kryo序列化的类 .registerKryoClass(new Class[]{xxx.class})

数据本地化等待时间

task没有被分配到它数据所在的节点(节点计算资源满了)

task等待默认3s 等不下去就会被分配到别的节点 或者task去用节点的另一个executor执行,task的executor去BlockManager拉取所需的数据,这需要网络传输,通过TransferService 去远程其他节点找。最差可能需要跨机架去拉取数据

本地化级别 : 进程本地化 、节点本地化、机架本地化

可以观察日志 PROCESS_LOCAL ANY 都是什么级别的 调节等待时长 查看spark作业时长有没有缩短

调节方法:conf .set("spark.locality","10")

JVM调优:

降低cache操作的内存占比

让task执行算子函数时 有更多的内存可以使用 减少persist内存占比

JVM 内存划分:算子计算用 、persist用于持久化

executor 堆外内存与连接等待时长

task stage0 阶段,堆外内存不足,OOM,block manager 挂掉

task stage1 通过mapoutputtracker 得到需要的数据地址,去stage0 阶段找的时候 找不到,此时任务会重新执行...反复几次task失败,查看log shuffle output file not found 就是这个问题 task lost 、oom 等

设置 :提交作业的时候参数设置里面,默认是300M 通常大任务的时候是不够的

-conf spark.yarn.executor.memoryOverhead=2048

task去别的BlockManager上拉取数据的那个节点,正在进行JVM GC,此时连接超过默认60s 超时失败,反复提交stage

报错: error:file id uuid not found 、file lost 等

-conf spark.core.connection.ack.wait.time.out=300,这个可能解决偶尔的数据拉取失败情况

shuffle调优

shuffle原理

在某个action触发job的时候 DAGScheduler 会负责划分job为多个stage 依据算子 将操作前半部分以及之前所有的RDD transformation,操作划分为一个stage shuffle的后半部分直到action为之的RDD和transformation操作划分为另外一个stage,所以一个shuffle 会有两个stage ,stage0 通过内存缓冲区,缓冲区满溢后再spill溢写将key相同的写入磁盘的同一个文件,driver总共有多少个task 他就生成多少个文件,stage1 将属于他的key的那个文件拉取文件K,V对 到内存缓冲区 用HashMap数据格式 进行(k,v)的汇聚。task用我们自定义的聚合函数进行累加得到的最终的值 就完成了shuffle

合并map端输出文件:

.set("spark.shuffle.consolidateFiles","true")   默认是不开启的

并行执行的task创建输出文件 下一批并行执行的task 复用之前已有的输出文件  大大减少了map端的输出文件,也减少了stage1阶段task任务的拉取文件数,大大减少了磁盘的IO,实际生产环节 可能减少的时间是可以缩短一半的

调节map端内存缓冲与reduce端内存占比:

map端内存缓冲默认32K ,如果task处理数量较大 需要频繁的将内存溢写到磁盘

reduce端内存使用自己executor的JVM 堆空间q 分配是默认 0.2 当拉取的数据过多

频繁的spill操作 溢写到磁盘 只有聚合的时候还的多次读写存在磁盘的数据进行聚合

参数查看: standalone spark UI :4040 stage  executor task shuffle read  writer

yarn  界面 application  spark UI ...

实现设置: .set("spark.shuffle.file.buffer"."32") ,调节 64 、128 成倍加

.set("spark.shuffle.memoryFraction","0.2"),调节0.1 的加

调节之后,减少了内存缓冲区溢写的次数 ,也减少了聚合读取磁盘文件的数量

HashShuffleManager和SortShuffleManager

spark.shuffle.manager:hash、sort、tungsten-sort

new SparkConf().set("spark.shuffle.manager", "hash") 普通的规则

new SparkConf().set("spark.shuffle.manager", "tungsten-sort")钨丝,官方说是优化了内存机制

原文地址:https://www.cnblogs.com/lsbigdata/p/10204858.html

时间: 2024-09-24 10:04:50

spark使用性能优化记录的相关文章

Spark记录-Spark性能优化解决方案

Spark性能优化的10大问题及其解决方案 问题1:reduce task数目不合适解决方式:需根据实际情况调节默认配置,调整方式是修改参数spark.default.parallelism.通常,reduce数目设置为core数目的2到3倍.数量太大,造成很多小任务,增加启动任务的开销:数目太少,任务运行缓慢. 问题2:shuffle磁盘IO时间长解决方式:设置spark.local.dir为多个磁盘,并设置磁盘为IO速度快的磁盘,通过增加IO来优化shuffle性能: 问题3:map|red

Spark性能优化的10大问题及其解决方案

Spark性能优化的10大问题及其解决方案 问题1:reduce task数目不合适 解决方式: 需根据实际情况调节默认配置,调整方式是修改参数spark.default.parallelism.通常,reduce数目设置为core数目的2到3倍.数量太大,造成很多小任务,增加启动任务的开销:数目太少,任务运行缓慢. 问题2:shuffle磁盘IO时间长 解决方式: 设置spark.local.dir为多个磁盘,并设置磁盘为IO速度快的磁盘,通过增加IO来优化shuffle性能: 问题3:map

Spark性能优化指南——高级篇

Spark性能优化指南--高级篇 [TOC] 前言 继基础篇讲解了每个Spark开发人员都必须熟知的开发调优与资源调优之后,本文作为<Spark性能优化指南>的高级篇,将深入分析数据倾斜调优与shuffle调优,以解决更加棘手的性能问题. 数据倾斜调优 调优概述 有的时候,我们可能会遇到大数据计算中一个最棘手的问题--数据倾斜,此时Spark作业的性能会比期望差很多.数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题,以保证Spark作业的性能. 数据倾斜发生时的现象 绝大多数tas

Spark性能优化指南——基础篇

前言 在大数据计算领域,Spark已经成为了越来越流行.越来越受欢迎的计算平台之一.Spark的功能涵盖了大数据领域的离线批处理.SQL类处理.流式/实时计算.机器学习.图计算等各种不同类型的计算操作,应用范围与前景非常广泛.在美团•大众点评,已经有很多同学在各种项目中尝试使用Spark.大多数同学(包括笔者在内),最初开始尝试使用Spark的原因很简单,主要就是为了让大数据计算作业的执行速度更快.性能更高. 然而,通过Spark开发出高性能的大数据计算作业,并不是那么简单的.如果没有对Spar

王家林谈Spark性能优化第一季!(DT大数据梦工厂)

内容: 1.Spark性能优化需要思考的基本问题: 2.CPU和Memory: 3.并行度和Task: 4.网络: ==========王家林每日大数据语录============ 王家林每日大数据语录Spark篇0080(2016.1.26于深圳):如果Spark中CPU的使用率不够高,可以考虑为当前的程序分配更多的Executor,或者增加更多的Worker实例来充分的使用多核的潜能. 王家林每日大数据语录Spark篇0079(2016.1.26于深圳):适当设置Partition分片数是非

美团Spark性能优化指南——基础篇

http://tech.meituan.com/spark-tuning-basic.html 前言 在大数据计算领域,Spark已经成为了越来越流行.越来越受欢迎的计算平台之一.Spark的功能涵盖了大数据领域的离线批处理.SQL类处理.流式/实时计算.机器学习.图计算等各种不同类型的计算操作,应用范围与前景非常广泛.在美团?大众点评,已经有很多同学在各种项目中尝试使用Spark.大多数同学(包括笔者在内),最初开始尝试使用Spark的原因很简单,主要就是为了让大数据计算作业的执行速度更快.性

【转载】 Spark性能优化指南——基础篇

前言 开发调优 调优概述 原则一:避免创建重复的RDD 原则二:尽可能复用同一个RDD 原则三:对多次使用的RDD进行持久化 原则四:尽量避免使用shuffle类算子 原则五:使用map-side预聚合的shuffle操作 原则六:使用高性能的算子 原则七:广播大变量 原则八:使用Kryo优化序列化性能 原则九:优化数据结构 资源调优 调优概述 Spark作业基本运行原理 资源参数调优 写在最后的话 前言 在大数据计算领域,Spark已经成为了越来越流行.越来越受欢迎的计算平台之一.Spark的

Spark性能优化指南——基础篇转

前言 在大数据计算领域,Spark已经成为了越来越流行.越来越受欢迎的计算平台之一.Spark的功能涵盖了大数据领域的离线批处理.SQL类处理.流式/实时计算.机器学习.图计算等各种不同类型的计算操作,应用范围与前景非常广泛.在美团•大众点评,已经有很多同学在各种项目中尝试使用Spark.大多数同学(包括笔者在内),最初开始尝试使用Spark的原因很简单,主要就是为了让大数据计算作业的执行速度更快.性能更高. 然而,通过Spark开发出高性能的大数据计算作业,并不是那么简单的.如果没有对Spar

【转载】Spark性能优化指南——高级篇

前言 数据倾斜调优 调优概述 数据倾斜发生时的现象 数据倾斜发生的原理 如何定位导致数据倾斜的代码 查看导致数据倾斜的key的数据分布情况 数据倾斜的解决方案 解决方案一:使用Hive ETL预处理数据 解决方案二:过滤少数导致倾斜的key 解决方案三:提高shuffle操作的并行度 解决方案四:两阶段聚合(局部聚合+全局聚合) 解决方案五:将reduce join转为map join 解决方案六:采样倾斜key并分拆join操作 解决方案七:使用随机前缀和扩容RDD进行join 解决方案八:多