CacheManager彻底解密:CacheManager运行原理流程图和源码详解(DT大数据梦工厂)

内容:

1、CacheManager重大价值;

2、CacheManager运行原理图;

3、CacheManager源码解析;

BlockManager针对Cache这样的行为做了CacheManager

Spark出色的原因:

1、Spark基于RDD构成了一体化、多元化的大数据处理中心(不需要再处理多种范式来部署多种框架,只要Spark!!!降低成本投入获得更高的产出);

2、迭代,因为在计算的时候迭代,在构建复杂算法的时候非常方便(图计算、机器学习、数据仓库),而CacheManager 在多重迭代的时候非常重要;

==========CacheManager分析============

1、CacheManager管理的是缓存,而缓存可以是基于内存的缓存,也可以是基于磁盘的缓存;

2、CacheManager需要通过BlockManager来操作数据;

3、每当Task运行的时候,会调用RDD的conpute方法,而compute方法会调用iterator方法,从下面代码中可以看到默认的RDD是基于内存的,计算一次后基本从CacheManager获得;

/**
 * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
 * This should ‘‘not‘‘ be called by users directly, but is available for implementors of custom
 * subclasses of RDD.
 */
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}

==========CacheManager源码详解============

1、Cache在工作的时候会最大化的保留数据,但是数据不一定绝对完整,因为当前的计算如果需要内存空间的话,那么Cache在内存中的数据必须让出空间,此时如果在RDD持久化的时候同时指定了可以把数据放在disk上,那么部分cache的数据就可以从内存转入磁盘,否则的话,数据就会丢失。所以Cache不一定可靠,所以必须得用getOrCompute来确定数据能取到!!!

/** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */
def getOrCompute[T](
    rdd: RDD[T],
    partition: Partition,
    context: TaskContext,
    storageLevel: StorageLevel): Iterator[T] = {

val key = RDDBlockId(rdd.id, partition.index)
  logDebug(s"Looking for partition $key")
  blockManager.get(key) match {
    case Some(blockResult) =>
      // Partition is already materialized, so just return its values
      val existingMetrics = context.taskMetrics
        .getInputMetricsForReadMethod(blockResult.readMethod)
      existingMetrics.incBytesRead(blockResult.bytes)

val iter = blockResult.data.asInstanceOf[Iterator[T]]
      new InterruptibleIterator[T](context, iter) {
        override def next(): T = {
          existingMetrics.incRecordsRead(1)
          delegate.next()
        }
      }
    case None =>
      // Acquire a lock for loading this partition
      // If another thread already holds the lock, wait for it to finish return its results
      val storedValues = acquireLockForPartition[T](key)
      if (storedValues.isDefined) {
        return new InterruptibleIterator[T](context, storedValues.get)
      }

// Otherwise, we have to load the partition ourselves
      try {
        logInfo(s"Partition $key not found, computing it")
        val computedValues = rdd.computeOrReadCheckpoint(partition, context)

// If the task is running locally, do not persist the result
        if (context.isRunningLocally) {
          return computedValues
        }

// Otherwise, cache the values and keep track of any updates in block statuses
        val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
        val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
        val metrics = context.taskMetrics
        val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
        metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
        new InterruptibleIterator(context, cachedValues)

} finally {
        loading.synchronized {
          loading.remove(key)
          loading.notifyAll()
        }
      }
  }
}

2、具体CacheManager在获得缓存数据的时候,首先会通过BlockManager来抓到数据(其中getLocal和getRemote在上一讲有提及);

/**
 * Get a block from the block manager (either local or remote).
 */
def get(blockId: BlockId): Option[BlockResult] = {
  val local = getLocal(blockId)
  if (local.isDefined) {
    logInfo(s"Found block $blockId locally")
    return local
  }
  val remote = getRemote(blockId)
  if (remote.isDefined) {
    logInfo(s"Found block $blockId remotely")
    return remote
  }
  None
}

3、缓存没有数据算的时候,先要锁数据,这里还是从blockManager中获得数据(一般走到这里从这里也取不到的);

/**
 * Acquire a loading lock for the partition identified by the given block ID.
 *
 * If the lock is free, just acquire it and return None. Otherwise, another thread is already
 * loading the partition, so we wait for it to finish and return the values loaded by the thread.
 */
