Spark 代码走读之 Cache

Spark是基于内存的计算模型,但是当compute chain非常长或者某个计算代价非常大时,能将某些计算的结果进行缓存就显得很方便了。Spark提供了两种缓存的方法 Cache 和 checkPoint。本章只关注 Cache (基于spark-core_2.10),在后续的章节中会提到 checkPoint.

主要从以下三方面来看

  1. persist时发生什么
  2. 执行action时如何去缓存及读取缓存
  3. 如何释放缓存

定义缓存

spark的计算是lazy的,只有在执行action时才真正去计算每个RDD的数据。要使RDD缓存,必须在执行某个action之前定义RDD.persist(),此时也就定义了缓存,但是没有真正去做缓存。RDD.persist会调用到SparkContext.persistRDD(rdd),同时将RDD注册到ContextCleaner中(后面会讲到这个ContextCleaner)。

def persist(newLevel: StorageLevel): this.type = {
    // TODO: Handle changes of StorageLevel
    if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
      throw new UnsupportedOperationException(
        "Cannot change storage level of an RDD after it was already assigned a level")
    }
    sc.persistRDD(this)
    // Register the RDD with the ContextCleaner for automatic GC-based cleanup
    sc.cleaner.foreach(_.registerRDDForCleanup(this))
    storageLevel = newLevel
    this
  }

sc.persistRDD很简单,将(rdd.id, rdd)加到persistentRdds中。persistentRDDs一个HashMap,key就是rdd.id,value是一个包含时间戳的对rdd的弱引用。persistentRDDs用来跟踪已经被标记为persist的RDD的引用的。

所以在定义缓存阶段,做了两件事:一是设置了rdd的StorageLevel,而是将rdd加到了persistentRdds中并在ContextCleaner中注册。

缓存

当执行到某个action时,真正计算才开始,这时会调用DAGScheduler.submitJob去提交job,通过rdd.iterator()来计算partition。

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }

iterator的逻辑很清楚,如果srorageLevel被标记过了就去CacheManager取,否则自己compute或者从checkPoint读取。

在cacheManager.getOrCompute中,通过RDDBlockId尝试去BlockManager中得到缓存的数据。如果缓存得不到(第一次计算),并调用computeOrReadCheckPoint去计算,并将结果cache起来,cache是通过putInBlockManger实现。根据StorageLevel,如果是缓存在内存中,会将结果存在MemoryStore的一个HashMap中,如果是在disk,结果通过DiskStore.put方法存到磁盘的某个文件夹中。这个文件及最终由Utils中的方法确定

