RDD的依赖关系

RDD的依赖关系

Rdd之间的依赖关系通过rdd中的getDependencies来进行表示,

在提交job后,会通过在DAGShuduler.submitStage-->getMissingParentStages

privatedefgetMissingParentStages(stage:
Stage): List[Stage] = {

valmissing
=newHashSet[Stage]

valvisited
=newHashSet[RDD[_]]

defvisit(rdd: RDD[_]) {

if(!visited(rdd)){

visited+= rdd

if(getCacheLocs(rdd).contains(Nil)){

for(dep
<-rdd.dependencies) {

depmatch{

caseshufDep:ShuffleDependency[_,_]
=>

valmapStage
=getShuffleMapStage(shufDep,stage.jobId)

if(!mapStage.isAvailable){

missing+=
mapStage

}

casenarrowDep:NarrowDependency[_]
=>

visit(narrowDep.rdd)

}

}

}

}

}

visit(stage.rdd)

missing.toList

}

在以上代码中得到rdd的相关dependencies,每一个rdd生成时传入rdd的dependencies信息。

如SparkContext.textFile,时生成的HadoopRDD时。此RDD的默认为dependencys为Nil.

Nil是一个空的列表。

classHadoopRDD[K, V](

sc: SparkContext,

broadcastedConf:Broadcast[SerializableWritable[Configuration]],

initLocalJobConfFuncOpt:Option[JobConf => Unit],

inputFormatClass: Class[_ <:InputFormat[K, V]],

keyClass: Class[K],

valueClass: Class[V],

minSplits: Int)

extendsRDD[(K,
V)](sc, Nil) withLogging {

Dependency分为ShuffleDependency与NarrowDependency。

其中NarrowDependency又包含OneToOneDependency/RangeDependency

Dependency唯一的成员就是rdd,即所依赖的rdd,或parentrdd

abstractclassDependency[T](valrdd:RDD[T])
extendsSerializable

OneToOneDependency关系:

最简单的依赖关系,即parent和child里面的partitions是一一对应的,典型的操作就是map,filter

其实partitionId就是partition在RDD中的序号,所以如果是一一对应,

那么parent和child中的partition的序号应该是一样的,如下是OneToOneDependency的定义

/**

*Represents a one-to-one dependency between partitions of the parentand child RDDs.

*/

classOneToOneDependency[T](rdd: RDD[T])
extendsNarrowDependency[T](rdd) {

此类的Dependency中parent中的partitionId与childRDD中的partitionId是一对一的关系。

也就是partition本身范围不会改变,一个parition经过transform还是一个partition,

虽然内容发生了变化,所以可以在local完成,此类场景通常像mapreduce中只有map的场景,

第一个RDD执行完成后的MAP的parition直接运行第二个RDD的Map,也就是local执行。

overridedefgetParents(partitionId:
Int) = List(partitionId)

}

RangeDependency关系:

此类应用虽然仍然是一一对应,但是是parentRDD中的某个区间的partitions对应到childRDD中的某个区间的partitions

典型的操作是union,多个parentRDD合并到一个childRDD,
故每个parentRDD都对应到childRDD中的一个区间

需要注意的是,这里的union不会把多个partition合并成一个partition,而是的简单的把多个RDD中的partitions放到一个RDD里面,partition不会发生变化,

rdd参数,parentRDD

inStart参数,parentRDD的partitionId计算的起点位置。

outStart参数,childRDD中计算parentRDD的partitionId的起点位置,

length参数,parentRDD中partition的个数。

classRangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length:Int)

extendsNarrowDependency[T](rdd) {

overridedefgetParents(partitionId:
Int) = {

检查partitionId的合理性,此partitionId在childRDD的partitionId中的范围需要合理。

if(partitionId >= outStart && partitionId
< outStart +length) {

计算出ParentRDD的partitionId的值。

List(partitionId - outStart +inStart)

}else{

Nil

}

}

}

典型的应用场景union的场景把两个RDD合并到一个新的RDD中。

defunion(other: RDD[T]): RDD[T] =
newUnionRDD(sc,
Array(this,other))

使用union的,第二个参数是,两个RDD的array,返回值就是把这两个RDDunion后产生的新的RDD

