spark shuffle过程分析

spark shuffle流程分析

回到ShuffleMapTask.runTask函数

现在回到ShuffleMapTask.runTask函数中:

overridedef runTask(context:TaskContext):
MapStatus = {

首先得到要reduce的task的个数。

valnumOutputSplits=
dep.partitioner.numPartitions

metrics= Some(context.taskMetrics)

valblockManager=
SparkEnv.get.blockManager

valshuffleBlockManager=
blockManager.shuffleBlockManager

varshuffle:ShuffleWriterGroup
= null

varsuccess
=false

try{

得到对数据时行serializer操作的类,

//Obtain all the block writers for shuffle blocks.

valser
=SparkEnv.get.serializerManager.get(dep.serializerClass,SparkEnv.get.conf)

通过shuffleid与要进行reduce的task个数,生成ShuffleBlockId,

同时根据blockid生成ShuffleWriterGroup.shuffle的实现为DiskBlockObjectWriter。

通过spark.shuffle.consolidateFiles配置是否合并文件的输入。默认的为false,

合并文件设置为true,下次再有task在本机运行时,会直接打开当前输入的文件进行输入。

shuffle=
shuffleBlockManager.forMapTask(dep.shuffleId,partitionId,numOutputSplits,ser)

根据rdd的iterator取出数据,根据element的key重新进行partition,重新写入到shuffle的

//Write the map output to its associated buckets.

for(elem
<-rdd.iterator(split,context)) {

valpair
=elem.asInstanceOf[Product2[Any,Any]]

valbucketId
=dep.partitioner.getPartition(pair._1)

每一个partition都对应着一个DiskBlockObjectWriter,通过此实例的write函数,写入shuffle的数据。

也就是说,这个时候此RDD远行的task个数为core的个数,此时打开的文件个数为corenum*numpartition。

shuffle.writers(bucketId).write(pair)

}

//Commit the writes. Get the size of each bucket block (total blocksize).

vartotalBytes=
0L

vartotalTime
=0L

把这次打开的所有的文件全部commit,同时关闭文件的输入。

valcompressedSizes:Array[Byte]
= shuffle.writers.map{ writer: BlockObjectWriter =>

writer.commit()

writer.close()

valsize
=writer.fileSegment().length

totalBytes+=
size

totalTime+= writer.timeWriting()

MapOutputTracker.compressSize(size)

}

//Update shuffle metrics.

valshuffleMetrics=
newShuffleWriteMetrics

shuffleMetrics.shuffleBytesWritten=
totalBytes

shuffleMetrics.shuffleWriteTime=
totalTime

metrics.get.shuffleWriteMetrics=
Some(shuffleMetrics)

success=
true

newMapStatus(blockManager.blockManagerId,compressedSizes)

}catch{
casee:Exception =>

//If there is an exception from running the task, revert the partialwrites

//and throw the exception upstream to Spark.

if(shuffle
!=null&&
shuffle.writers!=
null){

for(writer
<-shuffle.writers){

writer.revertPartialWrites()

writer.close()

}

}

throwe

}finally{

//Release the writers back to the shuffle block manager.

if(shuffle
!=null&&
shuffle.writers!=
null){

shuffle.releaseWriters(success)

}

//Execute the
callbackson task completion.

context.executeOnCompleteCallbacks()

}

}

关于SparkEnv

在ShuffleMapTask.runTask中开始就通过SparkEnv.get去获取SparkEnv里面的内容,

SparkEnv中主要通过ThreadLocal来存储此实例,

此实例中包含Akkaactor,serializer,BlockManager,shuffle使用的MapoutputTracker等。

SparkEnv实例生成包含两部分,master与worker,

master是在sparkcontext生成时生成,worker是在executor生成时生成

因此现在我们来分析下这个类定义

针对每一个Worker中的executor会生成一个SparkEnv实例:

在Executor实例生成时,会执行发下代码:

设置当前executor的属性env为创建一个SparkEnv实例,此实例通过当前的executorId与当前的host生成。

privateval
env= {

if(!isLocal) {

val_env
=SparkEnv.create(conf,executorId, slaveHostname,
0,

isDriver =
false,isLocal =
false)

SparkEnv.set(_env)

_env.metricsSystem.registerSource(executorSource)

_env

}else{

SparkEnv.get

}

}