private def acquireLockForPartition[T](id: RDDBlockId): Option[Iterator[T]] = {
  loading.synchronized {
    if (!loading.contains(id)) {
      // If the partition is free, acquire its lock to compute its value
      loading.add(id)
      None
    } else {
      // Otherwise, wait for another thread to finish and return its result
      logInfo(s"Another thread is loading $id, waiting for it to finish...")
      while (loading.contains(id)) {
        try {
          loading.wait()
        } catch {
          case e: Exception =>
            logWarning(s"Exception while waiting for another thread to load $id", e)
        }
      }
      logInfo(s"Finished waiting for $id")
      val values = blockManager.get(id)
      if (!values.isDefined) {
        /* The block is not guaranteed to exist even after the other thread has finished.
         * For instance, the block could be evicted after it was put, but before our get.
         * In this case, we still need to load the partition ourselves. */
        logInfo(s"Whoever was loading $id failed; we‘ll try it ourselves")
        loading.add(id)
      }
      values.map(_.data.asInstanceOf[Iterator[T]])
    }
  }
}

4、如果CacheManager没有通过BlockManager获得缓存内容的话,此时会通过BlockManager的RDD的如下方法来获得数据:

val computedValues = rdd.computeOrReadCheckpoint(partition, context)

/**
 * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
 */
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
  if (isCheckpointedAndMaterialized) {
    firstParent[T].iterator(split, context)
  } else {
    compute(split, context)
  }
}

上述方法首先会查看当前的RDD是否进行了checkpoint,如果进行了的话,就直接读取checkpoint的数据,否则的话,就必须进行计算,计算之后会通过putInBlockManager会把数据按照StorageLevel重新缓存起来;

备注:所以如果多步骤迭代的话,有了checkpoint,就极大提升效率了!

5、缓存的时候如果需要放在内存中,内存足够的情况下,看到一点内存就放一下,看到一点内存就放一下,一点一点放,实在放不完,就放disk;

private def putInBlockManager[T](
    key: BlockId,
    values: Iterator[T],
    level: StorageLevel,
    updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)],
    effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {

val putLevel = effectiveStorageLevel.getOrElse(level)
  if (!putLevel.useMemory) {
    /*
     * This RDD is not to be cached in memory, so we can just pass the computed values as an
     * iterator directly to the BlockManager rather than first fully unrolling it in memory.
     */
    updatedBlocks ++=
      blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
    blockManager.get(key) match {
      case Some(v) => v.data.asInstanceOf[Iterator[T]]
      case None =>
        logInfo(s"Failure to store $key")
        throw new BlockException(key, s"Block manager failed to return cached value for $key!")
    }
  } else {
    /*
     * This RDD is to be cached in memory. In this case we cannot pass the computed values
     * to the BlockManager as an iterator and expect to read it back later. This is because
     * we may end up dropping a partition from memory store before getting it back.
     *
     * In addition, we must be careful to not unroll the entire partition in memory at once.
     * Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
     * single partition. Instead, we unroll the values cautiously, potentially aborting and
     * dropping the partition to disk if applicable.
     */
    blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
      case Left(arr) =>
        // We have successfully unrolled the entire partition, so cache it in memory
        updatedBlocks ++=
          blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
        arr.iterator.asInstanceOf[Iterator[T]]
      case Right(it) =>
        // There is not enough space to cache this partition in memory
        val returnValues = it.asInstanceOf[Iterator[T]]
        if (putLevel.useDisk) {
          logWarning(s"Persisting partition $key to disk instead.")
          val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
            useOffHeap = false, deserialized = false, putLevel.replication)
          putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
        } else {
          returnValues
        }
    }
  }
}

王家林老师名片:

中国Spark第一人

新浪微博:http://weibo.com/ilovepains

微信公众号:DT_Spark

博客:http://blog.sina.com.cn/ilovepains

手机:18610086859

QQ:1740415547

邮箱:[email protected]

时间: 2024-12-08 01:09:11

CacheManager彻底解密:CacheManager运行原理流程图和源码详解(DT大数据梦工厂)的相关文章

Checkpoint彻底解密:Checkpoint的运行原理和源码实现彻底详解(DT大数据梦工厂)

