Spark源码系列(五)RDD是如何被分布式缓存?

这一章想讲一下Spark的缓存是如何实现的。这个persist方法是在RDD里面的,所以我们直接打开RDD这个类。

  def persist(newLevel: StorageLevel): this.type = {
    // StorageLevel不能随意更改
    if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
      throw new UnsupportedOperationException("Cannot change storage level of an RDD after it was already assigned a level")
    }
    sc.persistRDD(this)
    // Register the RDD with the ContextCleaner for automatic GC-based cleanup    // 注册清理方法
    sc.cleaner.foreach(_.registerRDDForCleanup(this))
    storageLevel = newLevel
    this
  }

它调用SparkContext去缓存这个RDD,追杀下去。

  private[spark] def persistRDD(rdd: RDD[_]) {
    persistentRdds(rdd.id) = rdd
  }

它居然是用一个HashMap来存的,具体看这个map的类型是TimeStampedWeakValueHashMap[Int, RDD[_]]类型。把存进去的值都隐式转换成WeakReference,然后加到一个内部的一个ConcurrentHashMap里面。这里貌似也没干啥,这是有个鸟蛋用。。大神莫喷,知道干啥用的人希望告诉我一下。

CacheManager

现在并没有保存,等到真正运行Task运行的时候才会去缓存起来。入口在Task的runTask方法里面,具体的我们可以看ResultTask,它调用了RDD的iterator方法。

  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }

一旦设置了StorageLevel,就要从SparkEnv的cacheManager取数据。

  def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel): Iterator[T] = {
    val key = RDDBlockId(rdd.id, split.index)
    blockManager.get(key) match {
      case Some(values) =>
        // 已经有了,直接返回就可以了
        new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])

      case None =>
        // loading包含这个key表示已经有人在加载了,等到loading被释放了,就可以去blockManager里面取到了
        loading.synchronized {
          if (loading.contains(key)) {
            while (loading.contains(key)) {
              try {
                loading.wait()
              } catch {
                case e: Exception =>
                  logWarning(s"Got an exception while waiting for another thread to load $key", e)
              }
            }
            // 别人成功拿到了,我们直接取结果就是了,如果别人取失败了,我们再来取一次
            blockManager.get(key) match {
              case Some(values) =>
                return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
              case None =>
                loading.add(key)
            }
          } else {
            loading.add(key)
          }
        }
        try {
          // 通过rdd自身的compute方法去计算得到结果,回去看看RDD那文章,自己看看源码就清楚了
          val computedValues = rdd.computeOrReadCheckpoint(split, context)

          // 如果是本地运行的,就没必要缓存了,直接返回即可
          if (context.runningLocally) {
            return computedValues
          }

          // 跟踪blocks的更新状态
          var updatedBlocks = Seq[(BlockId, BlockStatus)]()
          val returnValue: Iterator[T] = {
            if (storageLevel.useDisk && !storageLevel.useMemory) {
              /* 这是RDD采用DISK_ONLY的情况,直接扔给blockManager
               * 然后把结果直接返回,它不需要把结果一下子全部加载进内存
               * 这同样适用于MEMORY_ONLY_SER,但是我们需要在启用它之前确认blocks没被block store给丢弃 */
              updatedBlocks = blockManager.put(key, computedValues, storageLevel, tellMaster = true)
              blockManager.get(key) match {
                case Some(values) =>
                  values.asInstanceOf[Iterator[T]]
                case None =>
                  throw new Exception("Block manager failed to return persisted valued")
              }
            } else {
              // 先存到一个ArrayBuffer,然后一次返回,在blockManager里也存一份
              val elements = new ArrayBuffer[Any]
              elements ++= computedValues
              updatedBlocks = blockManager.put(key, elements, storageLevel, tellMaster = true)
              elements.iterator.asInstanceOf[Iterator[T]]
            }
          }

          // 更新task的监控参数
          val metrics = context.taskMetrics
          metrics.updatedBlocks = Some(updatedBlocks)

          new InterruptibleIterator(context, returnValue)

        } finally {
          // 改完了,释放锁
          loading.synchronized {
            loading.remove(key)
            loading.notifyAll()
          }
        }
    }
  }

1、如果blockManager当中有,直接从blockManager当中取。

2、如果blockManager没有,就先用RDD的compute函数得到出来一个Iterable接口。

3、如果StorageLevel是只保存在硬盘的话,就把值存在blockManager当中,然后从blockManager当中取,这样的好处是不会一次把数据全部加载进内存。

