RDD关键性能考量之 并行度

《Spark快速大数据分析》

8.4 关键性能考量

并行度

RDD的逻辑表示其实是一个对象的集合。在物理执行期间,RDD会被分为一系列的分区,

每个分区都是整个数据的子集。当Spark调度并运行任务时,Spark会为每个分区中的数据

创建出一个任务,该任务在默认情况下会需要集群中的一个计算节点来执行。

Spark也会针对RDD直接自动推断出合适的并行度,这对于大多数用例来说已经足够了。

输入RDD一般会根据其底层的存储系统选择并行度。例如,从HDFS上读数据的输入RDD

会为数据在HDFS上的每个文件区块创建一个分区。从数据混洗后的RDD派生下来的RDD

则会采用与其父RDD相同的并行度。

并行度会从两个方面影响程序的性能。

首先,当并行度过低时,Spark集群会出现资源闲置的情况。

比如,假设你的应用有1000个可使用的计算节点,但所运行的步骤只有30个任务,你就应该提高并行度

来充分利用更多的计算节点。

当并行度过高时,每个分区产生的间接开销累计起来会更大。评判并行度是否过高的标准包括

任务是否是几乎在瞬间(毫秒级)完成的,或者是否观察到任务没有读写任务数据。

Spark提供了两种方法来对操作的并行度进行调优。

第一种方法是在数据混洗操作时,使用参数的方式为混洗后的RDD指定并行度。

第二种方法是对于任何已有的RDD,可以进行重新分区来获取更多或者更少的分区数。

重新分区操作通过repartition()实现,该操作会把RDD随机打乱并分成设定的分区数目。

如果你确定要减少分区数,可以使用coalesce()操作。由于没有打乱数据,该操作比repartition()更为高效。

如果你认为当前的并行度过高或者过低,可以利用这些方法对分区重新调整。

举个栗子,假设我们从S3上读取了大量数据,然后马上进行fileter()操作筛选调数据集合中的

大部分数据。默认情况下,filter()返回的RDD的分区数和其父RDD一样,这样会产生很多的空分区

或者只有少量数据的分区。这时,可以通过 合并得到分区数更少的RDD来提高应用的性能。

def testCoalesce = {

    val conf = new SparkConf().setMaster("local").setAppName("testCoalesce")

    val sc = new SparkContext(conf)

    val input = sc.parallelize(1 to 9999, 1000)

    logger.warn(s"RDD[input] partitionCount[${input.partitions.length}]")

    val test = input.filter { x => x % 2015 == 0  }

    logger.warn(s"RDD[test]  partitionCount[${test.partitions.length}]")

    val test2 = test.coalesce(2, true).cache()

    logger.warn(s"RDD[test2] partitionCount[${test2.partitions.length}]")

    val result = test2.collect()

    logger.warn(s"result [${result.mkString(",")}]")

    Thread.sleep(Int.MaxValue)

  }

  

执行结果


00:47:21 831 [main] WARN test.scala.spark.TestSpark2$.testCoalesce(TestSpark2.scala:19): RDD[input] partitionCount[1000]

00:47:22 009 [main] WARN test.scala.spark.TestSpark2$.testCoalesce(TestSpark2.scala:22): RDD[test]  partitionCount[1000]

00:47:22 122 [main] WARN test.scala.spark.TestSpark2$.testCoalesce(TestSpark2.scala:25): RDD[test2] partitionCount[2]

[Stage 0:===>                                                   (58 + 1) / 1000]

[Stage 0:=====>                                                 (95 + 1) / 1000]

[Stage 0:=======>                                              (131 + 1) / 1000]

[Stage 0:============>                                        (238 + 24) / 1000]

[Stage 0:============>                                        (243 + 19) / 1000]

[Stage 0:================>                                     (314 + 1) / 1000]

[Stage 0:=================>                                    (330 + 1) / 1000]

[Stage 0:=====================>                                (390 + 1) / 1000]

[Stage 0:=======================>                              (443 + 1) / 1000]

[Stage 0:===========================>                          (500 + 2) / 1000]

[Stage 0:==============================>                       (557 + 1) / 1000]

[Stage 0:=================================>                    (618 + 1) / 1000]

[Stage 0:===================================>                  (662 + 1) / 1000]

[Stage 0:=======================================>              (724 + 1) / 1000]

[Stage 0:==========================================>           (791 + 1) / 1000]

[Stage 0:==============================================>       (855 + 1) / 1000]

[Stage 0:================================================>     (895 + 1) / 1000]

[Stage 0:===================================================>  (953 + 1) / 1000]

00:47:30 466 [main] WARN test.scala.spark.TestSpark2$.testCoalesce(TestSpark2.scala:29): result [2015,4030,6045,8060]

打开http://localhost:4040/jobs/,可以看到任务执行统计。

时间: 2024-12-12 04:20:36

RDD关键性能考量之 并行度的相关文章

RDD关键性能考量之 内存管理

<Spark快速大数据分析> 8.4.2 关键性能考量   内存管理 内存对Spark来说哟几个不同的用途,理解并调优Spark的内存使用方法 可以帮助优化Spark应用.在各个执行器进程中,内存有一下所列集中用途. RDD存储 当调用RDD的persist()或cache()方法时,这个RDD的分区会被存储到缓存区中. Spark会根据spark.stroage.memoryFraction限制用来缓存的内存占整个JVM堆空间的比例大小. 如果超出限制,旧的分区数据会被移出内存. 数据混洗与