内容: 1.Checkpoint重大价值: 2.Checkpoint运行原理图: 3.Checkpoint源码解析: 机器学习.图计算稍微复杂迭代算法的时候都有Checkpoint的身影,作用不亚于persist ==========Checkpoint到底是什么============ 1.Spark 在生产环境下经常会面临transformation的RDD非常多(例如一个Job中包含1万个RDD)或者具体transformation的RDD本身计算特别复杂或者耗时(例如计算时长超过1个小时

第43课:Spark 1.6 RPC内幕解密:运行机制、源码详解、Netty与Akka等

Spark 是分布式计算框架,多台机器之间必然存在着通信.Spark在早期版本采用Akka实现.现在在Akka的上层抽象出了一个RpcEnv.RpcEnv负责管理机器之间的通信. RpcEnv包含了如下三大核心: RpcEndpoint 消息循环体,负责接收并处理消息.Spark中的Master.Worker都是RpcEndpoint . RpcEndpointRef :RpcEndpoint的引用,如果需要和RpcEndpoint通信,就必须获取它的RpcEndpointRef,通过RpcEn

DT大数据梦工厂第三十五课 Spark系统运行循环流程

本节课内容: 1.     TaskScheduler工作原理 2.     TaskScheduler源码 一.TaskScheduler工作原理 总体调度图: 通过前几节课的讲解,RDD和DAGScheduler以及Worker都已有深入的讲解,这节课我们主要讲解TaskScheduler的运行原理. 回顾: DAGScheduler面向整个Job划分多个Stage,划分是从后往前的回溯过程:运行时从前往后运行的.每个Stage中有很多任务Task,Task是可以并行执行的.它们的执行逻辑完

HA下Spark集群工作原理(DT大数据梦工厂)

Spark高可用HA实战 Spark集群工作原理详解 资源主要指内存.CPU 如果是单点的话,Master如果出现故障,则集群不能对外工作 Spark通过Zookeeper做HA,一般做HA是一个active级别,standby active就是当前工作 standby是随时准备active的挂了之后会切换成为active级别 以前一般是2台机器,一个active,一个standby 现在一般是3台机器,一个active,两个standby,甚至3台以上 Zookeeper包含了哪些内容:所有的

Spark内核架构解密(DT大数据梦工厂)

只有知道内核架构的基础上,才知道为什么要这样写程序? 手工绘图来解密Spark内核架构 通过案例来验证Spark内核架构 Spark架构思考 ==========Spark Runtime的几个概念============ 下载下来运行,基本都是standalone模式,如果掌握了standalone,则yarn和mesos,以后不做特别说明,一律是standalone模式 application=driver+executor,executor是具体处理数据分片,里面是线程池并发的处理数据分片

Spark Runtime(Driver、Masster、Worker、Executor)内幕解密(DT大数据梦工厂)

内容: 1.再论Spark集群部署: 2.Job提交解密: 3.Job的生成和接受: 4.Task的运行: 5.再论Shuffle: 从一个作业视角,透过Master.Drvier.Executor来透视Spark Runtime ==========再论Spark集群部署============ 官网中关于集群的部署: 默认情况下,每个Worker下有一个Executor,会最大化的使用内存和CPU. Master发指令给Worker来分配资源,不关心Worker能不能分配到这个资源,他发给多

Spark on Yarn彻底解密(DT大数据梦工厂)

内容: 1.Hadoop Yarn的工作流程解密: 2.Spark on Yarn两种运行模式实战: 3.Spark on Yarn工作流程解密: 4.Spark on Yarn工作内幕解密: 5.Spark on Yarn最佳实践: 资源管理框架Yarn Mesos是分布式集群的资源管理框架,和大数据没关系,但是可以管理大数据的资源 ==========Hadoop Yarn解析============ 1.Yarn是Hadoop推出的资源管理器,是负责分布式(大数据)集群计算的资源管理的,负

Master HA彻底解密(DT大数据梦工厂)

内容: 1.Master HA解析: 2.Master HA的四种方式: 3.Master HA的内部工作机制: 4.Master HA的源码解密: 本讲主要源码角度分析Master HA,因为在生产环境必然要做的 ==========Master HA解析============ Spark是Master-Slave的结构 现在业界是1个Master Active,2个以上standby 如果有HA的话,切换active的时候,会在上次运行的基础上继续运行 Drvier提交程序.申请资源,是跟

SparkRDD解密(DT大数据梦工厂)

第一阶段,彻底精通Spark 第二阶段,从0起步,操作项目 Hadoop是大数据的基础设施,存储等等 Spark是计算核心所在 1.RDD:基于工作集的应用抽象 2.RDD内幕解密 3.RDD思考 不掌握RDD的人,不可能成为Spark的高手 绝对精通RDD,解决问题的能力大大提高 各种框架底层封装的都是RDD,RDD提供了通用框架 RDD是Spark的通用抽象基石 顶级SPark高手, 1.能解决问题.性能调优: 2.Spark高手拿Spark过来就是修改的 ==========基于工作集的应