针对master启动时生成的SparkEnv实例:

通过在生成SparkContext实例时,生成SparkEnv属性:

private[spark]val
env= SparkEnv.create(

conf,

//注意:此处使用的是driver,表示这是一个driver程序(master),worker时这里传入的是具体的executorid

"<driver>",

conf.get("spark.driver.host"),

conf.get("spark.driver.port").toInt,

isDriver =
true,

isLocal =
isLocal)

SparkEnv.set(env)

生成的env实例,此实例是一个线程本地实例,每一个线程都有自己独立的SparkEnv

private
valenv =
newThreadLocal[SparkEnv]

声明可变的变量,用来存储最后变化的实例,通过sparkEnv.get时如果env不存在,会拿这个值

@volatileprivatevarlastSetSparkEnv:
SparkEnv = _

defset(e: SparkEnv) {

lastSetSparkEnv= e

env.set(e)

}

defget: SparkEnv = {

Option(env.get()).getOrElse(lastSetSparkEnv)

}

下面是sparkenv的create函数:

private[spark]def create(

conf: SparkConf,

executorId: String,

hostname: String,

port: Int,

isDriver: Boolean,

isLocal: Boolean): SparkEnv = {

val(actorSystem,boundPort)=
AkkaUtils.createActorSystem("spark",hostname, port,

conf = conf)

//Bit of a hack: If this is the driver and our port was 0 (meaning bindto any free port),

//figure out which port number
Akkaactually bound to and set spark.driver.port to it.

if(isDriver && port ==
0){

conf.set("spark.driver.port",
boundPort.toString)

}

valclassLoader=
Thread.currentThread.getContextClassLoader

//Create an instance of the class named by the given Java systemproperty, or by

//defaultClassName if the property is not set, and return it as a T

definstantiateClass[T](propertyName:
String, defaultClassName: String):T = {

valname
=conf.get(propertyName, defaultClassName)

Class.forName(name,true,classLoader).newInstance().asInstanceOf[T]

}

生成一个Serializermanager实例

valserializerManager=
newSerializerManager

得到配置的Serializer实例,这个地方有部分资料建议配置为org.apache.spark.serializer.KryoSerializer.

请参见http://spark.apache.org/docs/0.9.0/tuning.html的说明。

valserializer=
serializerManager.setDefault(

conf.get("spark.serializer","org.apache.spark.serializer.JavaSerializer"),conf)

闭包使用的serializer,如果闭包中函数使用了大量的对象,可修改默认的值

valclosureSerializer=
serializerManager.get(

conf.get("spark.closure.serializer","org.apache.spark.serializer.JavaSerializer"),

conf)

此部分检查是否是driver(也就是是否是master)

defregisterOrLookup(name: String, newActor:
=> Actor): ActorRef = {

如果是master时,生成一个actor的实例,

if(isDriver) {

logInfo("Registering" + name)

actorSystem.actorOf(Props(newActor),name = name)

}
else{

否则表示是worker,生成一个actor的引用。对指定的actor进行连接,生成actorref

valdriverHost:String
= conf.get("spark.driver.host","localhost")

valdriverPort:Int
= conf.getInt("spark.driver.port",7077)

Utils.checkHost(driverHost,"Expected
hostname")

valurl
=s"akka.tcp://[email protected]$driverHost:$driverPort/user/$name"

valtimeout
=AkkaUtils.lookupTimeout(conf)

logInfo(s"Connectingto $name:$url")

Await.result(actorSystem.actorSelection(url).resolveOne(timeout),timeout)

}

}

此处生成BlockManagerMaster实例,如果是driver时,

会生成一个名称为BlockManagerMaster的BlockManagerMasterActor实例。

否则表示是worker,生成BlockManagerMaster,并创建与master中的BlockManagerMasterActor的actorref引用。

BlockManagerMasterActor中通过配置spark.storage.blockManagerTimeoutIntervalMs,,默认值为60000ms

定期检查上面注册的BlockManagerId是否过期。

valblockManagerMaster=
newBlockManagerMaster(registerOrLookup(

"BlockManagerMaster",

newBlockManagerMasterActor(isLocal,
conf)), conf)

生成BlockManager,BlockManager中会生成ShuffleBlockManager,DiskBlockManager,memory/disk的store.

针对此BlockManager,生成一个BlockManagerId实例,

通过master的actor(BlockManagerMasterActor),向master注册此block,并定期向master发送心跳。

心跳的发送通过spark.storage.blockManagerTimeoutIntervalMs配置的值/4

valblockManager=
newBlockManager(executorId,

actorSystem,blockManagerMaster,serializer,conf)

valconnectionManager=
blockManager.connectionManager

valbroadcastManager=
newBroadcastManager(isDriver, conf)

生成CacheManager,

valcacheManager=
newCacheManager(blockManager)

生成MapOutputTracker,如果是master时,生成MapOutputTrackerMaster,否则生成MapOutputTracker

//Have to assign trackerActor after initialization asMapOutputTrackerActor

//requires the MapOutputTracker itself

valmapOutputTracker=
if(isDriver) {

newMapOutputTrackerMaster(conf)

}else{

newMapOutputTracker(conf)

}

如果是master时,生成MapOutputTrackerMasterActor实例,否则生成对actor的引用。

mapOutputTracker.trackerActor=
registerOrLookup(

"MapOutputTracker",

newMapOutputTrackerMasterActor(mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]))

生成ShuffleFetcher的实例,通过spark.shuffle.fetcher配置,默认为BlockStoreShuffleFetcher。

valshuffleFetcher=
instantiateClass[ShuffleFetcher](

"spark.shuffle.fetcher","org.apache.spark.BlockStoreShuffleFetcher")

valhttpFileServer=
newHttpFileServer()

httpFileServer.initialize()

conf.set("spark.fileserver.uri",
httpFileServer.serverUri)

valmetricsSystem=
if(isDriver) {

MetricsSystem.createMetricsSystem("driver",conf)

}else{

MetricsSystem.createMetricsSystem("executor",conf)

}

metricsSystem.start()

//Set the sparkFiles directory, used when downloading dependencies. Inlocal mode,

//this is a temporary directory; in distributed mode, this is theexecutor‘s current working

//directory.

valsparkFilesDir:String
= if(isDriver) {

Utils.createTempDir().getAbsolutePath

}else{

"."

}

//Warn about deprecated spark.cache.class property

if(conf.contains("spark.cache.class")){

logWarning("Thespark.cache.class property is no longer being used! Specify storage "+

"levelsusing the RDD.persist() method instead.")

}

newSparkEnv(

executorId,

actorSystem,

serializerManager,

serializer,

closureSerializer,

cacheManager,

mapOutputTracker,

shuffleFetcher,

broadcastManager,

blockManager,

connectionManager,

httpFileServer,

sparkFilesDir,

metricsSystem,

conf)

}

