spark调优之开发调优

(1)避免重复的RDD

案例:

val rdd1 = sc.textFile("hdfs://zzy/hello.txt")
rdd1.map(...)
val rdd2 = sc.textFile("hdfs://zzy/hello.txt")
rdd2.reduce(...)

这里条用了两次textFile,并且读取的是同一个文件,造成了多次的磁盘读取,如果是hi同一个文件,读取一次即可。

(2)尽可能多的复用一个RDD

错误演示:

       //由于业务需要,对rdd1执行了一个map操作,创建了一个rdd2,而rdd2中的数据仅仅是rdd1中的value值而已,也就是说,rdd2是rdd1的子集。
        val list=List(("math",18),("hbase",18),("hive",22),("hive",18))
        val listRDD: RDD[(String, Int)] = sc.parallelize(list)
        val listRDD2=listRDD.map(kv=>kv._2)

正确做法:

        //在进行第二个map操作时,只使用每个数据的kv._2,也就是rdd1中的value值,即可
        val list=List(("math",18),("hbase",18),("hive",22),("hive",18))
        val listRDD: RDD[(String, Int)] = sc.parallelize(list)
        listRDD.reduceByKey(_+_).map(kv=>kv._2)

(3)对多次使用的RDD进行持久化

案例:

        val list=List(("math",18),("hbase",18),("hive",22),("hive",18))
        val listRDD: RDD[(String, Int)] = sc.parallelize(list)
        listRDD.cache()
        listRDD.map(kv=>kv._1)
        listRDD.map(kv=>kv._2)
        listRDD.reduceByKey(_+_)

注意:程序运行过程中的数据放置在内存中,如果程序执行完成,一般都会释放内存的资源。如果程序执行过程中,生成了一些中间结果是另一个程序需要使用的数据时,那么就可以把该数据持久化到内存或者磁盘中,避免不必要的重复计算,一般的一个RDD如果被重复使用2~3次以上,就需要持久化。

(4)尽量避免使用shuffle类的算子

因为在spark作业的运行过程中,最消耗性能的地方就是shuffle过程,原因:

  • 在shuffle过程中,各个节点上相同的key都会通过网络传输聚合到同一个节点上,会引发大量的IO操作以及数据的网络传输
  • shuffle过程,必须上一个阶段完成之后,才能进行下一个阶段的计算,导致了在并行计算中,由于某个计算的时间特别长,导致了整体计算时长取决于那个计算最长的计算的时间。如果有一个任务运行了很长时间,而其他的任务在很短的时间就计算完成,其他的任务程序需要等待这个未完成的程序,导致资源被浪费
    案例:
    // 传统的join操作会导致shuffle操作:
    // 因为两个RDD中,相同的key都需要通过网络拉取到一个节点上,由一个task进行join操作
    val rdd3 = rdd1.join(rdd2)

    使用Broadcast+map的join操作:

    // 使用Broadcast将一个数据量较小的RDD作为广播变量。
    val rdd2Data = rdd2.collect()
    val rdd2DataBroadcast = sc.broadcast(rdd2Data)

    注意:广播变量的对象一定不能太大,如果太大的话可能会导致OOM,当然广播变量不能是一个RDD,可以通过一些action算子,将RDD转化成为集合然后进行广播。

    (5)使用map-side预聚合的shuffle操作(combine)


    具有combine的算子:reduceByKey、combineByKey、aggregateByKey。
    没有combine的算子:groupByKey、coGroup。

    (6)使用高性能的算子

  • 使用reduceByKey/ combineByKey代替groupByKey----优先使用有combine的算子
  • 使用mappartitions代替map

案例:

        val hdfsRDD: RDD[String] = sc.textFile("hdfs://zy/data/word.txt")
             //Map是每一条调用一次
                hdfsRDD.map(kv=>{
            //获取数据库的连接
            connect=Connect.getconnect()
            connect.insert(kv)
        })
        //Mappartitions每一个分区调用一次。
        hdfsRDD.mapPartitions(partition=>{
            if(!partition.isEmpty){
                //获取数据库的连接
                connect=Connect.getconnect()
                partition.foreach(mes=>{
                    connect.insert(mes)
                })
            }
        })

以上的案例虽然都是插入数据,但是使用map是每一条记录都需要创建一个连接,而使用mappartition只需要每一个分区创建一个即可。

  • 使用foreachPartitione代替foreach,原理和上一个一样
  • 使用filter算子之后,使用colaesce重新分区
    因为在过滤之后,数据量会减少,此时在进行重新分区,会重新划分数据,是数据分配均匀。
  • 使用repartitionAndSortWithinPartitions替代repartition与sort类操作
    repartitionAndSortWithinPartitions:分区和排序同时进行。 (效率高)
    Repartition+sort:先分区然后在排序(这种方法,会重复创建RDD)

(7)使用广播变量

优势见:http://blog.51cto.com/14048416/2338188

(8)使用spark自带的Kryo序列化

默认的情况下,spark支持java原生的序列化机制,使用KryoSerolizar可以优化序列化和反序列化的性能
案例:

// 设置序列化器为KryoSerializer。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册要序列化的自定义类型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

(9)数据结构的调优

在java中有三种对象比较消耗内存:对象、字符串、集合类型。
对象:每一个java对象都有对象头,引用等额外的信息,因此比较占用内存空间
字符串:每一个字符串内部都有一个字符数组以及长度等额外信息
集合:HashMap、LinkedList等,因为集合类型内部通常会使用一些内部类来封装集合元素。
所以我们遵循:尽量少用这三种类型,当然使用基本数据类型代替字符串,使用字符串代替对象,使用对象代替集合。


