Spark 知识点总结--调优(一)

搭建集群: SPARK_WORKER-CORES : 当计算机是32核双线程的时候,需要指定SPARK_WORKER_CORES的个数为64个

SPARK_WORKER_MEMORY :

任务提交:

./spark-submit --master node:port --executor-cores --class  ..jar xxx

--executor-cores: 指定每个executor使用的core 的数量

--executor-memory: 指定每个executor最多使用的内存

--total-executor-cores: standalone 集群中 spark application 所使用的总的core

--num-executor : 在yarn 中为 spark application 启动的executor

--Driver-cores: driver使用的core

--Driver-memory: driver使用的内存

以上的参数是在spark-submit 提交任务的时候指定的,也可以在spark-defaults.xml中进行配置

spark 并行度调优: (一般在做测试的时候使用)

sc.textFile(xx,minnum)

sc.parallelize(seq,num)

sc.makeRDD(seq,num)

sc.parallelizePairs(List,num )

在算子层面提高并行度:

ReduceByKey(fun,num),join(xx,num),distinct(num),groupByKey(num)

可以使用repartition进行升高并行度:

repartition(num)/coalesce()   repartition(num) = coalesce(num,shuffle=num)

spark.default.parallelism :  本地模式默认的并行度是local[数字]    standalone/yarn: 当前executor中所使用的所有的core 的个数

spark.sql.shuffle.partitions 200

自定义分区器

sparkStreaming:

receiver 模式:  spark.streaming.blockInterval = 200ms

direct 模式(spark2.3+) : 与读取的topic的 partition 的个数是一致

代码调优:

1、 避免创建重复的RDD,尽量复用同一个RDD

2、对多次使用的RDD进行持久化

持久化的算子:

cache(): 默认将数据放在了内存中,当跨job使用RDD的时候,可以将数据放置到cache中

persist():

MEMORY_ONLY :直接将数据放置到内存中

MEMORY_ONLY_SER: 当数据量比较大的时候,可以将数据序列化之后放置到内存中

MEMORY_AND_DISK:将数据放置到磁盘中

MEMORY_AND_DISK_SER:

checkpoint()

3、尽量避免shuffle类的算子:

map类算子+光弄方式变量来替代join

4、使用map端有预聚合的shuffle类算子

reduceByKey:

aggregateByKey:

代码演示:

package com.optimize.study.spark

import org.apache.spark.{SparkConf, SparkContext}

object aggregateByKey {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local").setAppName("test")

    val sc = new SparkContext(conf)

    val unit = sc.parallelize(Array[(String, Int)](
      ("zhangsan", 18),
      ("zhangsan", 19),
      ("lisi", 20),
      ("wangwu", 21),
      ("zhangsan", 22),
      ("lisi", 23),
      ("wangwu", 24),
      ("wangwu", 25)
    ), 2)

    val result = unit.aggregateByKey(" ")((s:String,i:Int)=>{s+"$"+i},(s1:String,s2:String)=>{s1+"#"+s2})

    result.foreach(println)

  }
}

combineByKey:

代码演示:

package com.bjsxt.myscalacode

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object MyCombineByKey {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("test")
    val sc = new SparkContext(conf)
    val rdd1 = sc.parallelize(Array[(String, Int)](
      ("zhangsan", 18),
      ("zhangsan", 19),
      ("lisi", 20),
      ("wangwu", 21),
      ("zhangsan", 22),
      ("lisi", 23),
      ("wangwu", 24),
      ("wangwu", 25)
    ),2)

    /**
      * partition index = 0,value = (zhangsan,18)     =>(zhangsan,hello18)    =>(zhangsan,hello18#19)
      * partition index = 0,value = (zhangsan,19)
      * partition index = 0,value = (lisi,20)         =>(lisi,hello20)        =>(lisi,hello20)
      * partition index = 0,value = (wangwu,21)       =>(wangwu,hell21)       =>(wangwu,hell21)
      *                                                                                                 =>(zhangsna,hello18#[email protected])
      *                                                                                                 =>(lisi,[email protected])
      *                                                                                                 =>(wangwu,[email protected]#25)
      * partition index = 1,value = (zhangsan,22)     =>(zhangsna,hello22)    =>(zhangsna,hello22)
      * partition index = 1,value = (lisi,23)         =>(lisi,hello23)        =>(lisi,hello23)
      * partition index = 1,value = (wangwu,24)       =>(wangwu,hello24)      =>(wangwu,hello24#25)
      * partition index = 1,value = (wangwu,25)
      */
    val unit: RDD[(String, String)] = rdd1.combineByKey((i:Int)=>{"hello"+i}, (s:String, i:Int)=>{s+"#"+i}, (s1:String, s2:String)=>{s1+"@"+s2})

    unit.foreach(println)

//    rdd1.mapPartitionsWithIndex((index,iter)=>{
//      val transIter = iter.map(one => {
//        s"partition index = ${index},value = $one"
//      })
//      transIter
//    }).foreach(println)

  }
}