4、如果StorageLevel是需要使用内存的情况,就把结果添加到一个ArrayBuffer当中一次返回,另外在blockManager存上一份,下次直接从blockManager取。

对StorageLevel说明一下吧,贴一下它的源码。

class StorageLevel private(
    private var useDisk_ : Boolean,
    private var useMemory_ : Boolean,
    private var useOffHeap_ : Boolean,
    private var deserialized_ : Boolean,
    private var replication_ : Int = 1)

  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(false, false, true, false)

大家注意看它那几个参数,useDisk_、useMemory_、useOffHeap_、deserialized_、replication_ 在具体的类型的时候是传的什么值。

下面我们的目标要放到blockManager。

BlockManager

BlockManager这个类比较大,我们从两方面开始看吧,putBytes和get方法。先从putBytes说起,之前说过Task运行结束之后,结果超过10M的话,会用BlockManager缓存起来。

env.blockManager.putBytes(blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)

putBytes内部又掉了另外一个方法doPut,方法很大呀,先折叠起来。

  private def doPut(
      blockId: BlockId,
      data: Values,
      level: StorageLevel,
      tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {// Return value
    val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]

    // 记录它的StorageLevel,以便我们可以在它加载进内存之后,可以按需写入硬盘。
  // 此外,在我们把调用BlockInfo的markReay方法之前,都没法通过get方法获得该部分内容
    val putBlockInfo = {
      val tinfo = new BlockInfo(level, tellMaster)
      // 如果不存在,就添加到blockInfo里面
      val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
      if (oldBlockOpt.isDefined) {
        // 如果已经存在了,就不需要重复添加了
        if (oldBlockOpt.get.waitForReady()) {return updatedBlocks
        }
        // 存在于blockInfo当中->但是上一次保存失败了,拿出旧的信息,再试一遍
        oldBlockOpt.get
      } else {
        tinfo
      }
    }

    val startTimeMs = System.currentTimeMillis
    // 当我们需要存储数据,并且要复制数据到别的机器,我们需要访问它的值,但是因为我们的put操作会读取整个iterator,
    // 这就不会有任何的值留下。在我们保存序列化的数据的场景,我们可以记住这些bytes,但在其他场景,比如反序列化存储的
    // 时候,我们就必须依赖返回一个Iterator
    var valuesAfterPut: Iterator[Any] = null
    // Ditto for the bytes after the put
    var bytesAfterPut: ByteBuffer = null
    // Size of the block in bytes
    var size = 0L

    // 在保存数据之前,我们要实例化,在数据已经序列化并且准备好发送的情况下,这个过程是很快的
    val replicationFuture = if (data.isInstanceOf[ByteBufferValues] && level.replication > 1) {
      // duplicate并不是复制这些数据,只是做了一个包装
      val bufferView = data.asInstanceOf[ByteBufferValues].buffer.duplicate()
      Future {
        // 把block复制到别的机器上去
        replicate(blockId, bufferView, level)
      }
    } else {
      null
    }

    putBlockInfo.synchronized {

      var marked = false
      try {
        if (level.useMemory) {
          // 首先是保存到内存里面,尽管它也使用硬盘,等内存不够的时候,才会写入硬盘
          // 下面分了三种情况,但是Task的结果是ByteBufferValues这种情况,具体看putBytes方法
          val res = data match {
            case IteratorValues(iterator) =>
              memoryStore.putValues(blockId, iterator, level, true)
            case ArrayBufferValues(array) =>
              memoryStore.putValues(blockId, array, level, true)
            case ByteBufferValues(bytes) =>
              bytes.rewind()
              memoryStore.putBytes(blockId, bytes, level)
          }
          size = res.size
          // 这里写得那么恶心,是跟data的类型有关系的,data: Either[Iterator[_], ByteBuffer],Left是Iterator,Right是ByteBuffer
          res.data match {
            case Right(newBytes) => bytesAfterPut = newBytes
            case Left(newIterator) => valuesAfterPut = newIterator
          }
          // 把被置换到硬盘的blocks记录到updatedBlocks上
          res.droppedBlocks.foreach { block => updatedBlocks += block }
        } else if (level.useOffHeap) {
          // 保存到Tachyon上.
          val res = data match {
            case IteratorValues(iterator) =>
              tachyonStore.putValues(blockId, iterator, level, false)
            case ArrayBufferValues(array) =>
              tachyonStore.putValues(blockId, array, level, false)
            case ByteBufferValues(bytes) =>
              bytes.rewind()
              tachyonStore.putBytes(blockId, bytes, level)
          }
          size = res.size
          res.data match {
            case Right(newBytes) => bytesAfterPut = newBytes
            case _ =>
          }
        } else {
          // 直接保存到硬盘,不要复制到其它节点的就别返回数据了.
          val askForBytes = level.replication > 1
          val res = data match {
            case IteratorValues(iterator) =>
              diskStore.putValues(blockId, iterator, level, askForBytes)
            case ArrayBufferValues(array) =>
              diskStore.putValues(blockId, array, level, askForBytes)
            case ByteBufferValues(bytes) =>
              bytes.rewind()
              diskStore.putBytes(blockId, bytes, level)
          }
          size = res.size
          res.data match {
            case Right(newBytes) => bytesAfterPut = newBytes
            case _ =>
          }
        }
     // 通过blockId获得当前的block状态
        val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
        if (putBlockStatus.storageLevel != StorageLevel.NONE) {
          // 成功了,把该block标记为ready,通知BlockManagerMaster
          marked = true
          putBlockInfo.markReady(size)
          if (tellMaster) {
            reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
          }
          updatedBlocks += ((blockId, putBlockStatus))
        }
      } finally {
        // 如果没有标记成功,就把该block信息清除
       if (!marked) {
          blockInfo.remove(blockId)
          putBlockInfo.markFailure()
        }
      }
    }

    // 把数据发送到别的节点做备份
    if (level.replication > 1) {
      data match {
        case ByteBufferValues(bytes) => Await.ready(replicationFuture, Duration.Inf)
        case _ => {
          val remoteStartTime = System.currentTimeMillis
          // 把Iterator里面的数据序列化之后,发送到别的节点
          if (bytesAfterPut == null) {
            if (valuesAfterPut == null) {
              throw new SparkException("Underlying put returned neither an Iterator nor bytes! This shouldn‘t happen.")
            }
            bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
          }
          replicate(blockId, bytesAfterPut, level)
        }
      }
    }
    // 销毁bytesAfterPut
    BlockManager.dispose(bytesAfterPut)
    updatedBlocks
  }

