A Brief Analysis of Apache Spark 1.0.0 (Part 10): Data Storage, Read and Write Operations

"An RDD is composed of partitions, and transformations and actions are performed on those partitions; inside the storage module, however, an RDD is viewed as being composed of blocks, and RDD data is read and written at block granularity. Essentially a partition and a block are the same thing looked at from different angles. The smallest unit of data that the Spark storage module reads or writes is a block, and every operation is carried out on blocks."

BlockManager defines three main storage types (tachyonStore is left out of this analysis):

private[storage] val memoryStore = new MemoryStore(this, maxMemory)
private[storage] val diskStore = new DiskStore(this, diskBlockManager)
private[storage] lazy val tachyonStore: TachyonStore

1. DiskStore

Let's look at diskStore first. When DiskStore is instantiated, a diskBlockManager is passed in:

val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
    conf.get("spark.local.dir",  System.getProperty("java.io.tmpdir")))

When DiskBlockManager is initialized, it creates one local directory for each path listed in spark.local.dir, and inside each of those directories it creates multiple subdirectories into which files are hashed, so as to avoid very large inodes at the top level:

// Create one local directory for each path mentioned in spark.local.dir; then, inside this
  // directory, create multiple subdirectories that we will hash files into, in order to avoid
  // having really large inodes at the top level.
  private val localDirs: Array[File] = createLocalDirs()
  private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))

createLocalDirs creates the local directories. Each one is simply a folder named "spark-local-" + timestamp + "-" + a random hex suffix, and blocks are stored as files inside these localDirs:

private def createLocalDirs(): Array[File] = {
    logDebug("Creating local directories at root dirs ‘" + rootDirs + "‘")
    val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
    rootDirs.split(",").map { rootDir =>
      var foundLocalDir = false
      var localDir: File = null
      var localDirId: String = null
      var tries = 0
      val rand = new Random()
      while (!foundLocalDir && tries < MAX_DIR_CREATION_ATTEMPTS) {
        tries += 1
        try {
          localDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536))
          localDir = new File(rootDir, "spark-local-" + localDirId)
          if (!localDir.exists) {
            foundLocalDir = localDir.mkdirs()
          }
        } catch {
          case e: Exception =>
            logWarning("Attempt " + tries + " to create local dir " + localDir + " failed", e)
        }
      }
      if (!foundLocalDir) {
        logError("Failed " + MAX_DIR_CREATION_ATTEMPTS +
          " attempts to create local dir in " + rootDir)
        System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
      }
      logInfo("Created local directory at " + localDir)
      localDir
    }
  }

Thus all block operations in DiskBlockManager reduce to reading and writing Files, i.e. maintaining and creating the mapping from a logical block to a physical location. By default, a block maps to a file named after its blockId.

The getFile method establishes this mapping: it hashes the filename to choose a subdirectory and places the file there:

def getFile(filename: String): File = {
    // Figure out which local directory it hashes to, and which subdirectory in that
    val hash = Utils.nonNegativeHash(filename)
    val dirId = hash % localDirs.length
    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir

    // Create the subdirectory if it doesn't already exist
    var subDir = subDirs(dirId)(subDirId)
    if (subDir == null) {
      subDir = subDirs(dirId).synchronized {
        val old = subDirs(dirId)(subDirId)
        if (old != null) {
          old
        } else {
          val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
          newDir.mkdir()
          subDirs(dirId)(subDirId) = newDir
          newDir
        }
      }
    }

    new File(subDir, filename)
  }
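To make the mapping concrete, here is a small standalone sketch of the same two-level hashing. The hash function below is a stand-in for Utils.nonNegativeHash, and the directory counts are example values (64 mirrors the default for subDirsPerLocalDir):

// A runnable sketch of getFile's two-level hashing. nonNegativeHash is a
// stand-in for Utils.nonNegativeHash; 2 local dirs / 64 subdirs are example values.
object BlockFileHashingSketch {
  def nonNegativeHash(s: String): Int = s.hashCode & Integer.MAX_VALUE

  def locate(filename: String, numLocalDirs: Int, subDirsPerLocalDir: Int): (Int, String) = {
    val hash = nonNegativeHash(filename)
    val dirId = hash % numLocalDirs                            // picks the spark-local-* dir
    val subDirId = (hash / numLocalDirs) % subDirsPerLocalDir  // picks the hex-named subdir
    (dirId, "%02x".format(subDirId))
  }

  def main(args: Array[String]): Unit = {
    // A block named "rdd_0_1" always lands in the same directory, so reads find it again.
    println(locate("rdd_0_1", numLocalDirs = 2, subDirsPerLocalDir = 64))
  }
}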

Writing data:

For diskStore, BlockManager offers two operations depending on the value type: putValues and putBytes.

putValues resolves the blockId to its file, opens a FileOutputStream on that file, and writes the values through a serialization stream:

override def putValues(
      blockId: BlockId,
      values: Iterator[Any],
      level: StorageLevel,
      returnValues: Boolean)
    : PutResult = {

    logDebug("Attempting to write values for block " + blockId)
    val startTime = System.currentTimeMillis
    val file = diskManager.getFile(blockId)
    val outputStream = new FileOutputStream(file)
    blockManager.dataSerializeStream(blockId, outputStream, values)
    val length = file.length

    val timeTaken = System.currentTimeMillis - startTime
    logDebug("Block %s stored as %s file on disk in %d ms".format(
      file.getName, Utils.bytesToString(length), timeTaken))

    if (returnValues) {
      // Return a byte buffer for the contents of the file
      val buffer = getBytes(blockId).get
      PutResult(length, Right(buffer))
    } else {
      PutResult(length, null)
    }
  }

putBytes also resolves the blockId to a file, obtains a channel from it, and writes the bytes:

override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel) : PutResult = {
    // So that we do not modify the input offsets !
    // duplicate does not copy buffer, so inexpensive
    val bytes = _bytes.duplicate()
    logDebug("Attempting to put block " + blockId)
    val startTime = System.currentTimeMillis
    val file = diskManager.getFile(blockId)
    val channel = new FileOutputStream(file).getChannel()
    while (bytes.remaining > 0) {
      channel.write(bytes)
    }
    channel.close()
    val finishTime = System.currentTimeMillis
    logDebug("Block %s stored as %s file on disk in %d ms".format(
      file.getName, Utils.bytesToString(bytes.limit), (finishTime - startTime)))
    return PutResult(bytes.limit(), Right(bytes.duplicate()))
  }
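Note the `_bytes.duplicate()` at the top: a duplicate shares the underlying byte content but carries its own position and limit, so the write loop can drain its copy without disturbing the caller's buffer. A minimal demonstration:

import java.nio.ByteBuffer

object DuplicateDemo {
  def main(args: Array[String]): Unit = {
    val original = ByteBuffer.wrap("block-data".getBytes("UTF-8"))
    val dup = original.duplicate()   // shares the bytes, has its own position/limit

    while (dup.remaining > 0) dup.get()   // drain the duplicate

    println(s"dup.position=${dup.position}, original.position=${original.position}")
    // prints: dup.position=10, original.position=0
  }
}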

Reading data:

Reads go through getValues and getBytes, and getValues itself delegates to getBytes:

override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
    getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
  }

getBytes looks up the file segment for the blockId and opens a channel on it. Small segments are read directly into an in-heap buffer, while large ones are memory-mapped:

override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
    val segment = diskManager.getBlockLocation(blockId)
    val channel = new RandomAccessFile(segment.file, "r").getChannel()

    try {
      // For small files, directly read rather than memory map
      if (segment.length < minMemoryMapBytes) {
        val buf = ByteBuffer.allocate(segment.length.toInt)
        channel.read(buf, segment.offset)
        buf.flip()
        Some(buf)
      } else {
        Some(channel.map(MapMode.READ_ONLY, segment.offset, segment.length))
      }
    } finally {
      channel.close()
    }
  }
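The cutoff (minMemoryMapBytes) exists because memory-mapping has a fixed setup cost (page tables, page faults) that is not worth paying for tiny segments. Below is a standalone sketch of the same two read paths; the 8 KB threshold is illustrative, while Spark reads its value from spark.storage.memoryMapThreshold:

import java.io.RandomAccessFile
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode

object ReadSegmentSketch {
  // Illustrative cutoff; Spark takes its value from spark.storage.memoryMapThreshold.
  val mapThreshold: Long = 8 * 1024

  def readSegment(path: String, offset: Long, length: Long): ByteBuffer = {
    val channel = new RandomAccessFile(path, "r").getChannel()
    try {
      if (length < mapThreshold) {
        // Small segment: plain read into an in-heap buffer.
        val buf = ByteBuffer.allocate(length.toInt)
        channel.read(buf, offset)
        buf.flip()
        buf
      } else {
        // Large segment: memory-map and let the OS page it in on demand.
        channel.map(MapMode.READ_ONLY, offset, length)
      }
    } finally {
      channel.close()
    }
  }
}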

2. MemoryStore

MemoryStore keeps blocks in memory, either as ArrayBuffers of deserialized Java objects or as serialized ByteBuffers. Whereas diskStore has to create local directories, memoryStore on instantiation simply creates a LinkedHashMap that maps each BlockId to its block entry:

case class Entry(value: Any, size: Long, deserialized: Boolean)

private val entries = new LinkedHashMap[BlockId, Entry](32, 0.75f, true)
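The third constructor argument, accessOrder = true, is the important one: it makes the LinkedHashMap iterate in least-recently-used-first order, which is exactly what the eviction logic below relies on. A quick demonstration:

import java.util.LinkedHashMap

object LruOrderDemo {
  def main(args: Array[String]): Unit = {
    // Same constructor arguments as MemoryStore: capacity 32, load factor 0.75, accessOrder = true.
    val entries = new LinkedHashMap[String, Long](32, 0.75f, true)
    entries.put("rdd_0_0", 100L)
    entries.put("rdd_0_1", 200L)
    entries.put("rdd_0_2", 300L)

    entries.get("rdd_0_0")   // touching a key moves it to the most-recently-used end

    val it = entries.keySet().iterator()
    while (it.hasNext) println(it.next())
    // prints: rdd_0_1, rdd_0_2, rdd_0_0  (least recently used first)
  }
}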

Writing data:

Similarly, BlockManager offers two operations on memoryStore, putValues and putBytes.

putValues first estimates the size of the instance to be stored, then calls tryToPut to try to place it in memory:

override def putValues(
      blockId: BlockId,
      values: ArrayBuffer[Any],
      level: StorageLevel,
      returnValues: Boolean): PutResult = {
    if (level.deserialized) {
      val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
      val putAttempt = tryToPut(blockId, values, sizeEstimate, deserialized = true)
      PutResult(sizeEstimate, Left(values.iterator), putAttempt.droppedBlocks)
    } else {
      val bytes = blockManager.dataSerialize(blockId, values.iterator)
      val putAttempt = tryToPut(blockId, bytes, bytes.limit, deserialized = false)
      PutResult(bytes.limit(), Right(bytes.duplicate()), putAttempt.droppedBlocks)
    }
  }

putBytes likewise estimates the size and then calls tryToPut:

override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
    // Work on a duplicate - since the original input might be used elsewhere.
    val bytes = _bytes.duplicate()
    bytes.rewind()
    if (level.deserialized) {
      val values = blockManager.dataDeserialize(blockId, bytes)
      val elements = new ArrayBuffer[Any]
      elements ++= values
      val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
      tryToPut(blockId, elements, sizeEstimate, true)
      PutResult(sizeEstimate, Left(values.toIterator))
    } else {
      tryToPut(blockId, bytes, bytes.limit, false)
      PutResult(bytes.limit(), Right(bytes.duplicate()))
    }
  }

Both operations end up in tryToPut. putLock ensures that each put request, together with any block dropping it triggers, is carried out by only one thread at a time; otherwise, while one thread is dropping blocks to free memory for one block, another thread could consume the freed space for a different block. ensureFreeSpace is called to make sure there is enough memory for the block: if there is, a new entry is created and added to the LinkedHashMap; if not, dropFromMemory is called to spill the block to disk:

/**
   * Try to put in a set of values, if we can free up enough space. The value should either be
   * an ArrayBuffer if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated)
   * size must also be passed by the caller.
   *
   * Lock on the object putLock to ensure that all the put requests and its associated block
   * dropping is done by only one thread at a time. Otherwise while one thread is dropping
   * blocks to free memory for one block, another thread may use up the freed space for
   * another block.
   *
   * Return whether put was successful, along with the blocks dropped in the process.
   */
  private def tryToPut(
      blockId: BlockId,
      value: Any,
      size: Long,
      deserialized: Boolean): ResultWithDroppedBlocks = {

    /* TODO: It's possible to optimize the locking by locking entries only when selecting blocks
     * to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has
     * been released, it must be ensured that those to-be-dropped blocks are not double counted
     * for freeing up more space for another block that needs to be put. Only then the actually
     * dropping of blocks (and writing to disk if necessary) can proceed in parallel. */

    var putSuccess = false
    val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]

    putLock.synchronized {
      val freeSpaceResult = ensureFreeSpace(blockId, size)
      val enoughFreeSpace = freeSpaceResult.success
      droppedBlocks ++= freeSpaceResult.droppedBlocks

      if (enoughFreeSpace) {
        val entry = new Entry(value, size, deserialized)
        entries.synchronized {
          entries.put(blockId, entry)
          currentMemory += size
        }
        if (deserialized) {
          logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
            blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
        } else {
          logInfo("Block %s stored as bytes to memory (size %s, free %s)".format(
            blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
        }
        putSuccess = true
      } else {
        // Tell the block manager that we couldn't put it in memory so that it can drop it to
        // disk if the block allows disk storage.
        val data = if (deserialized) {
          Left(value.asInstanceOf[ArrayBuffer[Any]])
        } else {
          Right(value.asInstanceOf[ByteBuffer].duplicate())
        }
        val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
        droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
      }
    }
    ResultWithDroppedBlocks(putSuccess, droppedBlocks)
  }
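ensureFreeSpace itself is not listed above. Conceptually it walks the access-ordered entries map from the LRU end and selects victim blocks until enough memory would be freed, skipping blocks that belong to the same RDD as the incoming block. The following is a simplified sketch of that idea under assumed names, not Spark's actual implementation:

import scala.collection.mutable.ArrayBuffer

// A simplified sketch of the ensureFreeSpace idea (assumed shape, not Spark's code).
object EvictionSketch {
  case class Entry(blockId: String, size: Long)

  private def rddId(blockId: String): Option[String] =
    if (blockId.startsWith("rdd_")) Some(blockId.split("_")(1)) else None

  /** Walk entries in LRU-first order, picking victims until `space` bytes would be free. */
  def selectVictims(
      lruFirst: Seq[Entry],
      freeMemory: Long,
      space: Long,
      incomingBlockId: String): Option[Seq[Entry]] = {
    val incomingRdd = rddId(incomingBlockId)
    val victims = new ArrayBuffer[Entry]
    var freed = freeMemory
    val it = lruFirst.iterator
    while (freed < space && it.hasNext) {
      val e = it.next()
      // Never evict blocks belonging to the same RDD we are inserting for.
      if (incomingRdd.isEmpty || rddId(e.blockId) != incomingRdd) {
        victims += e
        freed += e.size
      }
    }
    if (freed >= space) Some(victims) else None   // None: cannot make enough room
  }
}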

dropFromMemory in turn calls diskStore.putValues or diskStore.putBytes to write the block to disk, after first checking that the storage level actually uses disk, and finally removes the blockId from memoryStore. If the storage level does not use disk, the blockId is removed directly:

/**
   * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
   * store reaches its limit and needs to free up space.
   *
   * Return the block status if the given block has been updated, else None.
   */
  def dropFromMemory(
      blockId: BlockId,
      data: Either[ArrayBuffer[Any], ByteBuffer]): Option[BlockStatus] = {

    logInfo("Dropping block " + blockId + " from memory")
    val info = blockInfo.get(blockId).orNull

    // If the block has not already been dropped
    if (info != null)  {
      info.synchronized {
        // required ? As of now, this will be invoked only for blocks which are ready
        // But in case this changes in future, adding for consistency sake.
        if (!info.waitForReady()) {
          // If we get here, the block write failed.
          logWarning("Block " + blockId + " was marked as failure. Nothing to drop")
          return None
        }

        var blockIsUpdated = false
        val level = info.level

        // Drop to disk, if storage level requires
        if (level.useDisk && !diskStore.contains(blockId)) {
          logInfo("Writing block " + blockId + " to disk")
          data match {
            case Left(elements) =>
              diskStore.putValues(blockId, elements, level, false)
            case Right(bytes) =>
              diskStore.putBytes(blockId, bytes, level)
          }
          blockIsUpdated = true
        }

        // Actually drop from memory store
        val droppedMemorySize =
          if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
        val blockIsRemoved = memoryStore.remove(blockId)
        if (blockIsRemoved) {
          blockIsUpdated = true
        } else {
          logWarning("Block " + blockId + " could not be dropped from memory as it does not exist")
        }

        val status = getCurrentBlockStatus(blockId, info)
        if (info.tellMaster) {
          reportBlockStatus(blockId, info, status, droppedMemorySize)
        }
        if (!level.useDisk) {
          // The block is completely gone from this node; forget it so we can put() it again later.
          blockInfo.remove(blockId)
        }
        if (blockIsUpdated) {
          return Some(status)
        }
      }
    }
    None
  }

Reading data:

The two read methods, getValues and getBytes, are similar: both look up the entry for the blockId in the LinkedHashMap and return the corresponding data. getValues looks like this:

override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
    val entry = entries.synchronized {
      entries.get(blockId)
    }
    if (entry == null) {
      None
    } else if (entry.deserialized) {
      Some(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)
    } else {
      val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate() // Doesn't actually copy data
      Some(blockManager.dataDeserialize(blockId, buffer))
    }
  }

getBytes looks like this:

override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
    val entry = entries.synchronized {
      entries.get(blockId)
    }
    if (entry == null) {
      None
    } else if (entry.deserialized) {
      Some(blockManager.dataSerialize(blockId, entry.value.asInstanceOf[ArrayBuffer[Any]].iterator))
    } else {
      Some(entry.value.asInstanceOf[ByteBuffer].duplicate())   // Doesn't actually copy the data
    }
  }

3. The BlockManager Wrapper

BlockManager wraps these stores behind put and get methods built on doPut and doGetLocal/doGetRemote, so blocks can be stored and retrieved without any concern for the underlying implementation.

Write path:

For the three different data types, Iterator, ArrayBuffer, and ByteBuffer, there are three corresponding put operations, all of which funnel into doPut:

def put(
      blockId: BlockId,
      values: Iterator[Any],
      level: StorageLevel,
      tellMaster: Boolean): Seq[(BlockId, BlockStatus)] = {
    doPut(blockId, IteratorValues(values), level, tellMaster)
  }
 
def put(
      blockId: BlockId,
      values: ArrayBuffer[Any],
      level: StorageLevel,
      tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
    require(values != null, "Values is null")
    doPut(blockId, ArrayBufferValues(values), level, tellMaster)
  }
 
def putBytes(
      blockId: BlockId,
      bytes: ByteBuffer,
      level: StorageLevel,
      tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
    require(bytes != null, "Bytes is null")
    doPut(blockId, ByteBufferValues(bytes), level, tellMaster)
  }

doPut requires that blockId and storageLevel be non-null and valid. It creates a BlockInfo instance for the block and effectively locks it in blockInfo, so other threads cannot get() the block yet. It then stores the data into memory or onto disk according to the storageLevel, and calls markReady so the block becomes readable by other threads. Finally, if level.replication is greater than 1, replicate is called to copy the block to other nodes:

private def doPut(
      blockId: BlockId,
      data: Values,
      level: StorageLevel,
      tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {

    require(blockId != null, "BlockId is null")
    require(level != null && level.isValid, "StorageLevel is null or invalid")

    // Return value
    val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]

    // Remember the block's storage level so that we can correctly drop it to disk if it needs
    // to be dropped right after it got put into memory. Note, however, that other threads will
    // not be able to get() this block until we call markReady on its BlockInfo.
    val putBlockInfo = {
      val tinfo = new BlockInfo(level, tellMaster)
      // Do atomically !
      val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)

      if (oldBlockOpt.isDefined) {
        if (oldBlockOpt.get.waitForReady()) {
          logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
          return updatedBlocks
        }

        // TODO: So the block info exists - but previous attempt to load it (?) failed.
        // What do we do now ? Retry on it ?
        oldBlockOpt.get
      } else {
        tinfo
      }
    }

    val startTimeMs = System.currentTimeMillis

    // If we're storing values and we need to replicate the data, we'll want access to the values,
    // but because our put will read the whole iterator, there will be no values left. For the
    // case where the put serializes data, we'll remember the bytes, above; but for the case where
    // it doesn't, such as deserialized storage, let's rely on the put returning an Iterator.
    var valuesAfterPut: Iterator[Any] = null

    // Ditto for the bytes after the put
    var bytesAfterPut: ByteBuffer = null

    // Size of the block in bytes
    var size = 0L

    // If we're storing bytes, then initiate the replication before storing them locally.
    // This is faster as data is already serialized and ready to send.
    val replicationFuture = if (data.isInstanceOf[ByteBufferValues] && level.replication > 1) {
      // Duplicate doesn't copy the bytes, just creates a wrapper
      val bufferView = data.asInstanceOf[ByteBufferValues].buffer.duplicate()
      Future {
        replicate(blockId, bufferView, level)
      }
    } else {
      null
    }

    putBlockInfo.synchronized {
      logTrace("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
        + " to get into synchronized block")

      var marked = false
      try {
        if (level.useMemory) {
          // Save it just to memory first, even if it also has useDisk set to true; we will
          // drop it to disk later if the memory store can't hold it.
          val res = data match {
            case IteratorValues(iterator) =>
              memoryStore.putValues(blockId, iterator, level, true)
            case ArrayBufferValues(array) =>
              memoryStore.putValues(blockId, array, level, true)
            case ByteBufferValues(bytes) =>
              bytes.rewind()
              memoryStore.putBytes(blockId, bytes, level)
          }
          size = res.size
          res.data match {
            case Right(newBytes) => bytesAfterPut = newBytes
            case Left(newIterator) => valuesAfterPut = newIterator
          }
          // Keep track of which blocks are dropped from memory
          res.droppedBlocks.foreach { block => updatedBlocks += block }
        } else if (level.useOffHeap) {
          // Save to Tachyon.
          val res = data match {
            case IteratorValues(iterator) =>
              tachyonStore.putValues(blockId, iterator, level, false)
            case ArrayBufferValues(array) =>
              tachyonStore.putValues(blockId, array, level, false)
            case ByteBufferValues(bytes) =>
              bytes.rewind()
              tachyonStore.putBytes(blockId, bytes, level)
          }
          size = res.size
          res.data match {
            case Right(newBytes) => bytesAfterPut = newBytes
            case _ =>
          }
        } else {
          // Save directly to disk.
          // Don't get back the bytes unless we replicate them.
          val askForBytes = level.replication > 1

          val res = data match {
            case IteratorValues(iterator) =>
              diskStore.putValues(blockId, iterator, level, askForBytes)
            case ArrayBufferValues(array) =>
              diskStore.putValues(blockId, array, level, askForBytes)
            case ByteBufferValues(bytes) =>
              bytes.rewind()
              diskStore.putBytes(blockId, bytes, level)
          }
          size = res.size
          res.data match {
            case Right(newBytes) => bytesAfterPut = newBytes
            case _ =>
          }
        }

        val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
        if (putBlockStatus.storageLevel != StorageLevel.NONE) {
          // Now that the block is in either the memory, tachyon, or disk store,
          // let other threads read it, and tell the master about it.
          marked = true
          putBlockInfo.markReady(size)
          if (tellMaster) {
            reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
          }
          updatedBlocks += ((blockId, putBlockStatus))
        }
      } finally {
        // If we failed in putting the block to memory/disk, notify other possible readers
        // that it has failed, and then remove it from the block info map.
        if (!marked) {
          // Note that the remove must happen before markFailure otherwise another thread
          // could've inserted a new BlockInfo before we remove it.
          blockInfo.remove(blockId)
          putBlockInfo.markFailure()
          logWarning("Putting block " + blockId + " failed")
        }
      }
    }
    logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs))

    // Either we're storing bytes and we asynchronously started replication, or we're storing
    // values and need to serialize and replicate them now:
    if (level.replication > 1) {
      data match {
        case ByteBufferValues(bytes) => Await.ready(replicationFuture, Duration.Inf)
        case _ => {
          val remoteStartTime = System.currentTimeMillis
          // Serialize the block if not already done
          if (bytesAfterPut == null) {
            if (valuesAfterPut == null) {
              throw new SparkException(
                "Underlying put returned neither an Iterator nor bytes! This shouldn‘t happen.")
            }
            bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
          }
          replicate(blockId, bytesAfterPut, level)
          logDebug("Put block " + blockId + " remotely took " +
            Utils.getUsedTimeMs(remoteStartTime))
        }
      }
    }

    BlockManager.dispose(bytesAfterPut)

    if (level.replication > 1) {
      logDebug("Put for block " + blockId + " with replication took " +
        Utils.getUsedTimeMs(startTimeMs))
    } else {
      logDebug("Put for block " + blockId + " without replication took " +
        Utils.getUsedTimeMs(startTimeMs))
    }

    updatedBlocks
  }
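One detail of doPut worth isolating: when the incoming data is already serialized (ByteBufferValues) and replication is required, the remote copy is started in a Future before the local write, so the network transfer overlaps the memory/disk write. A minimal sketch of the pattern, with hypothetical writeLocally/replicate stand-ins:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// Sketch of doPut's overlap trick for already-serialized data: start replication
// first, do the local write in parallel, then join before returning.
// writeLocally and replicate are hypothetical stand-ins.
object OverlappedReplicationSketch {
  def writeLocally(data: Array[Byte]): Unit = { /* store to memory or disk */ }
  def replicate(data: Array[Byte]): Unit = { /* send the bytes to a peer node */ }

  def put(data: Array[Byte]): Unit = {
    val replicationFuture = Future { replicate(data) }   // network send starts now
    writeLocally(data)                                   // overlaps with the send
    Await.ready(replicationFuture, Duration.Inf)         // wait for the replica
  }
}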

Read path:

get first calls getLocal to fetch the block from the local node; if that fails, it calls getRemote to fetch it from another node's BlockManager. "Normally Spark assigns tasks according to where blocks live, so a task usually runs on a node that already holds its blocks and getLocal() succeeds; under resource pressure, however, Spark may schedule a task onto a node that does not hold the block, and the block must then be fetched via getRemote()."

def get(blockId: BlockId): Option[Iterator[Any]] = {
    val local = getLocal(blockId)
    if (local.isDefined) {
      logInfo("Found block %s locally".format(blockId))
      return local
    }
    val remote = getRemote(blockId)
    if (remote.isDefined) {
      logInfo("Found block %s remotely".format(blockId))
      return remote
    }
    None
  }

getLocal simply delegates to doGetLocal:

/**
   * Get block from local block manager.
   */
  def getLocal(blockId: BlockId): Option[Iterator[Any]] = {
    logDebug("Getting local block " + blockId)
    doGetLocal(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]]
  }

doGetLocal first checks whether the storageLevel uses memory and, if so, tries to read the block from the memory store (the off-heap Tachyon store is checked next). It then checks disk: if the level uses both disk and memory, the data read from disk is also cached back into memory; if it uses disk only, the block is read and returned directly. If no store holds the block, it was not found locally:

private def doGetLocal(blockId: BlockId, asValues: Boolean): Option[Any] = {
    val info = blockInfo.get(blockId).orNull
    if (info != null) {
      info.synchronized {

        // If another thread is writing the block, wait for it to become ready.
        if (!info.waitForReady()) {
          // If we get here, the block write failed.
          logWarning("Block " + blockId + " was marked as failure.")
          return None
        }

        val level = info.level
        logDebug("Level for block " + blockId + " is " + level)

        // Look for the block in memory
        if (level.useMemory) {
          logDebug("Getting block " + blockId + " from memory")
          val result = if (asValues) {
            memoryStore.getValues(blockId)
          } else {
            memoryStore.getBytes(blockId)
          }
          result match {
            case Some(values) =>
              return Some(values)
            case None =>
              logDebug("Block " + blockId + " not found in memory")
          }
        }

        // Look for the block in Tachyon
        if (level.useOffHeap) {
          logDebug("Getting block " + blockId + " from tachyon")
          if (tachyonStore.contains(blockId)) {
            tachyonStore.getBytes(blockId) match {
              case Some(bytes) => {
                if (!asValues) {
                  return Some(bytes)
                } else {
                  return Some(dataDeserialize(blockId, bytes))
                }
              }
              case None =>
                logDebug("Block " + blockId + " not found in tachyon")
            }
          }
        }

        // Look for block on disk, potentially storing it back into memory if required:
        if (level.useDisk) {
          logDebug("Getting block " + blockId + " from disk")
          val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
            case Some(bytes) => bytes
            case None =>
              throw new Exception("Block " + blockId + " not found on disk, though it should be")
          }
          assert (0 == bytes.position())

          if (!level.useMemory) {
            // If the block shouldn't be stored in memory, we can just return it:
            if (asValues) {
              return Some(dataDeserialize(blockId, bytes))
            } else {
              return Some(bytes)
            }
          } else {
            // Otherwise, we also have to store something in the memory store:
            if (!level.deserialized || !asValues) {
              // We'll store the bytes in memory if the block's storage level includes
              // "memory serialized", or if it should be cached as objects in memory
              // but we only requested its serialized bytes:
              val copyForMemory = ByteBuffer.allocate(bytes.limit)
              copyForMemory.put(bytes)
              memoryStore.putBytes(blockId, copyForMemory, level)
              bytes.rewind()
            }
            if (!asValues) {
              return Some(bytes)
            } else {
              val values = dataDeserialize(blockId, bytes)
              if (level.deserialized) {
                // Cache the values before returning them:
                // TODO: Consider creating a putValues that also takes in a iterator?
                val valuesBuffer = new ArrayBuffer[Any]
                valuesBuffer ++= values
                memoryStore.putValues(blockId, valuesBuffer, level, true).data match {
                  case Left(values2) =>
                    return Some(values2)
                  case _ =>
                    throw new Exception("Memory store did not return back an iterator")
                }
              } else {
                return Some(values)
              }
            }
          }
        }
      }
    } else {
      logDebug("Block " + blockId + " not registered locally")
    }
    None
  }

getRemote in turn delegates to doGetRemote:

/**
   * Get block from remote block managers.
   */
  def getRemote(blockId: BlockId): Option[Iterator[Any]] = {
    logDebug("Getting remote block " + blockId)
    doGetRemote(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]]
  }

doGetRemote first obtains all known locations of the block (shuffled so that load spreads across replicas), then requests the block from the remote nodes one by one; as soon as any of them returns the block, the function returns without contacting the rest:

private def doGetRemote(blockId: BlockId, asValues: Boolean): Option[Any] = {
    require(blockId != null, "BlockId is null")
    val locations = Random.shuffle(master.getLocations(blockId))
    for (loc <- locations) {
      logDebug("Getting remote block " + blockId + " from " + loc)
      val data = BlockManagerWorker.syncGetBlock(
        GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
      if (data != null) {
        if (asValues) {
          return Some(dataDeserialize(blockId, data))
        } else {
          return Some(data)
        }
      }
      logDebug("The value of block " + blockId + " is null")
    }
    logDebug("Block " + blockId + " not found")
    None
  }

4. Partition and Block

Finally, how does a Partition turn into stored blocks? As analyzed earlier in the post on resource scheduling and task execution, a series of transformations or an action on an RDD becomes tasks over its partitions, and those tasks ultimately call getOrCompute. getOrCompute first builds a key (blockId) from the RDD id and the partition index, and uses that key to ask BlockManager for the block. If the block exists, the RDD has already been computed and stored in the BlockManager and can be read back without recomputation. If it does not exist, the RDD's computeOrReadCheckpoint method is called to read the checkpoint or compute a new block, which is then stored into the BlockManager. Note that computing and storing a block is blocking: another thread that needs the same block must wait until its loading completes.
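A minimal sketch of this cache-or-compute pattern follows. The key shape "rdd_<rddId>_<partitionIndex>" matches RDDBlockId's name in Spark 1.0; the in-memory map and the coarse lock are simplified stand-ins for BlockManager and its per-block BlockInfo locking:

import scala.collection.mutable

// Simplified stand-in for CacheManager.getOrCompute: the real version goes through
// BlockManager, and its per-block locking makes a second thread wait while the
// first one is still loading the partition.
object GetOrComputeSketch {
  private val store = mutable.Map[String, Array[Any]]()

  def getOrCompute(rddId: Int, partitionIndex: Int)(compute: => Iterator[Any]): Iterator[Any] = {
    val key = s"rdd_${rddId}_$partitionIndex"   // same shape as RDDBlockId's name
    store.synchronized {
      store.get(key) match {
        case Some(cached) => cached.iterator    // computed before: read, don't recompute
        case None =>
          val values = compute.toArray          // compute or read checkpoint, then cache
          store(key) = values
          values.iterator
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val first = getOrCompute(0, 1)(Iterator(1, 2, 3)).toList
    val second = getOrCompute(0, 1)(sys.error("never recomputed")).toList
    println(first == second)   // true: the second call hits the cache
  }
}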

END