ShuffleBlockManager.forMapTask函数

shuffleBlockManager.forMapTask函数是shufflemaptask运行shuffle的核心函数,

此函数中会生成ShuffleWriterGroup实例,

并根据运行的task个数,通常是cpucore个数*reduce的partition个shuffle个文件,每一次的运行都会生成这么多个文件。

因此这部分会同时打开core*reduceparitionnum个file,每一个的maptask运行都会生成这么多个文件。

此部分完成后就会产生大量的mapoutput文件个数,总文件个数为maptasknum*reducetasknum个文件。

同时spark中为了控制文件的生成个数,可通过spark.shuffle.consolidateFiles配置是否重用write文件。默认为false,

如果此值设置为true,每一个worker通常只生成core*reducetasknum个文件。

每一个文件打开通过spark.shuffle.file.buffer.kb配置的缓存大小。默认为100kb,也就是一次运行中

每一个worker中会有core*reducetasknum*100kb的内存buffer的使用。由这部分我个人认为,

这玩意还是不合适maptask的任务太多的分析任务。Mapreduce的shuffle从性能上会比这要慢一些,

但是从对大数据量的支持上还是要好一些。

函数定义

defforMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer:Serializer)
= {

生成一个ShuffleWriterGroup实例

newShuffleWriterGroup {

shuffleStates.putIfAbsent(shuffleId,newShuffleState(numBuckets))

privatevalshuffleState=
shuffleStates(shuffleId)

privatevarfileGroup:ShuffleFileGroup
= null

如果spark.shuffle.consolidateFiles配置的值为true,检查是否有上次生成的writer文件,重新打开这个文件。

也就是在文件中进行append操作。

valwriters:Array[BlockObjectWriter]
= if(consolidateShuffleFiles){

fileGroup= getUnusedFileGroup()

Array.tabulate[BlockObjectWriter](numBuckets){ bucketId =>

valblockId
=ShuffleBlockId(shuffleId, mapId, bucketId)

blockManager.getDiskWriter(blockId,fileGroup(bucketId),serializer,
bufferSize)

}

}
else{

否则每一个task都会生成新的writer文件。

Array.tabulate[BlockObjectWriter](numBuckets){ bucketId =>

valblockId
=ShuffleBlockId(shuffleId, mapId, bucketId)

此处主要是通过sparkenv中的diskBlockMangaer来在指定的路径下生成文件。

路径通过spark.local.dir配置。默认为java.io.tmpdir。

valblockFile
=blockManager.diskBlockManager.getFile(blockId)

//Because of previous failures, the shuffle file may already exist onthis machine.

//If so, remove it.

if(blockFile.exists){

if(blockFile.delete()){

logInfo(s"Removedexisting shuffle file $blockFile")

}
else{

logWarning(s"Failedto remove existing shuffle file $blockFile")

}

}

blockManager.getDiskWriter(blockId,blockFile,serializer,
bufferSize)

}

}

这个函数在shuffleMapTask执行完成的时候调用。如果上面提到的配置为true时,

会把writer的blockfile放到一个容器中,下一次task运行时,会直接打开这个blockfile文件。

overridedefreleaseWriters(success:
Boolean) {

if(consolidateShuffleFiles){

if(success) {

valoffsets
=writers.map(_.fileSegment().offset)

fileGroup.recordMapOutput(mapId,offsets)

}

recycleFileGroup(fileGroup)

}
else{

shuffleState.completedMapTasks.add(mapId)

}

}

privatedefgetUnusedFileGroup():
ShuffleFileGroup = {

valfileGroup
=shuffleState.unusedFileGroups.poll()

if(fileGroup!=
null)fileGroupelsenewFileGroup()

}

privatedefnewFileGroup():
ShuffleFileGroup = {

valfileId
=shuffleState.nextFileId.getAndIncrement()

valfiles
=Array.tabulate[File](numBuckets) { bucketId =>

valfilename
=physicalFileName(shuffleId, bucketId, fileId)

blockManager.diskBlockManager.getFile(filename)

}

valfileGroup
=newShuffleFileGroup(fileId,shuffleId,
files)

shuffleState.allFileGroups.add(fileGroup)

fileGroup

}

privatedefrecycleFileGroup(group:
ShuffleFileGroup) {

shuffleState.unusedFileGroups.add(group)

}

}

}