map端有预聚合的好处: (相对于直接聚合的好处就是: 直接聚合的时候先将数据进行拉取,然后在reduce端进行聚合,但是预聚合会先在每个map端进行一次聚合然后在对聚合后的数据进行拉取合并)

减少map端shuffle的数据量

减少reduce端拉取的数据量

减少reduce端聚合的次数

4、尽量使用高性能的算子:

使用foreachPartition代替foreach

使用mappartitions代替map

在大量的数据过滤之后使用coalesce减少分区

使用reduceByKey代替GroupByKey

使用repartitionAndSortWithinPartitions代替repartition和sort类操作

5、使用广播变量

使用广播变量可以减少executor端内存的使用

6、使用Kryo优化序列化的性能  spark 中使用到序列化的地方

a、RDD<自定义类型>

b、task 序列化

c、RDD持久化可以进行序列化  MEMOYR_AND_DISK_SER

spark使用Kryo序列化机制。Kryo序列化机制,比默认的java序列化机制速度要快很多,序列化之后的数据所占有的内存更小,大概是java序列化后所使用内存的1/10,所以使用Kryo序列化之后,可以让网络传输的数据更少,在集群中消耗的内存资源更少

spark使用kryo序列化机制需要进行注册:

SparkConf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer").registerKryoClasses(new class[]()SpeedSortKey.class)

7、优化数据结构:

在spark 中尽量使用原生的数据类型代替字符串

在spark中尽量使用字符串代替对象

在spark 中尽量使用数组代替集合

8、代码优化:

减少内存使用

减少节点之间的数据传输

减少磁盘的IO

9、数据本地化调节 --数据本地化级别

a、PROCESS_LOCAL : task处理的数据在当前executor内存中

B、NODE_LOCAL:task处理的数据在当前节点的磁盘上,或者在当前节点其他executor的内存中

C、NO_PREF: task处理数据在外部的数据库中

D、RACK_LOCAL : task处理数据在同机架的其他worker节点的executor内存或者磁盘上

E、ANY :task处理数据在其他的机架上

相关参数调优:

spark.locality.process 3s --指从process级别降级到node级别所需等待的时间

spark.locality.node 3s --指从node级别降级到pref级别所需等待的时间

spark.locality.rack   --指rack降级所需等待的时间

driver发送task首先按照数据本地化最高级别进行发送,当task等待3s重试5次之后,如果task依然没有被执行,driver会将task降级发送,同理,依次进行降级处理

10、内存调优

给task足够的运行内存,避免频繁的发生GC,最终导致发min GC 或者 FULL GC ,使JVM停止工作

较少shuffle的聚合内存和较少RDD和广播变量的存储内存

参数:

静态内存:

减少spark.shuffle.memoryFraction 0.2

减少spark.storage.memoryFraction 0.6

统一内存:

spark.memory.fraction 0.6

11、shuffle调节

spark.reducer.maxSizeInFlight : 默认48 M 只 每次拉取数据量的大小

spark.shuffle.io.maxRetries: 默认拉取数据失败重试的次数

spark.shuffle.io.retryWait: 重试的等待间隔

spark.shuffle.sort.bypassMergeThreshold: 200

12、堆外内存调节

