昨晚听了王家林老师的Spark IMF传奇行动第18课:RDD持久化、广播、累加器,作业是unpersist试验,阅读累加器源码看内部工作机制:
scala> val rdd = sc.parallelize(1 to 1000) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21 scala> rdd.persist res0: rdd.type = ParallelCollectionRDD[0] at parallelize at <console>:21 scala> rdd.count 16/01/24 11:42:56 INFO DAGScheduler: Job 0 finished: count at <console>:24, took 1.451543 s res1: Long = 1000 16/01/24 11:43:14 INFO DAGScheduler: Job 2 finished: count at <console>:24, took 0.094119 s res3: Long = 1000 scala> rdd.unpersist() 16/01/24 11:43:43 INFO ParallelCollectionRDD: Removing RDD 0 from persistence list 16/01/24 11:43:43 INFO BlockManager: Removing RDD 0 res5: rdd.type = ParallelCollectionRDD[0] at parallelize at <console>:21 scala> rdd.count 16/01/24 11:44:56 INFO DAGScheduler: Job 0 finished: count at <console>:24, took 1.475321 s res1: Long = 1000
persisit后,count执行快了许多,但unpersist后,执行又变慢了。
累加器Accumulator:全局唯一,对于Executor只能修改但不可读,只对Driver可读,只增不减
val sum = sc.accumulator(0) val d1 = sc.parallelize(1 to 5) val result1 = d1.foreach(item => sum+= item) println(sum)
结果是15.
后续课程可以参照新浪微博 王家林_DT大数据梦工厂:http://weibo.com/ilovepains
王家林 中国Spark第一人,微信公共号DT_Spark
转发请写明出处。
时间: 2024-10-13 22:26:36