本博文参考至美团的spark调优https://tech.meituan.com/

原文地址:http://blog.51cto.com/14048416/2338607

时间: 2024-10-12 04:50:43

spark调优之开发调优的相关文章

spark性能调优:开发调优

在大数据计算领域,Spark已经成为了越来越流行.越来越受欢迎的计算平台之一.Spark的功能涵盖了大数据领域的离线批处理.SQL类处理.流式/实时计算.机器学习.图计算等各种不同类型的计算操作,应用范围与前景非常广泛. 然而,通过Spark开发出高性能的大数据计算作业,并不是那么简单的.如果没有对Spark作业进行合理的调优,Spark作业的执行速度可能会很慢,这样就完全体现不出Spark作为一种快速大数据计算引擎的优势来.因此,想要用好Spark,就必须对其进行合理的性能优化. Spark的

spark记录(6)SparkCore的调优之开发调优

摘抄自:https://www.cnblogs.com/qingyunzong/p/8946637.html 前言 在大数据计算领域,Spark已经成为了越来越流行.越来越受欢迎的计算平台之一.Spark的功能涵盖了大数据领域的离线批处理.SQL类处理.流式/实时计算.机器学习.图计算等各种不同类型的计算操作,应用范围与前景非常广泛.在美团•大众点评,已经有很多同学在各种项目中尝试使用Spark.大多数同学(包括笔者在内),最初开始尝试使用Spark的原因很简单,主要就是为了让大数据计算作业的执

Spark学习之路 (八)SparkCore的调优之开发调优

讨论QQ:1586558083 目录 调优概述 原则一:避免创建重复的RDD 一个简单的例子 原则二:尽可能复用同一个RDD 一个简单的例子 原则三:对多次使用的RDD进行持久化 对多次使用的RDD进行持久化的代码示例 Spark的持久化级别 如何选择一种最合适的持久化策略 原则四:尽量避免使用shuffle类算子 Broadcast与map进行join代码示例 原则五:使用map-side预聚合的shuffle操作 原则六:使用高性能的算子 使用reduceByKey/aggregateByK

Spark学习之路 (八)SparkCore的调优之开发调优[转]

前言 在大数据计算领域,Spark已经成为了越来越流行.越来越受欢迎的计算平台之一.Spark的功能涵盖了大数据领域的离线批处理.SQL类处理.流式/实时计算.机器学习.图计算等各种不同类型的计算操作,应用范围与前景非常广泛.在美团?大众点评,已经有很多同学在各种项目中尝试使用Spark.大多数同学(包括笔者在内),最初开始尝试使用Spark的原因很简单,主要就是为了让大数据计算作业的执行速度更快.性能更高. 然而,通过Spark开发出高性能的大数据计算作业,并不是那么简单的.如果没有对Spar

Spark 开发调优(一)

Spark性能优化 - 开发调优 优化一 避免创建重复的RDD 通常来说,我们在开发一个Spark作业时,首先是基于某个数据源(比如Hive表或HDFS文件)创建一个初始的RDD:接着对这个RDD执行某个算子操作,然后得到下一个RDD:以此类推,循环往复,直到计算出最终我们需要的结果.在这个过程中,多个RDD会通过不同的算子操作(比如map.reduce等)串起来,这个"RDD串",就是RDD lineage,也就是"RDD的血缘关系链". 我们在开发过程中要注意:

【Spark深入学习 -14】Spark应用经验与程序调优

----本节内容------- 1.遗留问题解答 2.Spark调优初体验 2.1 利用WebUI分析程序瓶颈 2.2 设置合适的资源 2.3 调整任务的并发度 2.4 修改存储格式 3.Spark调优经验 3.1 Spark原理及调优工具 3.2 运行环境优化 3.2.1 防止不必要的分发 3.2.2 提高数据本地性 3.2.3 存储格式选择 3.2.4 选择高配机器 3.3 优化操作符 3.3.1 过滤操作导致多小任务 3.3.2 降低单条记录开销 3.3.3 处理数据倾斜或者任务倾斜 3.

spark性能调优之资源调优

转https://tech.meituan.com/spark-tuning-basic.html spark作业原理 使用spark-submit提交一个Spark作业之后,这个作业就会启动一个对应的Driver进程.根据你使用的部署模式(deploy-mode)不同,Driver进程可能在本地启动,也可能在集群中某个工作节点上启动.Driver进程本身会根据我们设置的参数,占有一定数量的内存和CPU core.而Driver进程要做的第一件事情,就是向集群管理器(可以是Spark Stand

Spark性能调优之JVM调优

Spark性能调优之JVM调优 通过一张图让你明白以下四个问题 1.JVM GC机制,堆内存的组成                2.Spark的调优为什么会和JVM的调优会有关联?--因为Scala也是基于JVM运行的语言                3.Spark中OOM产生的原因                4.如何在JVM这个层面上来对Spark进行调优 补充:                Spark程序运行时--JVM堆内存分配比例 RDD缓存的数据(0.6)    默认 对象_

[大数据性能调优] 第一章:性能调优的本质、Spark资源使用原理和调优要点分析

本課主題 大数据性能调优的本质 Spark 性能调优要点分析 Spark 资源使用原理流程 Spark 资源调优最佳实战 Spark 更高性能的算子 引言 我们谈大数据性能调优,到底在谈什么,它的本质是什么,以及 Spark 在性能调优部份的要点,这两点让直式进入性能调优都是一个至关重要的问题,它的本质限制了我们调优到底要达到一个什么样的目标或者说我们是从什么本源上进行调优.希望这篇文章能为读者带出以下的启发: 了解大数据性能调优的本质 了解 Spark 性能调优要点分析 了解 Spark 在资