ShuffleDependency关系:

此类依赖首先要求是Product2与PairRDDFunctions的k,v的形式,这样才能做shuffle,和hadoop一样。

其次,由于需要shuffle,所以当然需要给出partitioner,默认是HashPartitioner如何完成shuffle

然后,shuffle不象map可以在local进行,往往需要网络传输或存储,所以需要serializerClass

默认是JavaSerializer,一个类名,用于序列化网络传输或者以序列化形式缓存起来的各种对象。

默认情况下Java的序列化机制可以序列化任何实现了Serializable接口的对象,

但是速度是很慢的,

因此当你在意运行速度的时候我们建议你使用spark.KryoSerializer并且配置Kryoserialization。

可以是任何spark.Serializer的子类。

最后,每个shuffle需要分配一个全局的id,context.newShuffleId()的实现就是把全局id累加

classShuffleDependency[K, V](

@transientrdd: RDD[_ <: Product2[K, V]],

valpartitioner:Partitioner,

valserializerClass:String
= null)

extendsDependency(rdd.asInstanceOf[RDD[Product2[K,
V]]]) {

valshuffleId:Int
= rdd.context.newShuffleId()

}

生成RDD过程分析

生成rdd我们还是按wordcount中的例子来说明;

val
file= sc.textFile("/hadoop-test.txt")

valcounts
=file.flatMap(line=> line.split(" "))

.map(word => (word,1)).reduceByKey(_+ _)

counts.saveAsTextFile("/newtest.txt")

1.首先SparkContext.textFile通过调用hadoopFile生成HadoopRDD实例,

textFile-->hadoopFile-->HadoopRDD,此时RDD的Dependency为Nil,一个空的列表。

此时的HadoopRDD为RDD<K,V>,每执行next方法时返回一个Pair,也就是一个KV(通过compute函数)

2.textFile得到HadoopRDD后,调用map函数,

map中每执行一次得到一个KV(compute中getNext,newNextIterator[(K,
V)] ),

取出value的值并toString,生成MappedRDD<String>。此RDD的上层RDD就是1中生成的RDD。

同时此RDD的Dependency为OneToOneDependency。

deftextFile(path: String, minSplits:
Int = defaultMinSplits):RDD[String] = {

hadoopFile(path,classOf[TextInputFormat], classOf[LongWritable], classOf[Text],

minSplits).map(pair =>pair._2.toString)

}

defmap[U: ClassTag](f: T => U): RDD[U]
= newMappedRDD(this,sc.clean(f))

以上代码中传入的this其实就是1中生成的HadoopRDD.

3.flatMap函数,把2中每一行输出通过一定的条件修改成0到多个新的item.生成FlatMappedRDD实例,

同时根据implicit隐式转换生成PairRDDFunctions。下面两处代码中的红色部分。

在生成FlatMappedRDD是,此时的上一层RDD就是2中生成的RDD。

同时此RDD的Dependency为OneToOneDependency。

classFlatMappedRDD[U:
ClassTag, T: ClassTag](

prev: RDD[T],

f:T => TraversableOnce[U])

extendsRDD[U](prev)

implicitdefrddToPairRDDFunctions[K:
ClassTag, V:ClassTag](rdd: RDD[(K, V)]) =

newPairRDDFunctions(rdd)

4.map函数,由于3中生成的FlatMappedRDD生成出来的结果,通过implicit的隐式转换生成PairRDDFunctions。

此时的map函数需要生成隐式转换传入的RDD<K,V>的一个RDD,

因此map函数的执行需要生成一个MappedRDD<K,V>的RDD,同时此RDD的Dependency为OneToOneDependency。

以下代码的红色部分。---RDD[(K,V)]。。

valcounts=
file.flatMap(line=>
line.split(
""))

.map(word=>
(word, 1)).reduceByKey(_+ _)

5.reduceByKey函数,此函数通过implicit的隐式转换中的函数来进行,主要是传入一个计算两个value的函数。

reduceByKey这类的shuffle的RDD时,最终生成一个ShuffleRDD,

此RDD生成的Dependency为ShuffleDependency。

具体说明在下面的reduceByKey代码中,

首先在每一个map生成MapPartitionsRDD把各partitioner中的数据通过进行合并。合并通过Aggregator实例。