节点之间连接的等待时长: --conf spark.core.connection.ack.wait.timeout = 300

正常reduce task 从 maptask 拉取数据的过程是:

首先将数据拉取到jvm存储空间,然后jvm将数据存储到网卡buffer,然后通过网络对数据进行传输

有了堆外内存之后,就跳过;了JVM转存数据的过程,直接将数据从磁盘传输到网卡buffer,然后将数据向外进行传输

Spark每个executor中的堆外内存大小是executor内存大小的1/10,多数情况下需要将这个内存的大小调节到2G以上

调节堆外内存的参数:

yarn下:

--conf spark.yarn.executor.memoryOverhead = 2048 M

standalone下:

--conf spark.executor.memoryOverhead = 2048 M

13、数据倾斜处理

数据倾斜:

MR: 某个task处理的数据大于其他task处理的数据

hive:某张表中的某个字段下相同的key 非常多,其他key 对应的数据量非常少

spark: RDD 中某个分区的数据量大于其他分区的数据量

数据倾斜解决:

hive ETL处理:

场景: Spark需要频繁的操作一张hive有数据倾斜的表,每次操作都会按照倾斜的字段进行关联

解决: 可以菇凉业务是否可以将倾斜提前到hive中发生,这样在spark中就不会存在数据倾斜“治标不治本”

过滤少数倾斜的key:

场景: spark 中估计少数倾斜的key是否对业务有影响,如果对业务影响不大,可以将这些key直接过滤掉,再去进行业务分析

解决: 可以使用filter算子直接将这些倾斜的key过滤掉

增加并行度:

场景: 数据量大,分区少,不同的key多,可以直接升高并行度

解决: 可以直接使用算子升高并行度

双重聚合:

场景:分区少,key相同的多,数据量大

解决: 可以对相同的key加上随机的前缀,进行聚合,然后对聚合的结果去掉前缀,再去聚合得到最后的结果

将reduce join 转换为 map join:

场景:两个RDD。一个RDD大,一个RDD小,有数据倾斜,对两个RDD需要进行join操作

解决: 将小的RDD回收到Driver端,然后将数据广播出去,对大的RDD进行Map类的算子操作,这样流彻底的避免了shuffle的产生,就没有数据的倾斜

采样倾斜的key并分拆join 操作 :

场景: 两个RDD。一个RDD比较大,数据倾斜,另一个RDD也比较大,对两个RDD采用join操作,无法采用上述操作进行优化

解决: 用采样分析拆分倾斜的key,随机加前缀,前后膨胀,然后在join解决数据倾斜的问题

使用随机前缀和扩容RDD进行join

场景: 两个RDD, 一个RDD大,有大量的KEY有数据倾斜,另一个RDD也比较大,要对两个RDD进行join操作

解决: 使用随机前缀和扩容RDD进行操作,前提条件是需要较大的内存空间

原文地址:https://www.cnblogs.com/wcgstudy/p/11403487.html

时间: 2024-08-30 09:52:32

Spark 知识点总结--调优(一)的相关文章

Spark的性能调优

下面这些关于Spark的性能调优项,有的是来自官方的,有的是来自别的的工程师,有的则是我自己总结的. Data Serialization,默认使用的是Java Serialization,这个程序员最熟悉,但是性能.空间表现都比较差.还有一个选项是Kryo Serialization,更快,压缩率也更高,但是并非支持任意类的序列化. Memory Tuning,Java对象会占用原始数据2~5倍甚至更多的空间.最好的检测对象内存消耗的办法就是创建RDD,然后放到cache里面去,然后在UI 上

Spark Streaming性能调优详解(转)

原文链接:Spark Streaming性能调优详解 Spark Streaming提供了高效便捷的流式处理模式,但是在有些场景下,使用默认的配置达不到最优,甚至无法实时处理来自外部的数据,这时候我们就需要对默认的配置进行相关的修改.由于现实中场景和数据量不一样,所以我们无法设置一些通用的配置(要不然Spark Streaming开发者就不会弄那么多参数,直接写死不得了),我们需要根据数据量,场景的不同设置不一样的配置,这里只是给出建议,这些调优不一定试用于你的程序,一个好的配置是需要慢慢地尝试