DAGShuduler中注册shuffleid与mapStatus

在DAGSheduler的调度中,启动一个stage时,如果是shufflestage,会执行如下代码:

DAGsheduler.runjob-->submitJob-->JobSubmittedactor-->

newStage传入参数getParentStages-->getShuffleMapStage-->newOrUsedStage

privatedef newOrUsedStage(

rdd: RDD[_],

numTasks: Int,

shuffleDep:ShuffleDependency[_,_],

jobId: Int,

callSite: Option[String] = None)

:Stage =

{

valstage
=newStage(rdd, numTasks, Some(shuffleDep), jobId, callSite)

if(mapOutputTracker.has(shuffleDep.shuffleId)){

valserLocs
=mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)

vallocs
=MapOutputTracker.deserializeMapStatuses(serLocs)

for(i
<- 0until
locs.size)stage.outputLocs(i)=
List(locs(i))

stage.numAvailableOutputs=
locs.size

}else{

在master中注册此shuffleid

//Kind of ugly: need to register RDDs with the cache and map outputtracker here

//since we can‘t do it in the RDD constructor because # of partitionsis unknown

logInfo("RegisteringRDD " + rdd.id+
" ("+ rdd.origin+
")")

mapOutputTracker.registerShuffle(shuffleDep.shuffleId,rdd.partitions.size)

}

stage

}

回到dagsheduler的调度中,当shuffle的所有的task处理完成后,会调用如下代码:

....

execBackend.statusUpdate(taskId,TaskState.FINISHED,
serializedResult)

.....

