《Spark快速大数据分析》
8.4 关键性能考量
并行度
RDD的逻辑表示其实是一个对象的集合。在物理执行期间,RDD会被分为一系列的分区,
每个分区都是整个数据的子集。当Spark调度并运行任务时,Spark会为每个分区中的数据
创建出一个任务,该任务在默认情况下会需要集群中的一个计算节点来执行。
Spark也会针对RDD直接自动推断出合适的并行度,这对于大多数用例来说已经足够了。
输入RDD一般会根据其底层的存储系统选择并行度。例如,从HDFS上读数据的输入RDD
会为数据在HDFS上的每个文件区块创建一个分区。从数据混洗后的RDD派生下来的RDD
则会采用与其父RDD相同的并行度。
并行度会从两个方面影响程序的性能。
首先,当并行度过低时,Spark集群会出现资源闲置的情况。
比如,假设你的应用有1000个可使用的计算节点,但所运行的步骤只有30个任务,你就应该提高并行度
来充分利用更多的计算节点。
当并行度过高时,每个分区产生的间接开销累计起来会更大。评判并行度是否过高的标准包括
任务是否是几乎在瞬间(毫秒级)完成的,或者是否观察到任务没有读写任务数据。
Spark提供了两种方法来对操作的并行度进行调优。
第一种方法是在数据混洗操作时,使用参数的方式为混洗后的RDD指定并行度。
第二种方法是对于任何已有的RDD,可以进行重新分区来获取更多或者更少的分区数。
重新分区操作通过repartition()实现,该操作会把RDD随机打乱并分成设定的分区数目。
如果你确定要减少分区数,可以使用coalesce()操作。由于没有打乱数据,该操作比repartition()更为高效。
如果你认为当前的并行度过高或者过低,可以利用这些方法对分区重新调整。
举个栗子,假设我们从S3上读取了大量数据,然后马上进行fileter()操作筛选调数据集合中的
大部分数据。默认情况下,filter()返回的RDD的分区数和其父RDD一样,这样会产生很多的空分区
或者只有少量数据的分区。这时,可以通过 合并得到分区数更少的RDD来提高应用的性能。
def testCoalesce = { val conf = new SparkConf().setMaster("local").setAppName("testCoalesce") val sc = new SparkContext(conf) val input = sc.parallelize(1 to 9999, 1000) logger.warn(s"RDD[input] partitionCount[${input.partitions.length}]") val test = input.filter { x => x % 2015 == 0 } logger.warn(s"RDD[test] partitionCount[${test.partitions.length}]") val test2 = test.coalesce(2, true).cache() logger.warn(s"RDD[test2] partitionCount[${test2.partitions.length}]") val result = test2.collect() logger.warn(s"result [${result.mkString(",")}]") Thread.sleep(Int.MaxValue) }
执行结果
00:47:21 831 [main] WARN test.scala.spark.TestSpark2$.testCoalesce(TestSpark2.scala:19): RDD[input] partitionCount[1000] 00:47:22 009 [main] WARN test.scala.spark.TestSpark2$.testCoalesce(TestSpark2.scala:22): RDD[test] partitionCount[1000] 00:47:22 122 [main] WARN test.scala.spark.TestSpark2$.testCoalesce(TestSpark2.scala:25): RDD[test2] partitionCount[2] [Stage 0:===> (58 + 1) / 1000] [Stage 0:=====> (95 + 1) / 1000] [Stage 0:=======> (131 + 1) / 1000] [Stage 0:============> (238 + 24) / 1000] [Stage 0:============> (243 + 19) / 1000] [Stage 0:================> (314 + 1) / 1000] [Stage 0:=================> (330 + 1) / 1000] [Stage 0:=====================> (390 + 1) / 1000] [Stage 0:=======================> (443 + 1) / 1000] [Stage 0:===========================> (500 + 2) / 1000] [Stage 0:==============================> (557 + 1) / 1000] [Stage 0:=================================> (618 + 1) / 1000] [Stage 0:===================================> (662 + 1) / 1000] [Stage 0:=======================================> (724 + 1) / 1000] [Stage 0:==========================================> (791 + 1) / 1000] [Stage 0:==============================================> (855 + 1) / 1000] [Stage 0:================================================> (895 + 1) / 1000] [Stage 0:===================================================> (953 + 1) / 1000] 00:47:30 466 [main] WARN test.scala.spark.TestSpark2$.testCoalesce(TestSpark2.scala:29): result [2015,4030,6045,8060] |
打开http://localhost:4040/jobs/,可以看到任务执行统计。