private def getOrCreateLocalRootDirsImpl(conf: SparkConf): Array[String] = {
    if (isRunningInYarnContainer(conf)) {
      // If we are in yarn mode, systems can have different disk layouts so we must set it
      // to what Yarn on this system said was available. Note this assumes that Yarn has
      // created the directories already, and that they are secured so that only the
      // user has access to them.
      getYarnLocalDirs(conf).split(",")
    } else if (conf.getenv("SPARK_EXECUTOR_DIRS") != null) {
      conf.getenv("SPARK_EXECUTOR_DIRS").split(File.pathSeparator)
    } else {
      // In non-Yarn mode (or for the driver in yarn-client mode), we cannot trust the user
      // configuration to point to a secure directory. So create a subdirectory with restricted
      // permissions under each listed directory.
      Option(conf.getenv("SPARK_LOCAL_DIRS"))
        .getOrElse(conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
        .split(",")
        .flatMap { root =>
          try {
            val rootDir = new File(root)
            if (rootDir.exists || rootDir.mkdirs()) {
              val dir = createTempDir(root)
              chmod700(dir)
              Some(dir.getAbsolutePath)
            } else {
              logError(s"Failed to create dir in $root. Ignoring this directory.")
              None
            }
          } catch {
            case e: IOException =>
            logError(s"Failed to create local root dir in $root. Ignoring this directory.")
            None
          }
        }
        .toArray
    }
  }

如果已经缓存了,那么cacheManager.getOrCompute在调用blockManger.get(RDDBlockId)时会返回结果。get会先调用getLocal在本地获取,如果本地没有则调用getRemote去远程寻找,getRemote会call BlockMangerMaster.getLocation得到缓存的地址。

释放

Spark通过调用rdd.unpersit来释放缓存,这是通过SparkContext.unpersistRDD来实现的。在unpersistRDD中,rdd会从persistentRdds中移除,并通知BlockManagerMaster去删除数据缓存。BlockManagerMaster会通过消息机制告诉exectutor去删除内存或者disk上的缓存数据。

那么问题来了,如果用户不通过手动来unpersit,那缓存岂不是越积越多,最后爆掉吗?

是的,你的想法完全合理。因此Spark会自动删除不在scope内的缓存。“不在scope”指的是在用户程序中已经没有了该RDD的引用,RDD的数据是不可读取的。这里就要用到之前提到的ContextCleaner。ContextCleaner存了CleanupTaskWeakReference弱引用及存放该引用的队列。当系统发生GC将没有强引用的rdd对象回收后,这个弱引用会加入到队列中。ContextCleaner起了单独的一个线程轮询该队列,将队列中的弱引用取出,根据引用中的rddId触发sc.unpersistRDD。通过这样Spark能及时的将已经垃圾回收的RDD对应的cache进行释放。这里要清楚rdd与数据集的关系,rdd只是一个定义了计算逻辑的对象,对象本身不会包含其所代表的数据,数据要通过rdd.compute计算得到。所以系统回收rdd,只是回收了rdd对象,并没有回收rdd代表的数据集。

此外,SparkContext中还有一个MetadataCleaner,该cleaner会移除persistentRdds中的过期的rdd。(笔者一直没清楚这个移除和cache释放有什么关系??)

Reference:

https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence

http://jerryshao.me/architecture/2013/10/08/spark-storage-module-analysis/

https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md

http://blog.csdn.net/yueqian_zhu/article/details/48177353

http://www.cnblogs.com/jiaan-geng/p/5189177.html

时间: 2024-10-06 03:22:10

Spark 代码走读之 Cache的相关文章

自己编写的spark代码执行流程

我们自己编写了spark代码后;放到集群中一执行,就会出现问题,没有序列化.指定的配置文件不存在.classnotfound等等.这其实很多时候就是因为我们对自己编写的spark代码执行流程的不熟悉导致的,源码阅读可以解决,但源码不是每个人都能看懂或能看进去的,下面我们就来讲一下,我们自己写的spark代码究竟是这么执行的.从执行的过程可分为三个部分来分析main方法,RDD处理方法,DStream处理方法,从执行的JVM虚拟机可以分为两个部分driver端,worker端 一.main方法 m

本地开发spark代码上传spark集群服务并运行(基于spark官网文档)

打开IDEA 在src下的main下的scala下右击创建一个scala类 名字为SimpleApp ,内容如下 import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf object SimpleApp { def main(args: Array[String]) { val logFile = "/home/spark/opt/s

Spark代码调优(一)

环境极其恶劣情况下: import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Row, SQLContext} import org.apache.spark.sql.hive.HiveContext val sqlContext = new HiveContext(sc) val sql = sqlContext.sql("selec

Java缓存Ehcache-Ehcache的Cache预热机制及代码实现(Cache Warming for multi-tier Caches)

Ehcache中Cache预热机制 Cache预热机制简介 Ehcache在程序启动的时候并不会立即去加载位于磁盘上的数据到内存,而是在数据被用到的时候去加载(lazy load).因此在cache启动的时候,其内部没有数据.如果我们想在用到这些数据之前,它们全部被装载进内存,应该怎么做? Ehcache提供了BootstrapCacheLoader机制来解决这个问题,在Cache被激活之前,它会得到运行.并且有两种模式:同步和异步.如果是同步模式,在CacheMana启动之前,加载便会完成:如

Qt Creator插件工作流程代码走读

Qt Creator有个很风骚的插件管理器PluginManager,还有个很骚包的插件说明PluginSpec.基本上,所有的Qt程序的入口都是传统的C程序一样,代码流程从main()函数开始.  在main()中,先初始化用于国际化的translator,然后获取程序配置settings,接着就在栈上创建了PluginManager对象,之后为PluginManager设置搜索用的文件扩展名pluginspec,设置配置,再设置插件搜索路径.  设置好插件搜索路径后,PluginManage

Exphp代码走读(二)

App.class.php 1 <?php 2 namespace System\Core; 3 use System\Driver; 4 5 class App{ 6 public $DB; 7 public $Cache; 8 public $Session; 9 public $Controller; 10 11 public function __construct(){ 12 DBUG ? set_error_handler(array($this,'errorHandler')) :

开源ext2read代码走读之--“\\\\.\\PhysicalDrive0”意义?

在ext2read中读取ext4文件系统的代码中,读取硬盘中的信息时,定义了以下的宏,那么这个宏是什么意思呢? #define DEVICE    "\\\\.\\PhysicalDrive0"是什么意思? 由于"\"是C/C+中转义符, "\\\\.\\"就相当于\\.\,那么以上的宏定义中的"\\\.\\PhysicalDrive0"就等价于"\\.\PhysicalDrive0" 在Windows中

WebRTCDemo.apk代码走读(三):音频接收流程

收到音频包 UdpSocketManagerPosixImpl::Run UdpSocketManagerPosixImpl::Process UdpSocketPosix::HasIncoming(recvfrom) UdpTransportImpl::IncomingRTPCallback UdpTransportImpl::IncomingRTPFunction VoiceChannelTransport::IncomingRTPPacket VoENetworkImpl::Receive

Exphp代码走读(三)

Controller 类 1 <?php 2 namespace System\Core 3 4 5 class Controller { 6 public $Cache; 7 public $Session; 8 public $View; 9 10 private $_requestName; 11 private $_requestMethod; 12 13 public function __construct(){ 14 $this->safescan(); 15 16 Global