casesmt: ShuffleMapTask =>

valstatus
=event.result.asInstanceOf[MapStatus]

valexecId
=status.location.executorId

logDebug("ShuffleMapTaskfinished on " +
execId)

if(failedEpoch.contains(execId)&&
smt.epoch<=
failedEpoch(execId)){

logInfo("Ignoringpossibly bogus ShuffleMapTask completion from "+
execId)

}
else{

第一个task完成后,都会把map返回的MapStatus(记录有location信息)记录到stage的outputloc中。

stage.addOutputLoc(smt.partitionId,status)

}

if(running.contains(stage)&&
pendingTasks(stage).isEmpty){

markStageAsFinished(stage)

logInfo("lookingfor newly runnable stages")

logInfo("running:" +
running)

logInfo("waiting:" +
waiting)

logInfo("failed:" +
failed)

if(stage.shuffleDep!=
None) {

.........................................

如果所有的shuffle的task都执行完成,把此stage对应的shuffled与所有的location注册到mapOutputTracker中

此处是通过DAGSheculer来完成的,因此,mapoutputtracker是一个MapOutputTrackerMaster的实现。

mapOutputTracker.registerMapOutputs(

stage.shuffleDep.get.shuffleId,

stage.outputLocs.map(list=>
if(list.isEmpty)
nullelselist.head).toArray,

changeEpoch =
true)

}

Shuffle的读取计算

此时shuffle的MAPRDD执行完成后,会通过PairRDDFunctions来做处理

回到PairRDDFunctions中的reduceByKey,

reduceByKey-->combineByKey

再次来看这个函数的定义

defcombineByKey[C](createCombiner: V => C,

mergeValue: (C, V) => C,

mergeCombiners: (C, C) => C,

partitioner: Partitioner,

mapSideCombine: Boolean =
true,

serializerClass: String =
null):RDD[(K, C)] = {

if(getKeyClass().isArray) {

if(mapSideCombine) {

thrownewSparkException("Cannot
use map-sidecombining with array keys.")

}

if(partitioner.isInstanceOf[HashPartitioner])
{

thrownewSparkException("Default
partitionercannot partition array keys.")

}

}

valaggregator=
newAggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)

如果当前的RDD的partitioner与传入的partitioner相等,表示是一个map,不需要进行shuffle,直接在map端合并。

if(self.partitioner==
Some(partitioner)) {

self.mapPartitionsWithContext((context,iter) => {

newInterruptibleIterator(context,
aggregator.combineValuesByKey(iter,context))

}, preservesPartitioning =
true)

}elseif(mapSideCombine)
{

如果设置有在map端先进行一次合并,类似于mapreduce中的combine,先在map端执行一次合并,

并生成MapPartitionsRDD

valcombined
=self.mapPartitionsWithContext((context, iter) => {

aggregator.combineValuesByKey(iter,context)

}, preservesPartitioning =
true)

生成一个ShuffledRDD实例,在reduce端执行合并操作。合并的核心函数是aggregator实例中定义的相关函数。

valpartitioned=
newShuffledRDD[K, C, (K, C)](combined,partitioner)

.setSerializer(serializerClass)

partitioned.mapPartitionsWithContext((context,iter)
=> {

newInterruptibleIterator(context,
aggregator.combineCombinersByKey(iter,context))

}, preservesPartitioning =
true)

}else{

不执行combiner操作,直接在reduce端进行shuffle操作。

//Don‘t apply map-side combiner.

valvalues
=newShuffledRDD[K, V, (K, V)](self,partitioner).setSerializer(serializerClass)

values.mapPartitionsWithContext((context,iter) => {

newInterruptibleIterator(context,
aggregator.combineValuesByKey(iter,context))

}, preservesPartitioning =
true)

}

}

在Reduce端,生成为ShuffledRDD。数据计算函数通过compute函数完成。

ShuffledRDD中计算函数的实现

overridedef compute(split:
Partition,context: TaskContext): Iterator[P] = {

valshuffledId=
dependencies.head.asInstanceOf[ShuffleDependency[K, V]].shuffleId

通过指定的shuffledid,拿到shuffle完成的数据。

SparkEnv.get.shuffleFetcher.fetch[P](shuffledId,split.index,
context,

SparkEnv.get.serializerManager.get(serializerClass,SparkEnv.get.conf))

}

从SparkEnv中拿到shuffleFetcher的实例。从SparkEnv生成来看,

通过spark.shuffle.fetcher配置,默认为BlockStoreShuffleFetcher。

Sparkenv中的定义

valshuffleFetcher=
instantiateClass[ShuffleFetcher](

"spark.shuffle.fetcher","org.apache.spark.BlockStoreShuffleFetcher")

BlockStoreShuffleFetcher.fetch的函数:

overridedef fetch[T](

shuffleId: Int,

reduceId: Int,

context: TaskContext,

serializer: Serializer)

:Iterator[T] =

{

logDebug("Fetchingoutputs for shuffle %d, reduce %d".format(shuffleId,reduceId))

valblockManager=
SparkEnv.get.blockManager

valstartTime
=System.currentTimeMillis

在executor中的mapoutputtracker会通过GetMapOutputStatuses事件

向mapoutputtrackermaster中的MapOutputTrackerMasterActor发起得到所有的mapStatus事件。

valstatuses
=SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId,reduceId)

...........................

valsplitsByAddress=
newHashMap[BlockManagerId, ArrayBuffer[(Int, Long)]]

把BlockManagerid相同的map结果进行合并,index的值就是map的partition

for(((address,size),index)
<-statuses.zipWithIndex){

splitsByAddress.getOrElseUpdate(address,ArrayBuffer())
+= ((index,size))

}

得到每一个map的输出文件的结果集地址,地址由shuffleid,mappartitionnum,reduceparttion组成。

valblocksByAddress:Seq[(BlockManagerId,
Seq[(BlockId, Long)])] =splitsByAddress.toSeq.map{

case(address,splits)
=>

(address,splits.map(s=>
(ShuffleBlockId(shuffleId, s._1,reduceId), s._2)))

}

defunpackBlock(blockPair: (BlockId,
Option[Iterator[Any]])) :Iterator[T] = {

valblockId
=blockPair._1

valblockOption=
blockPair._2

blockOptionmatch{

caseSome(block)=>
{

block.asInstanceOf[Iterator[T]]

}

caseNone => {

blockIdmatch{

caseShuffleBlockId(shufId,mapId,
_)=>

valaddress
=statuses(mapId.toInt)._1

thrownewFetchFailedException(address,shufId.toInt,mapId.toInt,reduceId,
null)

case_ =>

thrownewSparkException(

"Failedto get block " +
blockId+
", which is not a shuffle block")

}

}

}

}

通过blockManager从blockid中获取Iterator,用来得到数据

这里的blockManager中reduce进行shuffle的具体有两个实现,默认为BasicBlockFetcherIterator,

如果spark.shuffle.use.netty配置为true时,实现类为NettyBlockFetcherIterator。

在BasicBlockFetcherIterator中通过nio的方式使用sparkenv中的ConnectionManager来接收数据,

而NettyBlockFetcherIterator通过netty的通信框架进行操作,使用netty时,

通过reduce端spark.shuffle.copier.threads配置的线程数来获取数据,默认的线程个数为6.

valblockFetcherItr=
blockManager.getMultiple(blocksByAddress,serializer)

取出每一个blockid中的values部分的iterator.

valitr
=blockFetcherItr.flatMap(unpackBlock)

valcompletionIter=
CompletionIterator[T, Iterator[T]](itr,{

valshuffleMetrics=
newShuffleReadMetrics

shuffleMetrics.shuffleFinishTime=
System.currentTimeMillis

shuffleMetrics.remoteFetchTime=
blockFetcherItr.remoteFetchTime

shuffleMetrics.fetchWaitTime=
blockFetcherItr.fetchWaitTime

shuffleMetrics.remoteBytesRead=
blockFetcherItr.remoteBytesRead

shuffleMetrics.totalBlocksFetched=
blockFetcherItr.totalBlocks

shuffleMetrics.localBlocksFetched=
blockFetcherItr.numLocalBlocks

shuffleMetrics.remoteBlocksFetched=
blockFetcherItr.numRemoteBlocks

context.taskMetrics.shuffleReadMetrics=
Some(shuffleMetrics)

})

newInterruptibleIterator[T](context,
completionIter)

}

通过MapOutputTracker得到shuffle的stage的map完成的mapstatus

上面得到MapStatus的容器的函数定义

defgetServerStatuses(shuffleId: Int, reduceId: Int):Array[(BlockManagerId, Long)]
= {

检查executor本地是否有此shuffleid的mapstatuses信息,

valstatuses
=mapStatuses.get(shuffleId).orNull

如果本地还没有shuffle的状态数据(所有的shuffle完成的状态都需要从master中同步过来),

if(statuses==
null){

logInfo("Don‘thave map outputs for shuffle " + shuffleId +
",fetching them")

varfetchedStatuses:Array[MapStatus]
= null

出于线程安全考虑,

fetching.synchronized{

如果shuffleid已经在fetching中存在,等待shuffle从master获取MapStatus完成。

这里主要是为了多个task同时来获取数据,第一个task已经向master发起申请,

第二个就不需要在发起只需要等待第一个完成申请并得到数据存储到fetchedStatuses中。

if(fetching.contains(shuffleId)){

//Someone else is fetching it; wait for them to be done

while(fetching.contains(shuffleId)){

try{

fetching.wait()

}
catch{

casee:InterruptedException
=>

}

}

}

if(fetchedStatuses==
null){

//We
wonthe race to fetch the output
locs;do so

logInfo("Doingthe fetch; tracker actor = " +trackerActor)

//This try-finally prevents hangs due to timeouts:

try{

通过askTracker函数,通过actorref向MapoutputTrackerMasterActor发起GetMapOutputStatuses事件。

得到此stage完成的所有的task的MapStatus信息

valfetchedBytes=

askTracker(GetMapOutputStatuses(shuffleId)).asInstanceOf[Array[Byte]]

解析成fetchedStatuses数据。

fetchedStatuses= MapOutputTracker.deserializeMapStatuses(fetchedBytes)

logInfo("Gotthe output locations")

添加到executor中的MapStatuses容器中。缓存起来,共下一个task实例。

mapStatuses.put(shuffleId,fetchedStatuses)

}
finally{

从master中获取数据完成,把fetching中的shuffleid移出。

fetching.synchronized{

fetching-= shuffleId

fetching.notifyAll()

}

}

}

if(fetchedStatuses!=
null){

fetchedStatuses.synchronized{

通过指定的shuffleid与reduceid的值,得到此reduce在blockid中要获取数据的大小。

returnMapOutputTracker.convertMapStatuses(shuffleId,
reduceId,fetchedStatuses)

}

}

else{

thrownewFetchFailedException(null,shuffleId,
-1,reduceId,

newException("Missing
all outputlocations for shuffle " +shuffleId))

}

}else{

通过指定的shuffleid与reduceid的值,得到此reduce在blockid中要获取数据的大小。local的cache模式

statuses.synchronized{

returnMapOutputTracker.convertMapStatuses(shuffleId,
reduceId, statuses)

}

}

}

MapOutputTracker.convertMapStatuses函数

privatedef convertMapStatuses(

shuffleId: Int,

reduceId: Int,

statuses: Array[MapStatus]):Array[(BlockManagerId,Long)]
= {

assert (statuses !=
null)

statuses.map {

status =>

if(status ==
null){

thrownewFetchFailedException(null,shuffleId,
-1,reduceId,

newException("Missing
an outputlocation for shuffle " +shuffleId))

}
else{

取出MapStatus中,针对此reduce的partition中的shuffle的内容大小。

(status.location,decompressSize(status.compressedSizes(reduceId)))

}

}

}

........

spark shuffle过程分析,布布扣,bubuko.com

时间: 2025-01-07 10:28:08

spark shuffle过程分析的相关文章

spark源码阅读--shuffle过程分析

ShuffleManager(一) 本篇,我们来看一下spark内核中另一个重要的模块,Shuffle管理器ShuffleManager.shuffle可以说是分布式计算中最重要的一个概念了,数据的join,聚合去重等操作都需要这个步骤.另一方面,spark之所以比mapReduce的性能高其中一个主要的原因就是对shuffle过程的优化,一方面spark的shuffle过程更好地利用内存(也就是我们前面在分析内存管理时所说的执行内存),另一方面对于shuffle过程中溢写的磁盘文件归并排序和引

spark shuffle 内幕彻底解密课程

一:到底什么是Shuffle? Shuffle中文翻译为"洗牌",需要Shuffle的关键性原因是某种具有共同特征的数据需要最终汇聚到一个计算节点上进行计算. 二:Shuffle可能面临的问题?运行Task的时候才会产生Shuffle(Shuffle已经融化在Spark的算子中了). 1, 数据量非常大: 2, 数据如何分类,即如何Partition,Hash.Sort.钨丝计算: 3, 负载均衡(数据倾斜): 4, 网络传输效率,需要在压缩和解压缩之间做出权衡,序列化和反序列也是要考

Spark Shuffle过程详细分析

在MapReduce中shuffle和Spark的shuffle的过程有一些区别.这里做一下具体的介绍. Mapreduce的shuffle过程图解 Spark shuffle过程图解 注意:spark shuffle过程中没有分区和排序的过程,而且存储结果存储在内存中,所以速度要比mapreduce要快很多. 先就到这里吧,图解的说明应该比较清晰了.有问题欢迎留言

Spark Shuffle 堆外内存溢出问题与解决(Shuffle通信原理)

Spark Shuffle 堆外内存溢出问题与解决(Shuffle通信原理) 问题描述 Spark-1.6.0已经在一月份release,为了验证一下它的性能,我使用了一些大的SQL验证其性能,其中部分SQL出现了Shuffle失败问题,详细的堆栈信息如下所示: 16/02/17 15:36:36 WARN server.TransportChannelHandler: Exception in connection from /10.196.134.220:7337 java.lang.Out

spark性能调优(四) spark shuffle中JVM内存使用及配置内幕详情

转载:http://www.cnblogs.com/jcchoiling/p/6494652.html 引言 Spark 从1.6.x 开始对 JVM 的内存使用作出了一种全新的改变,Spark 1.6.x 以前是基于静态固定的JVM内存使用架构和运行机制,如果你不知道 Spark 到底对 JVM 是怎么使用,你怎么可以很有信心地或者是完全确定地掌握和控制数据的缓存空间呢,所以掌握Spark对JVM的内存使用内幕是至关重要的.很多人对 Spark 的印象是:它是基于内存的,而且可以缓存一大堆数据

Spark Shuffle数据处理过程与部分调优(源码阅读七)

shuffle...相当重要,为什么咩,因为shuffle的性能优劣直接决定了整个计算引擎的性能和吞吐量.相比于Hadoop的MapReduce,可以看到Spark提供多种计算结果处理方式,对shuffle过程进行了优化. 那么我们从RDD的iterator方法开始: 我们可以看到,它调用了cacheManager的getOrCompute方法,如果分区任务第一次执行还没有缓存,那么会调用computeOrReadCheckpoint.如果某个partition任务执行失败,可以利用DAG重新调

spark shuffle内在原理说明

在MapReduce框架中,shuffle是连接Map和Reduce之间的桥梁,Map的输出要用到Reduce中必须经过shuffle这个环节,shuffle的性能高低直接影响了整个程序的性能和吞吐量.Spark作为MapReduce框架的一种实现,自然也实现了shuffle的逻辑. Shuffle Shuffle是MapReduce框架中的一个特定的phase,介于Map phase和Reduce phase之间,当Map的输出结果要被Reduce使用时,输出结果需要按key哈希,并且分发到每

Spark Shuffle的技术演进

在Spark或Hadoop MapReduce的分布式计算框架中,数据被按照key分成一块一块的分区,打散分布在集群中各个节点的物理存储或内存空间中,每个计算任务一次处理一个分区,但map端和reduce端的计算任务并非按照一种方式对相同的分区进行计算,例如,当需要对数据进行排序时,就需要将key相同的数据分布到同一个分区中,原分区的数据需要被打乱重组,这个按照一定的规则对数据重新分区的过程就是Shuffle(洗牌). Spark Shuffle的两阶段 对于Spark来讲,一些Transfor

Spark Shuffle Write 阶段函数调用分析

Shuffle Write阶段函数调用如下: org.apache.spark.executor.run() --> org.apache.spark.scheduler.Task.run() --> org.apache.spark.scheduler.runTask() --> org.apache.spark.shuffle.hash.HashShuffleWriter.write() --> org.apache.spark.storage.DiskBlockObjectW