从上面的的来看:

1、存储的时候按照不同的存储级别分了3种情况来处理:存在内存当中(包括MEMORY字样的),存在tachyon上(OFF_HEAP),只存在硬盘上(DISK_ONLY)。

2、存储完成之后会根据存储级别决定是否发送到别的节点,在名字上最后带2字的都是这种,2表示一个block会在两个节点上保存。

3、存储完毕之后,会向BlockManagerMaster汇报block的情况。

4、这里面的序列化其实是先压缩后序列化,默认使用的是LZF压缩,可以通过spark.io.compression.codec设定为snappy或者lzo,序列化方式通过spark.serializer设置,默认是JavaSerializer。

接下来我们再看get的情况。

    val local = getLocal(blockId)
    if (local.isDefined) return local
    val remote = getRemote(blockId)
    if (remote.isDefined) return remote
    None

先从本地取,本地没有再去别的节点取,都没有,返回None。从本地取就不说了,怎么进怎么出。讲一下怎么从别的节点去,它们是一个什么样子的关系?

我们先看getRemote方法

  private def doGetRemote(blockId: BlockId, asValues: Boolean): Option[Any] = {
    val locations = Random.shuffle(master.getLocations(blockId))
    for (loc <- locations) {
      val data = BlockManagerWorker.syncGetBlock(GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
      if (data != null) {
        if (asValues) {
          return Some(dataDeserialize(blockId, data))
        } else {
          return Some(data)
        }
      }
    }
    None
  }

这个方法包括两个步骤:

1、用blockId通过master的getLocations方法找到它的位置。

2、通过BlockManagerWorker.syncGetBlock到指定的节点获取数据。

ok,下面就重点讲BlockManager和BlockManagerMaster之间的关系,以及BlockManager之间是如何相互传输数据。

BlockManager与BlockManagerMaster的关系

BlockManager我们使用的时候是从SparkEnv.get获得的,我们观察了一下SparkEnv,发现它包含了我们运行时候常用的那些东东。那它创建是怎么创建的呢,我们找到SparkEnv里面的create方法,右键FindUsages,就会找到两个地方调用了,一个是SparkContext,另一个是Executor。在SparkEnv的create方法里面会实例化一个BlockManager和BlockManagerMaster。这里我们需要注意看BlockManagerMaster的实例化方法,里面调用了registerOrLookup方法。

    def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
      if (isDriver) {
        actorSystem.actorOf(Props(newActor), name = name)
      } else {
        val driverHost: String = conf.get("spark.driver.host", "localhost")
        val driverPort: Int = conf.getInt("spark.driver.port", 7077)
        Utils.checkHost(driverHost, "Expected hostname")
        val url = s"akka.tcp://[email protected]$driverHost:$driverPort/user/$name"
        val timeout = AkkaUtils.lookupTimeout(conf)
        Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
      }
    }

