- 如何表示这样一个分区的,高效容错的而且能够持久化的分布式数据集呢?一般情况下抽象的RDD包含如下5大接口。
1)partition : 分区属性: 每个RDD包好多个分区, 这既是RDD的数据单位, 也是计算粒度, 每个分区在由一个Task线程处理. 在RDD创建的时候可以指定分区的个数, 如果没有指定, 那么, 默认分区的个数是CPU的核数. 每一块的分区对应一个内存上的block, 由BlockManager分配.
Partition实现:
partition 与 iterator 方法
RDD 的 def iterator(split: Partition, context: TaskContext): Iterator[T] 方法用来获取 split 指定的 Partition 对应的数据的迭代器,有了这个迭代器就能一条一条取出数据来按 compute chain 来执行一个个transform 操作。iterator 的实现如下:
其先判断 RDD 的 storageLevel 是否为 NONE,若不是,则尝试从缓存中读取,读取不到则通过计算来获取该Partition对应的数据的迭代器;若是,尝试从 checkpoint 中获取 Partition 对应数据的迭代器,若 checkpoint 不存在则通过计算(compute属性)
2)partitioner:RDD的分区方式, 这个属性指的是RDD的partitioner函数(分片函数), 分区函数就是将数据分配到指定的分区, 这个目前实现了HashPartitioner和RangePartitioner, 只有key-value的RDD才会有分片函数, 否则为none. 分片函数不仅决定了当前分片的个数, 同时决定parent shuffle RDD的输出的分区个数.
3)denpendiencies(): Spark的运行过程就是RDD之间的转换, 因此, 必须记录RDD之间的生成关系(新RDD是由哪个或哪几个RDD生成), 这就是所谓的依赖关系, 这样既有助于阶段和任务的划分, 也有助于在某个分区出错的时候, 只需要重新计算与当前出错的分区的有关的分区,而不需要计算所有的分区.
代码:
依赖类型:
- 窄依赖:父 RDD 的 partition 至多被一个子 RDD partition 依赖(OneToOneDependency,RangeDependency)
- 宽依赖:父 RDD 的 partition 被多个子 RDD partitions 依赖(ShuffleDependency)
图示:
4)compute(p,context): 计算属性: 当调用 RDD#iterator 方法无法从缓存或 checkpoint 中获取指定 partition 的迭代器时,就需要调用 compute 方法来获取
RDD不仅包含有数据, 还有在数据上的计算, 每个RDD以分区为计算粒度, 每个RDD会实现compute函数, compute函数会和迭代器(RDD之间转换的迭代器)进行复合, 这样就不需要保存每次compute运行的结果.
代码:
下面举几个算子操作:
map
首先来看下 map 的实现:
我们调用 map 时,会传入匿名函数 f: T => U,该函数将一个类型 T 实例转换成一个类型 U 的实例。在 map 函数中,将该函数进一步封装成 (context, pid, iter) => iter.map(cleanF) 的函数,该函数以迭代器作为参数,对迭代出的每一个元素执行 f 函数,然后以该封装后的函数作为参数来构造 MapPartitionsRDD,接下来看看 MapPartitionsRDD#compute 是怎么实现的:
上面代码中的 firstParent 是指本 RDD 的依赖 dependencies: Seq[Dependency[_]] 中的第一个,MapPartitionsRDD 的依赖中只有一个父 RDD。而 MapPartitionsRDD 的 partition 与其唯一的父 RDD partition 是一一对应的,所以其 compute 方法可以描述为:对父 RDD partition 中的每一个元素执行传入 map 的方法得到自身的 partition 及迭代器
图示:
groupByKey
与 map、union 不同,groupByKey 是一个会产生宽依赖的 transform,其最终生成的 RDD 是 ShuffledRDD,来看看其 compute 实现:
可以看到,ShuffledRDD 的 compute 使用 ShuffleManager 来获取一个 reader,该 reader 将从本地或远程 BlockManager 拉取 map output 的 file 数据,每个 reduce task 拉取一个 partition 数据。
图示:
5)preferredLocations(p):对于分区而言返回数据本地化计算的节点,
也就是说, 每个RDD会报出一个列表(Seq), 而这个列表保存着分片优先分配给哪个Worker节点计算, spark坚持移动计算而布移动数据的原则. 也就是尽量在存储数据的节点上进行计算.
要注意的是,并不是每个 RDD 都有 preferedLocation,比如从 Scala 集合中创建的 RDD 就没有,而从 HDFS 读取的 RDD 就有,其 partition 对应的优先位置及对应的 block 所在的各个节点。
参考链接:
http://mt.sohu.com/20160721/n460362468.shtml
文/牛肉圆粉不加葱(简书作者)
原文链接:http://www.jianshu.com/p/207607888767
著作权归作者所有,转载请联系作者获得授权,并标注“简书作者”。