最后通过对合并后的MapPartitionsRDD,此RDD相当于mapreduce中的combiner,生成ShuffleRDD.

defreduceByKey(func: (V, V) => V):
RDD[(K, V)] = {

reduceByKey(defaultPartitioner(self),func)

}

defcombineByKey[C](createCombiner:
V => C,//创建combiner,通过V的值创建C

mergeValue: (C, V) =>C,//combiner已经创建C已经有一个值,把第二个的V叠加到C中,

mergeCombiners: (C, C) =>C,//把两个C进行合并,其实就是两个value的合并。

partitioner:Partitioner,//Shuffle时需要的Partitioner

mapSideCombine: Boolean =true,//为了减小传输量,很多combine可以在map端先做,

比如叠加,可以先在一个partition中把所有相同的key的value叠加,再shuffle

serializerClass: String =
null):RDD[(K, C)] = {

if(getKeyClass().isArray) {

if(mapSideCombine) {

thrownewSparkException("Cannot
use map-sidecombining with array keys.")

}

if(partitioner.isInstanceOf[HashPartitioner])
{

thrownewSparkException("Default
partitionercannot partition array keys.")

}

}

生成一个Aggregator实例。

valaggregator=
newAggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)

如果RDD本身的partitioner与传入的partitioner相同,表示不需要进行shuffle

if(self.partitioner==
Some(partitioner)) {

生成MapPartitionsRDD,直接在map端当前的partitioner下调用Aggregator.combineValuesByKey。

把相同的key的value进行合并。

self.mapPartitionsWithContext((context,iter) => {

newInterruptibleIterator(context,
aggregator.combineValuesByKey(iter,context))

}, preservesPartitioning =
true)

}elseif(mapSideCombine)
{

生成MapPartitionsRDD,先在map端当前的partitioner下调用Aggregator.combineValuesByKey。

把相同的key的value进行合并。

combineValuesByKey中检查如果key对应的C如果不存在,通过createCombiner创建C,

否则key已经存在C时,通过mergeValue把新的V与上一次的C进行合并,

mergeValue其实就是传入的reduceByKey(_+
_) 括号中的函数,与reduce端函数相同。

valcombined
=self.mapPartitionsWithContext((context, iter) => {

aggregator.combineValuesByKey(iter,context)

}, preservesPartitioning =
true)

生成ShuffledRDD,进行shuffle操作,因为此时会生成ShuffleDependency,重新生成一个新的stage.

valpartitioned=
newShuffledRDD[K, C, (K, C)](combined,partitioner)

.setSerializer(serializerClass)

在上一步完成,也就是shuffle完成,重新在reduce端进行合并操作。通过Aggregator.combineCombinersByKey

spark这些地方的方法定义都是通过动态加载执行的函数的方式,所以可以做到map端执行完成后reduce再去执行后续的处理。

因为函数在map时只是进行了定义,reduce端才对函数进行执行。

partitioned.mapPartitionsWithContext((context,iter)
=> {

newInterruptibleIterator(context,
aggregator.combineCombinersByKey(iter,context))

}, preservesPartitioning =
true)

}else{

不执行map端的合并操作,直接shuffle,并在reduce中执行合并。

//Don‘t apply map-side combiner.

valvalues
=newShuffledRDD[K, V, (K, V)](self,partitioner).setSerializer(serializerClass)

values.mapPartitionsWithContext((context,iter) => {

newInterruptibleIterator(context,
aggregator.combineValuesByKey(iter,context))

}, preservesPartitioning =
true)

}

}

RDD的依赖关系,布布扣,bubuko.com

时间: 2024-10-11 05:47:42

RDD的依赖关系的相关文章

Spark IMF传奇行动第22课:RDD的依赖关系彻底解密

版权声明:本文为博主原创文章,未经博主允许不得转载.作者:HaiziS 昨晚听了王家林老师的Spark IMF传奇行动第22课:RDD的依赖关系彻底解密,笔记如下: 1,窄依赖是指每个父RDD的一个Partition最多被子RDD的一个Partition所使用,例如map.filter.union等都会产生窄依赖: 2宽依赖是指一个父RDD的Partition会被多个子RDD的Partition所使用,例如groupByKey.reduceByKey.sortByKey等操作都会产生宽依赖 表面

sparkRDD:第4节 RDD的依赖关系;第5节 RDD的缓存机制;第6节 DAG的生成

4.      RDD的依赖关系 6.1      RDD的依赖 RDD和它依赖的父RDD的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency). 6.2      窄依赖 窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用 总结:窄依赖我们形象的比喻为独生子女.窄依赖不会产生shuffle,比如说:flatMap/map/filter.... 6.3      宽依赖 宽依赖指的是多个子RDD的Pa