所以从这里可以看出来,除了Driver之后的actor都是,都是持有的Driver的引用ActorRef。梳理一下,我们可以得出以下结论:

1、SparkContext持有一个BlockManager和BlockManagerMaster。

2、每一个Executor都持有一个BlockManager和BlockManagerMaster。

3、Executor和SparkContext的BlockManagerMaster通过BlockManagerMasterActor来通信。

接下来,我们看看BlockManagerMasterActor里的三组映射关系。

  // 1、BlockManagerId和BlockManagerInfo的映射关系
  private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]
  // 2、Executor ID 和 Block manager ID的映射关系
  private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]
  // 3、BlockId和保存它的BlockManagerId的映射关系
  private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]

看到这三组关系,前面的getLocations方法不用看它的实现,我们都应该知道是怎么找了。

BlockManager相互传输数据

BlockManager之间发送数据和接受数据是通过BlockManagerWorker的syncPutBlock和syncGetBlock方法来实现。看BlockManagerWorker的注释,说是BlockManager的网络接口,采用的是事件驱动模型。

再仔细看这两个方法,它传输的数据包装成BlockMessage之后,通过ConnectionManager的sendMessageReliablySync方法来传输。

接下来的故事就是nio之间的发送和接收了,就简单说几点吧:

1、ConnectionManager内部实例化一个selectorThread线程来接收小心,具体请看run方法。

2、Connection发送数据的时候,是一次把消息队列的message全部发送,不是一个一个message发送,具体看SendConnection的write方法,与之对应的接收看ReceivingConnection的read方法。

3、read完了之后,调用回调函数ConnectionManager的receiveMessage方法,它又调用了handleMessage方法,handleMessage又调用了BlockManagerWorker的onBlockMessageReceive方法。剧情有点儿狗血,我是快晕了。

  def processBlockMessage(blockMessage: BlockMessage): Option[BlockMessage] = {
    blockMessage.getType match {
      case BlockMessage.TYPE_PUT_BLOCK => {
        val pB = PutBlock(blockMessage.getId, blockMessage.getData, blockMessage.getLevel)
        putBlock(pB.id, pB.data, pB.level)
        None
      }
      case BlockMessage.TYPE_GET_BLOCK => {
        val gB = new GetBlock(blockMessage.getId)
        val buffer = getBlock(gB.id)
        Some(BlockMessage.fromGotBlock(GotBlock(gB.id, buffer)))
      }
      case _ => None
    }
  }

根据BlockMessage的类型进行处理,put类型就保存数据,get类型就从本地把block读出来返回给它。

相关参数

//BlockManager的最大内存
spark.storage.memoryFraction 默认值0.6//文件保存的位置spark.local.dir 默认是系统变量java.io.tmpdir的值//tachyon保存的地址spark.tachyonStore.url 默认值tachyon://localhost:19998//默认不启用netty来传输shuffle的数据spark.shuffle.use.netty 默认值是falsespark.shuffle.sender.port 默认值是0// Max megabytes of data to keep in flight per reducer (to avoid over-allocating memory for receiving shuffle outputs)spark.reducer.maxMbInFlight 默认值是48*1024*1024

岑玉海

转载请注明出处,谢谢!

Spark源码系列(五)RDD是如何被分布式缓存?

时间: 2024-08-10 15:08:58

Spark源码系列(五)RDD是如何被分布式缓存?的相关文章

Spark源码系列(四)图解作业生命周期

这一章我们探索了Spark作业的运行过程,但是没把整个过程描绘出来,好,跟着我走吧,let you know! 我们先回顾一下这个图,Driver Program是我们写的那个程序,它的核心是SparkContext,回想一下,从api的使用角度,RDD都必须通过它来获得. 下面讲一讲它所不为认知的一面,它和其它组件是如何交互的. Driver向Master注册Application过程 SparkContext实例化之后,在内部实例化两个很重要的类,DAGScheduler和TaskSched

Spark源码系列(七)Spark on yarn具体实现

