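“An RDD is made up of partitions, and transformations and actions are carried out on those partitions. Inside the storage module, however, an RDD is viewed as a set of blocks, and RDD data is stored and retrieved block by block. In essence a partition and a block are the same thing seen from two angles. In the Spark storage module the block is the smallest unit of data access, and every operation works at block granularity.”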
BlockManager defines three main stores (tachyonStore is left out of this analysis):
```scala
private[storage] val memoryStore = new MemoryStore(this, maxMemory)
private[storage] val diskStore = new DiskStore(this, diskBlockManager)
private[storage] lazy val tachyonStore: TachyonStore  // initializer not shown in this excerpt
```
1. DiskStore
Start with diskStore. DiskStore is instantiated with a diskBlockManager argument:
```scala
val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
  conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
```
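When DiskBlockManager is initialized, it creates one local directory for each path listed in spark.local.dir (falling back to java.io.tmpdir), and inside each of these it creates a number of subdirectories into which files are hashed, so that no single top-level directory ends up with a huge inode: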
```scala
// Create one local directory for each path mentioned in spark.local.dir; then, inside this
// directory, create multiple subdirectories that we will hash files into, in order to avoid
// having really large inodes at the top level.
private val localDirs: Array[File] = createLocalDirs()
private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
```
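createLocalDirs creates the local directories. Each one is a folder named "spark-local-" + a yyyyMMddHHmmss timestamp + "-" + a random number written as four hex digits. Creation is retried up to MAX_DIR_CREATION_ATTEMPTS times, and the executor exits if every attempt fails. Blocks are stored as files inside these local directories.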
```scala
private def createLocalDirs(): Array[File] = {
  logDebug("Creating local directories at root dirs '" + rootDirs + "'")
  val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
  rootDirs.split(",").map { rootDir =>
    var foundLocalDir = false
    var localDir: File = null
    var localDirId: String = null
    var tries = 0
    val rand = new Random()
    while (!foundLocalDir && tries < MAX_DIR_CREATION_ATTEMPTS) {
      tries += 1
      try {
        localDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536))
        localDir = new File(rootDir, "spark-local-" + localDirId)
        if (!localDir.exists) {
          foundLocalDir = localDir.mkdirs()
        }
      } catch {
        case e: Exception =>
          logWarning("Attempt " + tries + " to create local dir " + localDir + " failed", e)
      }
    }
    if (!foundLocalDir) {
      logError("Failed " + MAX_DIR_CREATION_ATTEMPTS + " attempts to create local dir in " + rootDir)
      System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
    }
    logInfo("Created local directory at " + localDir)
    localDir
  }
}
```
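To make the directory naming concrete, here is a minimal sketch that reproduces only the name format used by createLocalDirs ("%s-%04x" over a timestamp and rand.nextInt(65536)). The helper localDirName is hypothetical and does not create anything on disk.

```scala
import java.text.SimpleDateFormat
import java.util.Date
import scala.util.Random

// Hypothetical helper: builds a name in the same shape as createLocalDirs,
// "spark-local-" + yyyyMMddHHmmss timestamp + "-" + a number in [0, 65536) as 4 hex digits.
def localDirName(now: Date, rand: Random): String = {
  val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
  val localDirId = "%s-%04x".format(dateFormat.format(now), rand.nextInt(65536))
  "spark-local-" + localDirId
}

val name = localDirName(new Date, new Random(42))
println(name) // exact value depends on the clock and the seed
```

The random suffix is what makes retries useful: if two executors start in the same second under the same root directory, a second attempt will most likely pick a different name.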
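So everything DiskBlockManager does with a block comes down to reading and writing a File: it creates and maintains the mapping from a logical block to a physical location. By default, a block maps to a file named after its blockId.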
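getFile establishes this mapping: it hashes the file name to choose a local directory and one of its subdirectories (creating the subdirectory on first use), and places the file there.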
```scala
def getFile(filename: String): File = {
  // Figure out which local directory it hashes to, and which subdirectory in that
  val hash = Utils.nonNegativeHash(filename)
  val dirId = hash % localDirs.length
  val subDirId = (hash / localDirs.length) % subDirsPerLocalDir

  // Create the subdirectory if it doesn't already exist
  var subDir = subDirs(dirId)(subDirId)
  if (subDir == null) {
    subDir = subDirs(dirId).synchronized {
      val old = subDirs(dirId)(subDirId)
      if (old != null) {
        old
      } else {
        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
        newDir.mkdir()
        subDirs(dirId)(subDirId) = newDir
        newDir
      }
    }
  }

  new File(subDir, filename)
}
```
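The index arithmetic in getFile can be checked in isolation. The sketch below recomputes (dirId, subDirId) without touching the filesystem. nonNegativeHash here is a hypothetical stand-in that follows the same idea as Utils.nonNegativeHash (absolute value of hashCode, with Int.MinValue mapped to 0), and "localDir<N>" is a placeholder for localDirs(dirId); 2 local dirs and 64 subdirectories are illustrative values only.

```scala
import java.io.File

// Hypothetical stand-in for Utils.nonNegativeHash: abs(hashCode), Int.MinValue mapped to 0.
def nonNegativeHash(obj: AnyRef): Int = {
  val hash = obj.hashCode
  if (hash != Int.MinValue) math.abs(hash) else 0
}

// Returns (dirId, subDirId, relative path) using the same arithmetic as getFile.
def locate(filename: String, numLocalDirs: Int, subDirsPerLocalDir: Int): (Int, Int, String) = {
  val hash = nonNegativeHash(filename)
  val dirId = hash % numLocalDirs
  val subDirId = (hash / numLocalDirs) % subDirsPerLocalDir
  // Subdirectories are named with two hex digits, as in getFile ("%02x").
  val relPath = s"localDir$dirId" + File.separator + "%02x".format(subDirId) +
    File.separator + filename
  (dirId, subDirId, relPath)
}

val (dirId, subDirId, path) = locate("rdd_0_1", numLocalDirs = 2, subDirsPerLocalDir = 64)
println(path)
```

Because the mapping depends only on the file name, a later getFile call for the same blockId lands in the same subdirectory without any lookup table.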
Writing data:
For diskStore, BlockManager uses two write operations depending on the form of the data: putValues and putBytes.
putValues gets the file for the blockId, opens a FileOutputStream on it, and serializes the values into the stream:
```scala
override def putValues(
    blockId: BlockId,
    values: Iterator[Any],
    level: StorageLevel,
    returnValues: Boolean)
  : PutResult = {
  logDebug("Attempting to write values for block " + blockId)
  val startTime = System.currentTimeMillis
  val file = diskManager.getFile(blockId)
  val outputStream = new FileOutputStream(file)
  blockManager.dataSerializeStream(blockId, outputStream, values)
  val length = file.length

  val timeTaken = System.currentTimeMillis - startTime
  logDebug("Block %s stored as %s file on disk in %d ms".format(
    file.getName, Utils.bytesToString(length), timeTaken))

  if (returnValues) {
    // Return a byte buffer for the contents of the file
    val buffer = getBytes(blockId).get
    PutResult(length, Right(buffer))
  } else {
    PutResult(length, null)
  }
}
```
putBytes gets the file for the blockId, opens a channel on it, and writes the bytes through the channel:
```scala
override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
  // So that we do not modify the input offsets !
  // duplicate does not copy buffer, so inexpensive
  val bytes = _bytes.duplicate()
  logDebug("Attempting to put block " + blockId)
  val startTime = System.currentTimeMillis
  val file = diskManager.getFile(blockId)
  val channel = new FileOutputStream(file).getChannel()
  while (bytes.remaining > 0) {
    channel.write(bytes)
  }
  channel.close()
  val finishTime = System.currentTimeMillis
  logDebug("Block %s stored as %s file on disk in %d ms".format(
    file.getName, Utils.bytesToString(bytes.limit), (finishTime - startTime)))
  return PutResult(bytes.limit(), Right(bytes.duplicate()))
}
```
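The two details worth isolating in putBytes are the duplicate() call and the write loop. The sketch below writes a buffer to a temporary file the same way; writeBlockBytes is a hypothetical helper, and unlike the excerpt it closes the channel in a finally block.

```scala
import java.io.{File, FileOutputStream}
import java.nio.ByteBuffer

// Hypothetical helper following the putBytes pattern.
def writeBlockBytes(file: File, input: ByteBuffer): Long = {
  // duplicate() shares the content but has its own position/limit,
  // so the caller's buffer is left untouched.
  val bytes = input.duplicate()
  val channel = new FileOutputStream(file).getChannel()
  try {
    // A single write() is not guaranteed to drain the buffer, hence the loop.
    while (bytes.remaining > 0) {
      channel.write(bytes)
    }
  } finally {
    channel.close()
  }
  file.length
}

val tmp = File.createTempFile("block", ".data")
tmp.deleteOnExit()
val payload = ByteBuffer.wrap("hello block".getBytes("UTF-8"))
val written = writeBlockBytes(tmp, payload)
```

After the call, payload still has position 0, which is why the excerpt can hand the same bytes on for replication or caching.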
Reading data:
Reads go through getValues and getBytes; getValues simply calls getBytes and deserializes the result:
```scala
override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
  getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
}
```
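getBytes looks up the file segment for the blockId and opens a channel on it. Segments smaller than minMemoryMapBytes are read directly into a heap ByteBuffer; larger ones are memory-mapped: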
```scala
override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
  val segment = diskManager.getBlockLocation(blockId)
  val channel = new RandomAccessFile(segment.file, "r").getChannel()

  try {
    // For small files, directly read rather than memory map
    if (segment.length < minMemoryMapBytes) {
      val buf = ByteBuffer.allocate(segment.length.toInt)
      channel.read(buf, segment.offset)
      buf.flip()
      Some(buf)
    } else {
      Some(channel.map(MapMode.READ_ONLY, segment.offset, segment.length))
    }
  } finally {
    channel.close()
  }
}
```
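The small-versus-large split in getBytes can be tried on a temporary file. In this sketch readSegment is a hypothetical helper and minMemoryMapBytes is a plain parameter (in Spark it comes from configuration). One deliberate difference from the excerpt: the heap path loops on the positional read, since a single read is not guaranteed to fill the buffer.

```scala
import java.io.{File, RandomAccessFile}
import java.nio.{ByteBuffer, MappedByteBuffer}
import java.nio.channels.FileChannel.MapMode
import java.nio.file.Files

// Hypothetical helper: heap copy below the threshold, memory map at or above it.
def readSegment(file: File, offset: Long, length: Long, minMemoryMapBytes: Long): ByteBuffer = {
  val channel = new RandomAccessFile(file, "r").getChannel()
  try {
    if (length < minMemoryMapBytes) {
      val buf = ByteBuffer.allocate(length.toInt)
      var n = 0
      // Keep reading until the buffer is full or end of file is reached.
      while (buf.hasRemaining && n >= 0) {
        n = channel.read(buf, offset + buf.position())
      }
      buf.flip()
      buf
    } else {
      // A mapping stays valid after the channel that created it is closed.
      channel.map(MapMode.READ_ONLY, offset, length)
    }
  } finally {
    channel.close()
  }
}

def asString(b: ByteBuffer): String = {
  val arr = new Array[Byte](b.remaining)
  b.duplicate().get(arr)
  new String(arr, "UTF-8")
}

val f = File.createTempFile("segment", ".data")
f.deleteOnExit()
Files.write(f.toPath, "0123456789".getBytes("UTF-8"))
val small = readSegment(f, offset = 2, length = 5, minMemoryMapBytes = 1024) // heap copy
val mapped = readSegment(f, offset = 2, length = 5, minMemoryMapBytes = 1)   // memory map
```

Memory mapping has a fixed setup cost, so copying is cheaper for small segments; for large ones, mapping avoids copying the data into the JVM heap.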
2. MemoryStore
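MemoryStore keeps blocks in memory either as an ArrayBuffer of deserialized Java objects or as a serialized ByteBuffer. Unlike diskStore, which has to create local directories, memoryStore only creates a LinkedHashMap when it is instantiated; this map holds the mapping from BlockId to block Entry. The third constructor argument (true) puts the map in access order, so iteration runs from the least recently used entry to the most recently used one: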
```scala
case class Entry(value: Any, size: Long, deserialized: Boolean)

private val entries = new LinkedHashMap[BlockId, Entry](32, 0.75f, true)
```
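The effect of accessOrder = true can be seen directly. This sketch uses String keys in place of BlockId (a simplification); keysInOrder is a hypothetical helper that walks the map in iteration order.

```scala
import java.util.LinkedHashMap
import scala.collection.mutable.ListBuffer

case class Entry(value: Any, size: Long, deserialized: Boolean)

// Same constructor arguments as MemoryStore: capacity 32, load factor 0.75, access order.
val entries = new LinkedHashMap[String, Entry](32, 0.75f, true)
entries.put("rdd_0_0", Entry("a", 10, deserialized = true))
entries.put("rdd_0_1", Entry("b", 20, deserialized = true))
entries.put("rdd_0_2", Entry("c", 30, deserialized = true))
entries.get("rdd_0_0") // reading an entry moves it to the most-recently-used end

def keysInOrder(m: LinkedHashMap[String, Entry]): List[String] = {
  val it = m.keySet().iterator()
  val buf = ListBuffer[String]()
  while (it.hasNext) buf += it.next()
  buf.toList
}

println(keysInOrder(entries)) // List(rdd_0_1, rdd_0_2, rdd_0_0)
```

Walking the map from the front therefore visits the least recently used blocks first, which is a natural order in which to pick blocks to evict when memory runs short.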
Writing data:
Likewise, BlockManager has two write operations on memoryStore: putValues and putBytes.
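putValues determines the size of the block (an estimate from SizeEstimator if the level stores deserialized objects, otherwise the length of the serialized bytes) and then calls tryToPut to try to place it in memory: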
```scala
override def putValues(
    blockId: BlockId,
    values: ArrayBuffer[Any],
    level: StorageLevel,
    returnValues: Boolean): PutResult = {
  if (level.deserialized) {
    val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
    val putAttempt = tryToPut(blockId, values, sizeEstimate, deserialized = true)
    PutResult(sizeEstimate, Left(values.iterator), putAttempt.droppedBlocks)
  } else {
    val bytes = blockManager.dataSerialize(blockId, values.iterator)
    val putAttempt = tryToPut(blockId, bytes, bytes.limit, deserialized = false)
    PutResult(bytes.limit(), Right(bytes.duplicate()), putAttempt.droppedBlocks)
  }
}
```
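putBytes works the same way: if the level stores deserialized objects, it deserializes the bytes and estimates their size; otherwise it uses the byte length. It then calls tryToPut: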
```scala
override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
  // Work on a duplicate - since the original input might be used elsewhere.
  val bytes = _bytes.duplicate()
  bytes.rewind()
  if (level.deserialized) {
    val values = blockManager.dataDeserialize(blockId, bytes)
    val elements = new ArrayBuffer[Any]
    elements ++= values
    val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
    tryToPut(blockId, elements, sizeEstimate, true)
    // values was already drained into elements above, so return an iterator over elements
    // (the excerpt returned values.toIterator, which would be empty at this point).
    PutResult(sizeEstimate, Left(elements.iterator))
  } else {
    tryToPut(blockId, bytes, bytes.limit, false)
    PutResult(bytes.limit(), Right(bytes.duplicate()))
  }
}
```
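Both paths end up in tryToPut. putLock ensures that put requests and the block dropping they trigger are handled by only one thread at a time; otherwise, while one thread is dropping blocks to free memory for one block, another thread could use up the freed space for a different block. tryToPut calls ensureFreeSpace to make sure there is enough room in memory for the block, which may evict other blocks along the way. If there is enough room, a new Entry is created and added to the LinkedHashMap; otherwise tryToPut calls dropFromMemory for the new block, which writes it to disk if its storage level allows.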
```scala
/**
 * Try to put in a set of values, if we can free up enough space. The value should either be
 * an ArrayBuffer if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated)
 * size must also be passed by the caller.
 *
 * Lock on the object putLock to ensure that all the put requests and its associated block
 * dropping is done by only one thread at a time. Otherwise while one thread is dropping
 * blocks to free memory for one block, another thread may use up the freed space for
 * another block.
 *
 * Return whether put was successful, along with the blocks dropped in the process.
 */
private def tryToPut(
    blockId: BlockId,
    value: Any,
    size: Long,
    deserialized: Boolean): ResultWithDroppedBlocks = {

  /* TODO: Its possible to optimize the locking by locking entries only when selecting blocks
   * to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has
   * been released, it must be ensured that those to-be-dropped blocks are not double counted
   * for freeing up more space for another block that needs to be put. Only then the actually
   * dropping of blocks (and writing to disk if necessary) can proceed in parallel.
   */

  var putSuccess = false
  val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]

  putLock.synchronized {
    val freeSpaceResult = ensureFreeSpace(blockId, size)
    val enoughFreeSpace = freeSpaceResult.success
    droppedBlocks ++= freeSpaceResult.droppedBlocks

    if (enoughFreeSpace) {
      val entry = new Entry(value, size, deserialized)
      entries.synchronized {
        entries.put(blockId, entry)
        currentMemory += size
      }
      if (deserialized) {
        logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
          blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
      } else {
        logInfo("Block %s stored as bytes to memory (size %s, free %s)".format(
          blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
      }
      putSuccess = true
    } else {
      // Tell the block manager that we couldn't put it in memory so that it can drop it to
      // disk if the block allows disk storage.
      val data = if (deserialized) {
        Left(value.asInstanceOf[ArrayBuffer[Any]])
      } else {
        Right(value.asInstanceOf[ByteBuffer].duplicate())
      }
      val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
      droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
    }
  }
  ResultWithDroppedBlocks(putSuccess, droppedBlocks)
}
```
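A toy model helps show why eviction and insertion must sit under the same lock. The sketch below assumes a plain LRU policy; ToyMemoryStore and its methods are hypothetical. Spark's real ensureFreeSpace applies extra rules (for example, it does not evict blocks of the RDD currently being cached) and passes evicted blocks to dropFromMemory instead of discarding them.

```scala
import java.util.LinkedHashMap
import scala.collection.mutable.ArrayBuffer

// Hypothetical toy store: blockId -> size, evicting least recently used entries first.
class ToyMemoryStore(maxMemory: Long) {
  private val entries = new LinkedHashMap[String, Long](32, 0.75f, true)
  private val putLock = new Object
  private var currentMemory = 0L

  /** Returns whether the put succeeded and which blocks were evicted to make room. */
  def tryToPut(blockId: String, size: Long): (Boolean, List[String]) = putLock.synchronized {
    if (size > maxMemory || entries.containsKey(blockId)) {
      // Could never fit, or already present; Spark would try to spill the new block to disk.
      (false, Nil)
    } else {
      val dropped = ArrayBuffer[String]()
      val it = entries.entrySet().iterator()
      // Evict from the least recently used end until the new block fits.
      while (maxMemory - currentMemory < size && it.hasNext) {
        val e = it.next()
        currentMemory -= e.getValue
        dropped += e.getKey
        it.remove()
      }
      entries.put(blockId, size)
      currentMemory += size
      (true, dropped.toList)
    }
  }

  def contains(blockId: String): Boolean = putLock.synchronized(entries.containsKey(blockId))
  def used: Long = putLock.synchronized(currentMemory)
}

val store = new ToyMemoryStore(maxMemory = 100)
store.tryToPut("rdd_0_0", 40)
store.tryToPut("rdd_0_1", 40)
val (ok, dropped) = store.tryToPut("rdd_0_2", 40) // needs 40, only 20 free
```

If selecting victims and inserting the new entry were done under separate locks, two threads could both count the same freed 40 bytes as theirs, which is exactly the race the putLock comment describes.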
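dropFromMemory in turn calls diskStore.putValues or diskStore.putBytes to write the block to disk, but only if the storage level uses disk and the block is not already on disk. It then removes the block from memoryStore and, if required, reports the new status to the master. If the storage level does not use disk, the block is simply removed from memory and its BlockInfo is forgotten: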
```scala
/**
 * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
 * store reaches its limit and needs to free up space.
 *
 * Return the block status if the given block has been updated, else None.
 */
def dropFromMemory(
    blockId: BlockId,
    data: Either[ArrayBuffer[Any], ByteBuffer]): Option[BlockStatus] = {

  logInfo("Dropping block " + blockId + " from memory")
  val info = blockInfo.get(blockId).orNull

  // If the block has not already been dropped
  if (info != null) {
    info.synchronized {
      // required ? As of now, this will be invoked only for blocks which are ready
      // But in case this changes in future, adding for consistency sake.
      if (!info.waitForReady()) {
        // If we get here, the block write failed.
        logWarning("Block " + blockId + " was marked as failure. Nothing to drop")
        return None
      }

      var blockIsUpdated = false
      val level = info.level

      // Drop to disk, if storage level requires
      if (level.useDisk && !diskStore.contains(blockId)) {
        logInfo("Writing block " + blockId + " to disk")
        data match {
          case Left(elements) =>
            diskStore.putValues(blockId, elements, level, false)
          case Right(bytes) =>
            diskStore.putBytes(blockId, bytes, level)
        }
        blockIsUpdated = true
      }

      // Actually drop from memory store
      val droppedMemorySize =
        if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
      val blockIsRemoved = memoryStore.remove(blockId)
      if (blockIsRemoved) {
        blockIsUpdated = true
      } else {
        logWarning("Block " + blockId + " could not be dropped from memory as it does not exist")
      }

      val status = getCurrentBlockStatus(blockId, info)
      if (info.tellMaster) {
        reportBlockStatus(blockId, info, status, droppedMemorySize)
      }
      if (!level.useDisk) {
        // The block is completely gone from this node; forget it so we can put() it again later.
        blockInfo.remove(blockId)
      }
      if (blockIsUpdated) {
        return Some(status)
      }
    }
  }
  None
}
```
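The branching in dropFromMemory reduces to three independent decisions. The following sketch distills them into a pure function; Level models only the one flag used here and is not Spark's StorageLevel, and dropPlan with its action objects is hypothetical.

```scala
// Hypothetical simplified level: only the flag dropFromMemory looks at.
case class Level(useDisk: Boolean)

sealed trait DropAction
case object WriteToDisk extends DropAction
case object RemoveFromMemory extends DropAction
case object ForgetBlockInfo extends DropAction

def dropPlan(level: Level, alreadyOnDisk: Boolean, inMemory: Boolean): List[DropAction] = {
  // Spill only if the level allows disk and there is no disk copy yet.
  val toDisk: List[DropAction] = if (level.useDisk && !alreadyOnDisk) List(WriteToDisk) else Nil
  // Remove from the memory store (the excerpt logs a warning if it was already gone).
  val remove: List[DropAction] = if (inMemory) List(RemoveFromMemory) else Nil
  // With no disk copy the block is gone from this node, so its BlockInfo is forgotten
  // and a later put() can store it again.
  val forget: List[DropAction] = if (!level.useDisk) List(ForgetBlockInfo) else Nil
  toDisk ++ remove ++ forget
}
```

For example, a MEMORY_AND_DISK-style block that is not yet on disk gets List(WriteToDisk, RemoveFromMemory), while a memory-only block gets List(RemoveFromMemory, ForgetBlockInfo).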
Reading data:
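The two read methods, getValues and getBytes, are similar: both look up the Entry for the BlockId in the LinkedHashMap and return its data, converting between objects and bytes when the stored form differs from the requested one. getValues: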
```scala
override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
  val entry = entries.synchronized {
    entries.get(blockId)
  }
  if (entry == null) {
    None
  } else if (entry.deserialized) {
    Some(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)
  } else {
    val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate() // Doesn't actually copy data
    Some(blockManager.dataDeserialize(blockId, buffer))
  }
}
```
getBytes:
```scala
override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
  val entry = entries.synchronized {
    entries.get(blockId)
  }
  if (entry == null) {
    None
  } else if (entry.deserialized) {
    Some(blockManager.dataSerialize(blockId, entry.value.asInstanceOf[ArrayBuffer[Any]].iterator))
  } else {
    Some(entry.value.asInstanceOf[ByteBuffer].duplicate()) // Doesn't actually copy the data
  }
}
```
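The representation switching in getValues and getBytes can be modelled with a stand-in serializer. In this sketch Java serialization of an Array[AnyRef] replaces blockManager.dataSerialize/dataDeserialize, which in Spark use the configured serializer; all four functions are hypothetical.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

case class Entry(value: Any, size: Long, deserialized: Boolean)

// Stand-in serializer: writes the values as a plain Java Object[] array.
def serialize(values: Iterator[Any]): ByteBuffer = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(values.map(_.asInstanceOf[AnyRef]).toArray)
  oos.close()
  ByteBuffer.wrap(bos.toByteArray)
}

// Consumes the given buffer from its current position.
def deserialize(bytes: ByteBuffer): Iterator[Any] = {
  val arr = new Array[Byte](bytes.remaining)
  bytes.get(arr)
  val ois = new ObjectInputStream(new ByteArrayInputStream(arr))
  try ois.readObject().asInstanceOf[Array[AnyRef]].iterator
  finally ois.close()
}

def getValues(entry: Entry): Iterator[Any] =
  if (entry.deserialized) entry.value.asInstanceOf[ArrayBuffer[Any]].iterator
  // duplicate() shares the bytes but has its own position, so the stored buffer is not consumed.
  else deserialize(entry.value.asInstanceOf[ByteBuffer].duplicate())

def getBytes(entry: Entry): ByteBuffer =
  if (entry.deserialized) serialize(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)
  else entry.value.asInstanceOf[ByteBuffer].duplicate()

val objEntry = Entry(ArrayBuffer[Any]("a", 1, "b"), size = 3, deserialized = true)
val serialized = getBytes(objEntry)
val bytesEntry = Entry(serialized, size = serialized.limit().toLong, deserialized = false)
```

Reading bytesEntry twice returns the same values both times; without duplicate(), the first read would advance the stored buffer's position and the second read would see nothing.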
3. The BlockManager wrapper
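BlockManager provides doPut and the doGet methods (doGetLocal and doGetRemote). Through the public put and get entry points built on them, callers store and retrieve blocks without having to care about the underlying stores.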
Writing:
There is one put variant for each of the three data types (Iterator, ArrayBuffer and ByteBuffer), and all of them delegate to doPut:
```scala
def put(
    blockId: BlockId,
    values: Iterator[Any],
    level: StorageLevel,
    tellMaster: Boolean): Seq[(BlockId, BlockStatus)] = {
  doPut(blockId, IteratorValues(values), level, tellMaster)
}

def put(
    blockId: BlockId,
    values: ArrayBuffer[Any],
    level: StorageLevel,
    tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
  require(values != null, "Values is null")
  doPut(blockId, ArrayBufferValues(values), level, tellMaster)
}

def putBytes(
    blockId: BlockId,
    bytes: ByteBuffer,
    level: StorageLevel,
    tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
  require(bytes != null, "Bytes is null")
  doPut(blockId, ByteBufferValues(bytes), level, tellMaster)
}
```
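The overloads above funnel three representations into one doPut through small wrapper types. Here is a self-contained sketch of that pattern; the Values hierarchy mirrors the names IteratorValues, ArrayBufferValues and ByteBufferValues from the excerpt, while ToyBlockManager and the strings it returns are hypothetical.

```scala
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

// Wrapper types so a single doPut can pattern-match on the representation.
sealed trait Values
case class IteratorValues(iterator: Iterator[Any]) extends Values
case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends Values
case class ByteBufferValues(buffer: ByteBuffer) extends Values

object ToyBlockManager {
  def put(blockId: String, values: Iterator[Any]): String =
    doPut(blockId, IteratorValues(values))

  def put(blockId: String, values: ArrayBuffer[Any]): String = {
    require(values != null, "Values is null")
    doPut(blockId, ArrayBufferValues(values))
  }

  def putBytes(blockId: String, bytes: ByteBuffer): String = {
    require(bytes != null, "Bytes is null")
    doPut(blockId, ByteBufferValues(bytes))
  }

  // One code path; the sealed trait lets the compiler check that every case is handled.
  private def doPut(blockId: String, data: Values): String = data match {
    case IteratorValues(_)     => s"$blockId: iterator"
    case ArrayBufferValues(ab) => s"$blockId: array buffer of ${ab.size}"
    case ByteBufferValues(bb)  => s"$blockId: ${bb.remaining} bytes"
  }
}
```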
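doPut requires blockId to be non-null and the storage level to be non-null and valid. It creates a BlockInfo for the block and registers it atomically with putIfAbsent; if the block already exists and is ready, doPut returns without re-adding it. Other threads cannot get() the block until markReady is called on its BlockInfo. doPut then stores the data in memory, in Tachyon (off-heap) or on disk according to the storage level, and calls markReady so that other threads can read it (if the put fails, it removes the BlockInfo and calls markFailure instead). Finally, if level.replication is greater than 1, it calls replicate to copy the block to other nodes; for ByteBuffer data, replication is started asynchronously before the local write.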
```scala
private def doPut(
    blockId: BlockId,
    data: Values,
    level: StorageLevel,
    tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {

  require(blockId != null, "BlockId is null")
  require(level != null && level.isValid, "StorageLevel is null or invalid")

  // Return value
  val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]

  // Remember the block's storage level so that we can correctly drop it to disk if it needs
  // to be dropped right after it got put into memory. Note, however, that other threads will
  // not be able to get() this block until we call markReady on its BlockInfo.
  val putBlockInfo = {
    val tinfo = new BlockInfo(level, tellMaster)
    // Do atomically !
    val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)

    if (oldBlockOpt.isDefined) {
      if (oldBlockOpt.get.waitForReady()) {
        logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
        return updatedBlocks
      }
      // TODO: So the block info exists - but previous attempt to load it (?) failed.
      // What do we do now ? Retry on it ?
      oldBlockOpt.get
    } else {
      tinfo
    }
  }

  val startTimeMs = System.currentTimeMillis

  // If we're storing values and we need to replicate the data, we'll want access to the values,
  // but because our put will read the whole iterator, there will be no values left. For the
  // case where the put serializes data, we'll remember the bytes, above; but for the case where
  // it doesn't, such as deserialized storage, let's rely on the put returning an Iterator.
  var valuesAfterPut: Iterator[Any] = null

  // Ditto for the bytes after the put
  var bytesAfterPut: ByteBuffer = null

  // Size of the block in bytes
  var size = 0L

  // If we're storing bytes, then initiate the replication before storing them locally.
  // This is faster as data is already serialized and ready to send.
  val replicationFuture = if (data.isInstanceOf[ByteBufferValues] && level.replication > 1) {
    // Duplicate doesn't copy the bytes, just creates a wrapper
    val bufferView = data.asInstanceOf[ByteBufferValues].buffer.duplicate()
    Future { replicate(blockId, bufferView, level) }
  } else {
    null
  }

  putBlockInfo.synchronized {
    logTrace("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
      + " to get into synchronized block")

    var marked = false
    try {
      if (level.useMemory) {
        // Save it just to memory first, even if it also has useDisk set to true; we will
        // drop it to disk later if the memory store can't hold it.
        val res = data match {
          case IteratorValues(iterator) =>
            memoryStore.putValues(blockId, iterator, level, true)
          case ArrayBufferValues(array) =>
            memoryStore.putValues(blockId, array, level, true)
          case ByteBufferValues(bytes) =>
            bytes.rewind()
            memoryStore.putBytes(blockId, bytes, level)
        }
        size = res.size
        res.data match {
          case Right(newBytes) => bytesAfterPut = newBytes
          case Left(newIterator) => valuesAfterPut = newIterator
        }
        // Keep track of which blocks are dropped from memory
        res.droppedBlocks.foreach { block => updatedBlocks += block }
      } else if (level.useOffHeap) {
        // Save to Tachyon.
        val res = data match {
          case IteratorValues(iterator) =>
            tachyonStore.putValues(blockId, iterator, level, false)
          case ArrayBufferValues(array) =>
            tachyonStore.putValues(blockId, array, level, false)
          case ByteBufferValues(bytes) =>
            bytes.rewind()
            tachyonStore.putBytes(blockId, bytes, level)
        }
        size = res.size
        res.data match {
          case Right(newBytes) => bytesAfterPut = newBytes
          case _ =>
        }
      } else {
        // Save directly to disk.
        // Don't get back the bytes unless we replicate them.
        val askForBytes = level.replication > 1

        val res = data match {
          case IteratorValues(iterator) =>
            diskStore.putValues(blockId, iterator, level, askForBytes)
          case ArrayBufferValues(array) =>
            diskStore.putValues(blockId, array, level, askForBytes)
          case ByteBufferValues(bytes) =>
            bytes.rewind()
            diskStore.putBytes(blockId, bytes, level)
        }
        size = res.size
        res.data match {
          case Right(newBytes) => bytesAfterPut = newBytes
          case _ =>
        }
      }

      val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
      if (putBlockStatus.storageLevel != StorageLevel.NONE) {
        // Now that the block is in either the memory, tachyon, or disk store,
        // let other threads read it, and tell the master about it.
        marked = true
        putBlockInfo.markReady(size)
        if (tellMaster) {
          reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
        }
        updatedBlocks += ((blockId, putBlockStatus))
      }
    } finally {
      // If we failed in putting the block to memory/disk, notify other possible readers
      // that it has failed, and then remove it from the block info map.
      if (!marked) {
        // Note that the remove must happen before markFailure otherwise another thread
        // could've inserted a new BlockInfo before we remove it.
        blockInfo.remove(blockId)
        putBlockInfo.markFailure()
        logWarning("Putting block " + blockId + " failed")
      }
    }
  }
  logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs))

  // Either we're storing bytes and we asynchronously started replication, or we're storing
  // values and need to serialize and replicate them now:
  if (level.replication > 1) {
    data match {
      case ByteBufferValues(bytes) => Await.ready(replicationFuture, Duration.Inf)
      case _ => {
        val remoteStartTime = System.currentTimeMillis
        // Serialize the block if not already done
        if (bytesAfterPut == null) {
          if (valuesAfterPut == null) {
            throw new SparkException(
              "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
          }
          bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
        }
        replicate(blockId, bytesAfterPut, level)
        logDebug("Put block " + blockId + " remotely took " +
          Utils.getUsedTimeMs(remoteStartTime))
      }
    }
  }

  BlockManager.dispose(bytesAfterPut)

  if (level.replication > 1) {
    logDebug("Put for block " + blockId + " with replication took " +
      Utils.getUsedTimeMs(startTimeMs))
  } else {
    logDebug("Put for block " + blockId + " without replication took " +
      Utils.getUsedTimeMs(startTimeMs))
  }

  updatedBlocks
}
```
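doPut relies on a small handshake: the writer registers a pending BlockInfo, and readers block in waitForReady() until the writer calls markReady() or markFailure(). The sketch below reproduces that protocol with wait/notifyAll. ToyBlockInfo is a hypothetical stand-in (Spark's BlockInfo differs in detail), and a ConcurrentHashMap plays the role of the blockInfo map.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for BlockInfo's ready/failure handshake.
class ToyBlockInfo {
  private var pending = true
  private var failed = false
  private var size = -1L

  def markReady(sizeInBytes: Long): Unit = synchronized {
    size = sizeInBytes
    pending = false
    notifyAll()
  }

  def markFailure(): Unit = synchronized {
    failed = true
    pending = false
    notifyAll()
  }

  /** Blocks while the put is in progress; returns true if it succeeded. */
  def waitForReady(): Boolean = synchronized {
    while (pending) wait()
    !failed
  }

  def sizeIfReady: Long = synchronized(size)
}

// Writer side: putIfAbsent decides atomically which thread owns the put.
val blockInfo = new ConcurrentHashMap[String, ToyBlockInfo]()
val info = new ToyBlockInfo
val previous = blockInfo.putIfAbsent("rdd_0_0", info) // null: no earlier BlockInfo, we write it

// Reader side: blocks until the writer finishes.
var readerSawReady = false
val reader = new Thread(() => { readerSawReady = blockInfo.get("rdd_0_0").waitForReady() })
reader.start()
Thread.sleep(50) // stand-in for the time spent writing to memory/disk
info.markReady(1024)
reader.join()

// A failed put wakes readers with false instead.
val failedInfo = new ToyBlockInfo
failedInfo.markFailure()
```

This is also why doPut removes the BlockInfo before calling markFailure: a woken reader sees the failure, and a later put can register a fresh BlockInfo under the same blockId.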
Reading:
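get first calls getLocal to look the block up locally by blockId; if that fails, it calls getRemote to fetch it from the BlockManager on another node. “Normally, Spark assigns tasks according to where blocks are located, so a task usually runs on a node that holds its block and getLocal() finds it. When resources are limited, however, Spark may schedule a task on a node other than the one holding the block, and the block must then be fetched with getRemote().”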
```scala
def get(blockId: BlockId): Option[Iterator[Any]] = {
  val local = getLocal(blockId)
  if (local.isDefined) {
    logInfo("Found block %s locally".format(blockId))
    return local
  }
  val remote = getRemote(blockId)
  if (remote.isDefined) {
    logInfo("Found block %s remotely".format(blockId))
    return remote
  }
  None
}
```
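The local-then-remote order in get can be expressed with Option.orElse, whose argument is evaluated only when needed. In this sketch getSketch and the two fake lookups are hypothetical; a counter shows that the remote lookup is skipped on a local hit.

```scala
// Hypothetical sketch: local first, remote only on a local miss.
def getSketch(
    blockId: String,
    getLocal: String => Option[Iterator[Any]],
    getRemote: String => Option[Iterator[Any]]): Option[Iterator[Any]] =
  // Option.orElse takes its argument by name, so getRemote runs only if getLocal returned None.
  getLocal(blockId).orElse(getRemote(blockId))

var remoteCalls = 0
val localBlocks = Map[String, List[Any]]("rdd_0_0" -> List(1, 2))
val fakeLocal: String => Option[Iterator[Any]] = id => localBlocks.get(id).map(_.iterator)
val fakeRemote: String => Option[Iterator[Any]] = { _ =>
  remoteCalls += 1
  Some(Iterator("from remote"))
}
```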
getLocal delegates to doGetLocal:
```scala
/**
 * Get block from local block manager.
 */
def getLocal(blockId: BlockId): Option[Iterator[Any]] = {
  logDebug("Getting local block " + blockId)
  doGetLocal(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]]
}
```
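doGetLocal first checks whether the storage level uses memory and, if so, tries to read the block from memoryStore. If the level uses off-heap storage, it then tries Tachyon. Next, if the level uses disk, it reads the block from diskStore: when the level uses disk only, the block is returned as is; when it also uses memory, the data read from disk is put back into memoryStore before being returned. If none of these yields the block, it was not found locally and None is returned.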
```scala
private def doGetLocal(blockId: BlockId, asValues: Boolean): Option[Any] = {
  val info = blockInfo.get(blockId).orNull
  if (info != null) {
    info.synchronized {

      // If another thread is writing the block, wait for it to become ready.
      if (!info.waitForReady()) {
        // If we get here, the block write failed.
        logWarning("Block " + blockId + " was marked as failure.")
        return None
      }

      val level = info.level
      logDebug("Level for block " + blockId + " is " + level)

      // Look for the block in memory
      if (level.useMemory) {
        logDebug("Getting block " + blockId + " from memory")
        val result = if (asValues) {
          memoryStore.getValues(blockId)
        } else {
          memoryStore.getBytes(blockId)
        }
        result match {
          case Some(values) =>
            return Some(values)
          case None =>
            logDebug("Block " + blockId + " not found in memory")
        }
      }

      // Look for the block in Tachyon
      if (level.useOffHeap) {
        logDebug("Getting block " + blockId + " from tachyon")
        if (tachyonStore.contains(blockId)) {
          tachyonStore.getBytes(blockId) match {
            case Some(bytes) => {
              if (!asValues) {
                return Some(bytes)
              } else {
                return Some(dataDeserialize(blockId, bytes))
              }
            }
            case None =>
              logDebug("Block " + blockId + " not found in tachyon")
          }
        }
      }

      // Look for block on disk, potentially storing it back into memory if required:
      if (level.useDisk) {
        logDebug("Getting block " + blockId + " from disk")
        val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
          case Some(bytes) => bytes
          case None =>
            throw new Exception("Block " + blockId + " not found on disk, though it should be")
        }
        assert(0 == bytes.position())

        if (!level.useMemory) {
          // If the block shouldn't be stored in memory, we can just return it:
          if (asValues) {
            return Some(dataDeserialize(blockId, bytes))
          } else {
            return Some(bytes)
          }
        } else {
          // Otherwise, we also have to store something in the memory store:
          if (!level.deserialized || !asValues) {
            // We'll store the bytes in memory if the block's storage level includes
            // "memory serialized", or if it should be cached as objects in memory
            // but we only requested its serialized bytes:
            val copyForMemory = ByteBuffer.allocate(bytes.limit)
            copyForMemory.put(bytes)
            memoryStore.putBytes(blockId, copyForMemory, level)
            bytes.rewind()
          }
          if (!asValues) {
            return Some(bytes)
          } else {
            val values = dataDeserialize(blockId, bytes)
            if (level.deserialized) {
              // Cache the values before returning them:
              // TODO: Consider creating a putValues that also takes in a iterator?
              val valuesBuffer = new ArrayBuffer[Any]
              valuesBuffer ++= values
              memoryStore.putValues(blockId, valuesBuffer, level, true).data match {
                case Left(values2) =>
                  return Some(values2)
                case _ =>
                  throw new Exception("Memory store did not return back an iterator")
              }
            } else {
              return Some(values)
            }
          }
        }
      }
    }
  } else {
    logDebug("Block " + blockId + " not registered locally")
  }
  None
}
```
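Stripped of serialization details, doGetLocal is a tiered lookup with promotion. The sketch below keeps only that skeleton: Level and ToyStores are hypothetical, every tier holds plain Vectors instead of bytes or iterators, and a missing disk block returns None where the excerpt throws an exception.

```scala
import scala.collection.mutable

// Hypothetical simplified level with the three flags doGetLocal checks.
case class Level(useMemory: Boolean, useOffHeap: Boolean, useDisk: Boolean)

class ToyStores {
  val memory = mutable.Map[String, Vector[Any]]()
  val offHeap = mutable.Map[String, Vector[Any]]()
  val disk = mutable.Map[String, Vector[Any]]()

  def getLocal(blockId: String, level: Level): Option[Vector[Any]] = {
    def fromMemory: Option[Vector[Any]] = if (level.useMemory) memory.get(blockId) else None
    def fromOffHeap: Option[Vector[Any]] = if (level.useOffHeap) offHeap.get(blockId) else None
    def fromDisk: Option[Vector[Any]] =
      if (!level.useDisk) None
      else disk.get(blockId).map { values =>
        // Disk hit for a level that also uses memory: cache it for the next read.
        if (level.useMemory) memory(blockId) = values
        values
      }
    // orElse evaluates its argument only when the previous tier missed.
    fromMemory.orElse(fromOffHeap).orElse(fromDisk)
  }
}

val stores = new ToyStores
val memoryAndDisk = Level(useMemory = true, useOffHeap = false, useDisk = true)
val diskOnly = Level(useMemory = false, useOffHeap = false, useDisk = true)
stores.disk("rdd_0_0") = Vector(1, 2, 3)
stores.disk("rdd_0_1") = Vector(4)
val first = stores.getLocal("rdd_0_0", memoryAndDisk) // disk hit, copied into memory
```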
getRemote delegates to doGetRemote:
```scala
/**
 * Get block from remote block managers.
 */
def getRemote(blockId: BlockId): Option[Iterator[Any]] = {
  logDebug("Getting remote block " + blockId)
  doGetRemote(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]]
}
```
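doGetRemote first asks the master for all locations of the block and shuffles them, then requests the block from each location in turn; as soon as one remote node returns the block, the method returns without sending further requests.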
```scala
private def doGetRemote(blockId: BlockId, asValues: Boolean): Option[Any] = {
  require(blockId != null, "BlockId is null")
  val locations = Random.shuffle(master.getLocations(blockId))
  for (loc <- locations) {
    logDebug("Getting remote block " + blockId + " from " + loc)
    val data = BlockManagerWorker.syncGetBlock(
      GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
    if (data != null) {
      if (asValues) {
        return Some(dataDeserialize(blockId, data))
      } else {
        return Some(data)
      }
    }
    logDebug("The value of block " + blockId + " is null")
  }
  logDebug("Block " + blockId + " not found")
  None
}
```
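The fetch loop in doGetRemote is a "first success wins" search over a shuffled list. In this sketch getRemoteSketch is hypothetical, fetch stands in for BlockManagerWorker.syncGetBlock (None plays the role of a null reply), and locations are plain host strings. Shuffling presumably spreads requests for a popular block across its replicas.

```scala
import scala.util.Random

// Hypothetical sketch: try shuffled locations one at a time, stop at the first hit.
def getRemoteSketch[T](locations: Seq[String], fetch: String => Option[T], rand: Random): Option[T] = {
  val it = rand.shuffle(locations).iterator
  var result: Option[T] = None
  while (result.isEmpty && it.hasNext) {
    result = fetch(it.next())
  }
  result
}

var asked = List.empty[String]
val fetch: String => Option[String] = { host =>
  asked = host :: asked
  if (host == "host2") Some("block bytes") else None
}
val found = getRemoteSketch(Seq("host1", "host2", "host3"), fetch, new Random(7))
```

Whatever order the shuffle produces, the last host asked is host2, because the loop stops right after the first successful reply.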
4. Partitions and blocks
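Finally, here is how a partition becomes a stored block. As analyzed in the earlier article on resource scheduling (Task execution), a chain of transformations or an action on an RDD is turned into tasks that run over its partitions, and these eventually call getOrCompute. getOrCompute first builds a key (a blockId) from the RDD id and the partition index, and looks up the corresponding block in BlockManager. If the block exists, this RDD partition has already been computed and stored in BlockManager, so it can be read directly without recomputation. If not, getOrCompute calls the RDD's computeOrReadCheckpoint method to read the checkpoint or compute the block, and stores the result in BlockManager. Note that computing and storing a block is blocking: if another thread needs the same block, it has to wait until this thread has finished loading it.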
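The blocking behaviour described above can be sketched with a shared "loading" set guarded by wait/notifyAll. ToyCacheManager is hypothetical; its key format rdd_<rddId>_<partitionIndex> follows the name of Spark's RDDBlockId, and a plain map stands in for BlockManager.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

// Hypothetical sketch of getOrCompute: compute each block once, other threads wait.
class ToyCacheManager {
  private val store = mutable.Map[String, Vector[Int]]()
  private val loading = mutable.Set[String]()

  def blockKey(rddId: Int, partitionIndex: Int): String = s"rdd_${rddId}_$partitionIndex"

  def getOrCompute(rddId: Int, partitionIndex: Int)(compute: => Vector[Int]): Vector[Int] = {
    val key = blockKey(rddId, partitionIndex)
    val cached: Option[Vector[Int]] = loading.synchronized {
      // Another thread is computing this block: wait until it finishes.
      while (loading.contains(key)) loading.wait()
      val existing = store.get(key)
      if (existing.isEmpty) loading.add(key) // this thread takes responsibility for it
      existing
    }
    cached.getOrElse {
      try {
        val values = compute
        loading.synchronized { store(key) = values }
        values
      } finally {
        // Release the key (even on failure) and wake up any waiting threads.
        loading.synchronized {
          loading -= key
          loading.notifyAll()
        }
      }
    }
  }
}

val cm = new ToyCacheManager
val computeCount = new AtomicInteger(0)
def expensive(): Vector[Int] = {
  computeCount.incrementAndGet()
  Thread.sleep(50) // simulate a slow computation
  Vector(1, 2, 3)
}
val threads = (1 to 4).map(_ => new Thread(() => { cm.getOrCompute(0, 1)(expensive()); () }))
threads.foreach(_.start())
threads.foreach(_.join())
```

Four threads ask for the same partition, yet the computation runs once: the first thread marks the key as loading, and the others wake up only after the result is already in the store.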