RDD关键性能考量之 序列化格式

<Spark快速大数据分析> 8.4.2 关键性能考量  序列化格式 当Spark需要通过网络传输数据,或是将数据写到磁盘上时,Spark需要把数据序列化为二进制格式. 序列化会在数据进行混洗操作时发生,此时有可能需要通过网络传输大量数据. 默认情况下,Spark会使用Java内建的序列化库.Spark也支持使用第三方序列化库Kryo, 可以提供比Java的序列化工具更短的序列化时间和更高压缩比的二进制表示,但不能直接序列化全部 类型的对象.几乎所有的应用都在迁移到Kryo后获得了更好的性能.

MySQL关键性能监控(QPS/TPS)

原文链接:http://www.cnblogs.com/chenty/p/5191777.html 工作中尝尝会遇到各种数据库性能调优,除了查看某条SQL执行时间长短外,还需要对系统的整体处理能力有更全局的掌握. QPS:Query per second,每秒查询量 TPS:Transaction per second,每秒事物量 以上两个指标在实际应用中会经常被问到,作为一个项目领导者,必须时刻掌握这些重要指标,并根据相应趋势做出调整. 以下列出上述两个指标的具体算法: QPS = Queri

Sharepoint 性能考量

1.crawl 时间的规划 统计站点使用率,根据使用率拆分成不同job,降低每个job的crawl时间,在非工作时间crawl pdf filter 使用adobe的产品进行index速度很慢,可以使用Foxit PDF iFilter 对特别大的站点,使用rule排除长时间没有update的subsite,提高crawl效率和减少full crawl条目数量 2.DB SharePoint_Config  DB的备份设置为simple,设置full基本上用不上 Audit Log定期备份和清理

Spark学习笔记6:Spark调优与调试

1.使用Sparkconf配置Spark 对Spark进行性能调优,通常就是修改Spark应用的运行时配置选项. Spark中最主要的配置机制通过SparkConf类对Spark进行配置,当创建出一个SparkContext时,就需要创建出一个SparkConf实例. Sparkconf实例包含用户要重载的配置选项的键值对.调用set()方法来添加配置项的设置,然后把这个对象传给SparkContext的构造方法. 调用setAppName()和setMaster()来分别设置spark.app

网络流量分析——NPMD关注IT运维、识别宕机和运行不佳进行性能优化。智能化分析是关键-主动发现业务运行异常。

科来 做流量分析,同时也做了一些安全分析(偏APT)--参考其官网:http://www.colasoft.com.cn/cases-and-application/network-security-analysis.php 作为安全工程师的你,想发现有谁在攻击我,还原攻击过程并且取证么? 作为立志成为网络技术大拿的你,想在学习理论知识的同时,了解实战中会遇到的哪些问题,这些问题用什么样的思路去解决么?如果以上答案为Yes,那么<CSNA网络分析经典实战案例>就是你的菜,以下内容全是网络安全真

[Berkeley]弹性分布式数据集RDD的介绍(RDD: A Fault-Tolerant Abstraction for In-Memory Cluster Computing 论文翻译)

摘要: 本文提出了分布式内存抽象的概念--弹性分布式数据集(RDD,Resilient Distributed Datasets).它同意开发者在大型集群上运行基于内存的计算.RDD适用于两种应用,而现有的数据流系统对这两种应用的处理并不高效:一是迭代式算法,这在图应用和机器学习领域非经常见.二是交互式数据挖掘工具.这两种情况下.将数据保存在内存中可以极大地提高性能.为了有效地实现容错,RDD提供了一种高度受限的共享内存,即RDD在共享状态的时候是基于粗粒度的转换而不是细粒度的更新(换句话说就是

Spark性能优化指南——基础篇

前言 在大数据计算领域,Spark已经成为了越来越流行.越来越受欢迎的计算平台之一.Spark的功能涵盖了大数据领域的离线批处理.SQL类处理.流式/实时计算.机器学习.图计算等各种不同类型的计算操作,应用范围与前景非常广泛.在美团•大众点评,已经有很多同学在各种项目中尝试使用Spark.大多数同学(包括笔者在内),最初开始尝试使用Spark的原因很简单,主要就是为了让大数据计算作业的执行速度更快.性能更高. 然而,通过Spark开发出高性能的大数据计算作业,并不是那么简单的.如果没有对Spar

架构设计:系统存储(9)——MySQL数据库性能优化(5)

=================================== (接上文<架构设计:系统存储(9)--MySQL数据库性能优化(5)>) 4-3-3-3.避免死锁的建议 上一篇文章我们主要介绍了MySQL数据库中锁的基本原理.工作过程和产生死锁的原因.通过上一篇文章的介绍,可以确定我们需要业务系统中尽可能避免死锁的出现.这里为各位读者介绍一些在InnoDB引擎使用过程中减少死锁的建议. 正确使用读操作语句 经过之前文章介绍,我们知道一般的快照读是不会给数据表任何锁的.那么这些快照读操作