RDD算子、RDD依赖关系

RDD:弹性分布式数据集, 是分布式内存的一个抽象概念 RDD:1.一个分区的集合, 2.是计算每个分区的函数 ,    3.RDD之间有依赖关系 4.一个对于key-value的RDD的Partitioner 5.一个存储存取每个Partition的优先位置的列表 RDD算子: Transformations:不会立即执行,只是记录这些操作 Actions:计算只有在action被提交的时候才被触发. RDD依赖关系: 窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Part

spark 源码分析之一 -- RDD的四种依赖关系

RDD的四种依赖关系 RDD四种依赖关系,分别是 ShuffleDependency.PrunDependency.RangeDependency和OneToOneDependency四种依赖关系.如下图所示:org.apache.spark.Dependency有两个一级子类,分别是 ShuffleDependency 和 NarrowDependency.其中,NarrowDependency 是一个抽象类,它有三个实现类,分别是OneToOneDependency.RangeDepende

RDD依赖关系

概述 RDD只支持粗粒度转换,即在大量记录上执行的单个操作.将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区.RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区. 示例代码如下: def main(args: Array[String]): Unit = { val sc: SparkContext = new SparkContext(new SparkConf() .setMaste

Spark RDD 宽窄依赖

RDD 宽窄依赖 RDD之间有一系列的依赖关系, 可分为窄依赖和宽依赖 窄依赖 从 RDD 的 parition 角度来看 父 RRD 的 parition 和 子 RDD 的 parition 之间的关系是一对一的 (或 者是多对一的). 不会有 shuffle 产生 宽依赖 父 RRD 的 parition 和 子 RDD 的 parition 之间的关系是一对多的 会产生shuffle 理解图 对stage(阶段)划分的影响 DAGSchedular 根据依赖类型切割RDD划分stage,

Intellij IDEA 中如何查看maven项目中所有jar包的依赖关系图

Maven 组件界面介绍 如上图标注 1 所示,为常用的 Maven 工具栏,其中最常用的有: 第一个按钮:Reimport All Maven Projects 表示根据 pom.xml 重新载入项目.一般单我们在 pom.xml 添加了依赖包或是插件的时候,发现标注 4 的依赖区中没有看到最新写的依赖的话,可以尝试点击此按钮进行项目的重新载入. 第六个按钮:Execute Maven Goal 弹出可执行的 Maven 命令的输入框.有些情况下我们需要通过书写某些执行命令来构建项目,就可以通

Makefile 8——使用依赖关系文件

Makefile中存在一个include指令,它的作用如同C语言中的#include预处理指令.在Makefile中,可以通过include指令将自动生成的依赖关系文件包含进来,从而使得依赖关系文件中的内容成为Makefile的一部分. 在此之前,先介绍一下Makefile中的include的用法. 1 .PHONY:all clean 2 DIR_DEP=dep 3 DEPS=test_deps 4 all: exe 5 6 include $(DEPS) 7 8 dep: 9 mkdir d

Makefile 7——自动生成依赖关系 三颗星

后面会介绍gcc获得源文件依赖的方法,gcc这个功能就是为make而存在的.我们采用gcc的-MM选项结合sed命令.使用sed进行替换的目的是为了在目标名前加上"objs/"前缀.gcc的-E选项,预处理.在生成依赖关系时,其实并不需要gcc编译源文件,只要预处理就可以获得依赖关系了.通过-E选项,可以避免生成依赖关系时gcc发出警告,以及提高依赖关系的生成效率. 现在,已经找到自动生成依赖关系的方法了,那么如何将其整合到我们complicated项目的Makefile中呢?自动生成