本来不打算写的了,但是真的是闲来无事,整天看美剧也没啥意思.这一章打算讲一下Spark on yarn的实现,1.0.0里面已经是一个stable的版本了,可是1.0.1也出来了,离1.0.0发布才一个月的时间,更新太快了,节奏跟不上啊,这里仍旧是讲1.0.0的代码,所以各位朋友也不要再问我讲的是哪个版本,目前为止发布的文章都是基于1.0.0的代码. 在第一章<spark-submit提交作业过程>的时候,我们讲过Spark on yarn的在cluster模式下它的main class是or

Spark源码系列(三)作业运行过程

导读 看这篇文章的时候,最好是能够跟着代码一起看,我是边看代码边写的,所以这篇文章的前进过程也就是我看代码的推进过程. 作业执行 上一章讲了RDD的转换,但是没讲作业的运行,它和Driver Program的关系是啥,和RDD的关系是啥? 官方给的例子里面,一执行collect方法就能出结果,那我们就从collect开始看吧,进入RDD,找到collect方法. def collect(): Array[T] = { val results = sc.runJob(this, (iter: It

Spark源码系列(八)Spark Streaming实例分析

这一章要讲Spark Streaming,讲之前首先回顾下它的用法,具体用法请参照<Spark Streaming编程指南>. Example代码分析 val ssc = new StreamingContext(sparkConf, Seconds(1)); // 获得一个DStream负责连接 监听端口:地址 val lines = ssc.socketTextStream(serverIP, serverPort); // 对每一行数据执行Split操作 val words = line

Spark源码系列(一)spark-submit提交作业过程

前言 折腾了很久,终于开始学习Spark的源码了,第一篇我打算讲一下Spark作业的提交过程.有不明白Spark的原理的话,有另外一位大牛已经写了一个系列的Spark的源码分析了,大家可以去参考他的,他的过程图画得非常好,他写过的我可能就不写了,实在没办法比人家写得更好. 下面给出他的地址: http://www.cnblogs.com/hseagle/p/3664933.html,屌丝们,赶紧去膜拜大神吧. 这个是Spark的App运行图,它通过一个Driver来和集群通信,集群负责作业的分配

第2课 Scala面向对象彻底精通及Spark源码SparkContext,RDD阅读总结

第2课:Scala面向对象彻底精通及Spark源码阅读本期内容:1 Scala中的类.object实战详解 2 Scala中的抽象类.接口实战详解 3 综合案例及Spark源码解析 一:定义类class HiScala{private var name = "Spark" def sayName(){println(name)}def getName = name} Scala中,变量与类中的方法是同等级的,可以直接赋值给方法. scala中的get与set与Java中的get,set

spark源码阅读笔记RDD(七) RDD的创建、读取和保存

Spark支持很多输入和输出源,同时还支持内建RDD.Spark本身是基于Hadoop的生态圈,它可以通过 Hadoop MapReduce所使用的InpoutFormat和OutputFormat接口访问数据.而且大部分的文件格式和存储系统 (HDFS,Hbase,S3等)都支持这种接口.Spark常见的数据源如下: (1) 文件格式和文件系统,也就是我们经常用的TXT,JSON,CSV等這些文件格式 (2)SparkSQL中的结构化数据源 (3)数据库与键值存储(Hbase和JDBC源) 当

spark源码系列之累加器实现机制及自定义累加器

一,基本概念 累加器是Spark的一种变量,顾名思义该变量只能增加.有以下特点: 1,累加器只能在Driver端构建及并只能是Driver读取结果,Task只能累加. 2,累加器不会改变Spark Lazy计算的特点.只会在Job触发的时候进行相关累加操作. 3,现有累加器的类型.相信有很多学习大数据的道友,在这里我给大家说说我滴群哦,大数据海量知识分享,784789432.在此我保证,绝对大数据的干货,等待各位的到来,我们一同从入门到精通吧! 二,累加器的使用 Driver端初始化,并在Act

Spark源码系列(九)Spark SQL初体验之解析过程详解

好久没更新博客了,之前学了一些R语言和机器学习的内容,做了一些笔记,之后也会放到博客上面来给大家共享.一个月前就打算更新Spark Sql的内容了,因为一些别的事情耽误了,今天就简单写点,Spark1.2马上就要出来了,不知道变动会不会很大,据说添加了很多的新功能呢,期待中... 首先声明一下这个版本的代码是1.1的,之前讲的都是1.0的. Spark支持两种模式,一种是在spark里面直接写sql,可以通过sql来查询对象,类似.net的LINQ一样,另外一种支持hive的HQL.不管是哪种方