spark 资源参数调优

资源参数调优 了解完了Spark作业运行的基本原理之后,对资源相关的参数就容易理解了.所谓的Spark资源参数调优,其实主要就是对Spark运行过程中各个使用资源的地方,通过调节各种参数,来优化资源使用的效率,从而提升Spark作业的执行性能.以下参数就是Spark中主要的资源参数,每个参数都对应着作业运行原理中的某个部分,我们同时也给出了一个调优的参考值. num-executors 参数说明:该参数用于设置Spark作业总共要用多少个Executor进程来执行.Driver在向YARN集群管

揭秘Spark应用性能调优

引言:在多台机器上分布数据以及处理数据是Spark的核心能力,即我们所说的大规模的数据集处理.为了充分利用Spark特性,应该考虑一些调优技术.本文每一小节都是关于调优技术的,并给出了如何实现调优的必要步骤.本文选自<Spark GraphX实战>. 1 用缓存和持久化来加速 Spark 我们知道Spark 可以通过 RDD 实现计算链的原理 :转换函数包含在 RDD 链中,但仅在调用 action 函数后才会触发实际的求值过程,执行分布式运算,返回运算结果.要是在 同一 RDD 上重复调用

Apache Spark Jobs 性能调优

当你开始编写 Apache Spark 代码或者浏览公开的 API 的时候,你会遇到各种各样术语,比如transformation,action,RDD 等等. 了解到这些是编写 Spark 代码的基础. 同样,当你任务开始失败或者你需要透过web界面去了解自己的应用为何如此费时的时候,你需要去了解一些新的名词: job, stage, task.对于这些新术语的理解有助于编写良好 Spark 代码.这里的良好主要指更快的 Spark 程序.对于 Spark 底层的执行模型的了解对于写出效率更高

Spark 常规性能调优

1. 常规性能调优 一:最优资源配置 Spark性能调优的第一步,就是为任务分配更多的资源,在一定范围内,增加资源的分配与性能的提升是成正比的,实现了最优的资源配置后,在此基础上再考虑进行后面论述的性能调优策略.  --driver-memory 配置Driver内存(影响不大) 内存大小影响不大 资源的分配在使用脚本提交Spark任务时进行指定,标准的Spark任务提交脚本如代码清单2-1所示: /usr/opt/modules/spark/bin/spark-submit --class c

spark 高层通用调优

一,并行度 如果并行度设置的不足,那么就会导致集群浪费.Spark自动会根据文件的大小,是否可分割等因素来设置map的数目(后面会详细讲解输入格式,同时详细讲解各种输入的map数的决定).对于分布式reduce操作,例如groupbykey和reducebykey,默认它使用的是分区数最大的父RDD的分区数决定reduce的数目.你也可以通过设置spark.default.parallelism来改变默认值,建议值是每个CPU执行2-3个tasks. 二,Reduce任务的内存使用 有时候内存溢

【转载】Apache Spark Jobs 性能调优(二)

调试资源分配   Spark 的用户邮件邮件列表中经常会出现 "我有一个500个节点的集群,为什么但是我的应用一次只有两个 task 在执行",鉴于 Spark 控制资源使用的参数的数量,这些问题不应该出现.但是在本章中,你将学会压榨出你集群的每一分资源.推荐的配置将根据不同的集群管理系统(YARN.Mesos.Spark Standalone)而有所不同,我们将主要集中在YARN 上,因为这个Cloudera 推荐的方式. Spark(以及YARN) 需要关心的两项主要的资源是 CP

【转载】Apache Spark Jobs 性能调优(一)

当你开始编写 Apache Spark 代码或者浏览公开的 API 的时候,你会遇到各种各样术语,比如 transformation,action,RDD 等等. 了解到这些是编写 Spark 代码的基础. 同样,当你任务开始失败或者你需要透过web界面去了解自己的应用为何如此费时的时候,你需要去了解一些新的名词: job, stage, task.对于这些新术语的理解有助于编写良好 Spark 代码.这里的良好主要指更快的 Spark 程序.对于 Spark 底层的执行模型的了解对于写出效率更