性能调优:
总则:加资源加并行度 简单直接,调节最优的资源配置 RDD架构和持久化
当可分配的资源无法达到更多的时候在考虑性能调优
从 重剑无锋 到 花拳绣腿
1.分配资源 并行度 RDD架构和缓存
2.shuffle调优
3.spark算子调优
4.JVM调优 、 广播大变量
分配哪些资源:executor(task--worker任务数) cpu per executor(每个作业的cpu核心数)、memory (可以使用的内存)、driver memory(影响不大)
在spark submit shell脚本里面调整对应的参数
yarn资源队列
standalone 熟悉硬件配置
原则:你能使用多大资源就调节多大
可有并行执行的task ,executor * cpu cor,减少磁盘IO 提高性能
如果有RDD.cache(),分配更多的内存,不用写入磁盘 ,如果内存较小,jvm 垃圾回收也会成为较大的问题,shuffle阶段reduce端拉取数据聚合,也是消耗内存,溢出会写入磁盘。
Job: 多个stage ,stage0结束到reduceBykey的时候,会在stage1阶段的reduce端task建立一份文件(可以有多个key,但是相同key的一定在同一个文件里) 去stage0 端拉取自己的数据
并行度: 各个stage阶段的task数量,官方推荐 task数量设置成cpu核心数的 2-3倍 这样CPU core 不至于那样的空闲 提升性能conf.set("spark.default.parallelism","500")
RDD持久化后,内存、磁盘 各有一部分数据,当内存不够支撑 可以将RDD序列化成字节数组,就一个对象大大减少内存空间占用,在使用需要反序列化 还是OOM 存入磁盘,内存很大时,可以双副本机制两份持久化数据
纯内存 persist(StorageLevel.MEMORY_ONLY()) 可以用.cache()代替
persist(StorageLevel.MEMORY_ONLY_SER())
persist(StorageLevel.MEMORY_AND_DISK())
persist(StorageLevel.MEMORY_AND_DISK_SER())
persist(StorageLevel.DISK_ONLY())
persist(StorageLevel.MEMORY_ONLY_2()) 双副本机制
广播变量: 不用的话map副本消耗网络资源,传输到各个task 每个都占用内存,堆区消耗增大,大变量广播出去,不是直接使用,开始driver上面一份 task去BlockManager负责管理的某个Executoor对应的内存磁盘上的数据上面拿 ,BLM没有的话会去driver或者另一个BLM上面拿,不是每个task一个副本,而是每个executor上一个随机抽取的时候可以用到广播变量broadcast
Kryo序列化:
使用:算子中用到的外部变量、持久化RDD进行序列化时、shuffle
Kryo序列化 比java默认的序列化速度快 内存占用小 是java序列化机制的1/10
在sparkconf中设置属性 .set(spark.serializer,org.apache.spark.serializer.KryoSreializer)
注册你需要Kryo序列化的类 .registerKryoClass(new Class[]{xxx.class})
数据本地化等待时间
task没有被分配到它数据所在的节点(节点计算资源满了)
task等待默认3s 等不下去就会被分配到别的节点 或者task去用节点的另一个executor执行,task的executor去BlockManager拉取所需的数据,这需要网络传输,通过TransferService 去远程其他节点找。最差可能需要跨机架去拉取数据
本地化级别 : 进程本地化 、节点本地化、机架本地化
可以观察日志 PROCESS_LOCAL ANY 都是什么级别的 调节等待时长 查看spark作业时长有没有缩短
调节方法:conf .set("spark.locality","10")
JVM调优:
降低cache操作的内存占比
让task执行算子函数时 有更多的内存可以使用 减少persist内存占比
JVM 内存划分:算子计算用 、persist用于持久化
executor 堆外内存与连接等待时长
task stage0 阶段,堆外内存不足,OOM,block manager 挂掉
task stage1 通过mapoutputtracker 得到需要的数据地址,去stage0 阶段找的时候 找不到,此时任务会重新执行...反复几次task失败,查看log shuffle output file not found 就是这个问题 task lost 、oom 等
设置 :提交作业的时候参数设置里面,默认是300M 通常大任务的时候是不够的
-conf spark.yarn.executor.memoryOverhead=2048
task去别的BlockManager上拉取数据的那个节点,正在进行JVM GC,此时连接超过默认60s 超时失败,反复提交stage
报错: error:file id uuid not found 、file lost 等
-conf spark.core.connection.ack.wait.time.out=300,这个可能解决偶尔的数据拉取失败情况
shuffle调优
shuffle原理
在某个action触发job的时候 DAGScheduler 会负责划分job为多个stage 依据算子 将操作前半部分以及之前所有的RDD transformation,操作划分为一个stage shuffle的后半部分直到action为之的RDD和transformation操作划分为另外一个stage,所以一个shuffle 会有两个stage ,stage0 通过内存缓冲区,缓冲区满溢后再spill溢写将key相同的写入磁盘的同一个文件,driver总共有多少个task 他就生成多少个文件,stage1 将属于他的key的那个文件拉取文件K,V对 到内存缓冲区 用HashMap数据格式 进行(k,v)的汇聚。task用我们自定义的聚合函数进行累加得到的最终的值 就完成了shuffle
合并map端输出文件:
.set("spark.shuffle.consolidateFiles","true") 默认是不开启的
并行执行的task创建输出文件 下一批并行执行的task 复用之前已有的输出文件 大大减少了map端的输出文件,也减少了stage1阶段task任务的拉取文件数,大大减少了磁盘的IO,实际生产环节 可能减少的时间是可以缩短一半的
调节map端内存缓冲与reduce端内存占比:
map端内存缓冲默认32K ,如果task处理数量较大 需要频繁的将内存溢写到磁盘
reduce端内存使用自己executor的JVM 堆空间q 分配是默认 0.2 当拉取的数据过多
频繁的spill操作 溢写到磁盘 只有聚合的时候还的多次读写存在磁盘的数据进行聚合
参数查看: standalone spark UI :4040 stage executor task shuffle read writer
yarn 界面 application spark UI ...
实现设置: .set("spark.shuffle.file.buffer"."32") ,调节 64 、128 成倍加
.set("spark.shuffle.memoryFraction","0.2"),调节0.1 的加
调节之后,减少了内存缓冲区溢写的次数 ,也减少了聚合读取磁盘文件的数量
HashShuffleManager和SortShuffleManager
spark.shuffle.manager:hash、sort、tungsten-sort
new SparkConf().set("spark.shuffle.manager", "hash") 普通的规则
new SparkConf().set("spark.shuffle.manager", "tungsten-sort")钨丝,官方说是优化了内存机制
原文地址:https://www.cnblogs.com/lsbigdata/p/10204858.html