Spark之RDD的定义及五大特性

  RDD是分布式内存的一个抽象概念,是一种高度受限的共享内存模型,即RDD是只读的记录分区的集合,能横跨集群所有节点并行计算,是一种基于工作集的应用抽象。

  RDD底层存储原理:其数据分布存储于多台机器上,事实上,每个RDD的数据都以Block的形式存储于多台机器上,每个Executor会启动一个BlockManagerSlave,并管理一部分Block;而Block的元数据由Driver节点上的BlockManagerMaster保存,BlockManagerSlave生成Block后向BlockManagerMaster注册该Block,BlockManagerMaster管理RDD与Block的关系,当RDD不再需要存储的时候,将向BlockManagerSlave发送指令删除相应的Block。

  BlockManager管理RDD的物理分区,每个Block就是节点上对应的一个数据块,可以存储在内存或者磁盘上。而RDD中的Partition是一个逻辑数据块,对应相应的物理块Block。本质上,一个RDD在代码中相当于数据的一个元数据结构,存储着数据分区及其逻辑结构映射关系,存储着RDD之前的依赖转换关系。

  BlockManager在每个节点上运行管理Block(Driver和Executors),它提供一个接口检索本地和远程的存储变量,如memory、disk、off-heap。使用BlockManager前必须先初始化。BlockManager.scala的部分源码如下所示:


1

2

3

4

5

6

7

8

9

10

11

12

13

private[spark] class BlockManager(

    executorId: String,

    rpcEnv: RpcEnv,

    val master: BlockManagerMaster,

    serializerManager: SerializerManager,

    val conf: SparkConf,

    memoryManager: MemoryManager,

    mapOutputTracker: MapOutputTracker,

    shuffleManager: ShuffleManager,

    val blockTransferService: BlockTransferService,

    securityManager: SecurityManager,

    numUsableCores: Int)

  extends BlockDataManager with BlockEvictionHandler with Logging {

  BlockManagerMaster会持有整个Application的Block的位置、Block所占用的存储空间等元数据信息,在Spark的Driver的DAGScheduler中,就是通过这些信息来确认数据运行的本地性的。Spark支持重分区,数据通过Spark默认的或者用户自定义的分区器决定数据块分布在哪些节点。RDD的物理分区是由Block-Manager管理的,每个Block就是节点上对应的一个数据块,可以存储在内存或者磁盘。而RDD中的partition是一个逻辑数据块,对应相应的物理块Block。本质上,一个RDD在代码中相当于数据的一个元数据结构(一个RDD就是一组分区),存储着数据分区及Block、Node等的映射关系,以及其他元数据信息,存储着RDD之前的依赖转换关系。分区是一个逻辑概念,Transformation前后的新旧分区在物理上可能是同一块内存存储。  

  Spark通过读取外部数据创建RDD,或通过其他RDD执行确定的转换Transformation操作(如map、union和groubByKey)而创建,从而构成了线性依赖关系,或者说血统关系(Lineage),在数据分片丢失时可以从依赖关系中恢复自己独立的数据分片,对其他数据分片或计算机没有影响,基本没有检查点开销,使得实现容错的开销很低,失效时只需要重新计算RDD分区,就可以在不同节点上并行执行,而不需要回滚(Roll Back)整个程序。落后任务(即运行很慢的节点)是通过任务备份,重新调用执行进行处理的。

  因为RDD本身支持基于工作集的运用,所以可以使Spark的RDD持久化(persist)到内存中,在并行计算中高效重用。多个查询时,我们就可以显性地将工作集中的数据缓存到内存中,为后续查询提供复用,这极大地提升了查询的速度。在Spark中,一个RDD就是一个分布式对象集合,每个RDD可分为多个片(Partitions),而分片可以在集群环境的不同节点上计算。

  RDD作为泛型的抽象的数据结构,支持两种计算操作算子:Transformation(变换)与Action(行动)。且RDD的写操作是粗粒度的,读操作既可以是粗粒度的,也可以是细粒度的。RDD.scala的源码如下:


1

2

3

4

5

6

7

8

9

10

11

12

13

/**

 * Internally, each RDD is characterized by five main properties:

 * 每个RDD都有5个主要特性

 *  - A list of partitions    分区列表

 *  - A function for computing each split    每个分区都有一个计算函数

 *  - A list of dependencies on other RDDs    依赖于其他RDD的列表

 *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)    数据类型(key-value)的RDD分区器

 *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for    每个分区都有一个分区位置列表

 */

abstract class RDD[T: ClassTag](

    @transient private var _sc: SparkContext,

    @transient private var deps: Seq[Dependency[_]]

  extends Serializable with Logging {

  其中,SparkContext是Spark功能的主要入口点,一个SparkContext代表一个集群连接,可以用其在集群中创建RDD、累加变量、广播变量等,在每一个可用的JVM中只有一个SparkContext,在创建一个新的SparkContext之前,必须先停止该JVM中可用的SparkContext,这种限制可能最终会被修改。SparkContext被实例化时需要一个SparkConf对象去描述应用的配置信息,在这个配置对象中设置的信息,会覆盖系统默认的配置。

  RDD五大特性:

  (1)分区列表(a list of partitions)。Spark RDD是被分区的,每一个分区都会被一个计算任务(Task)处理,分区数决定并行计算数量,RDD的并行度默认从父RDD传给子RDD。默认情况下,一个HDFS上的数据分片就是一个Partition,RDD分片数决定了并行计算的力度,可以在创建RDD时指定RDD分片个数,如果不指定分区数量,当RDD从集合创建时,则默认分区数量为该程序所分配到的资源的CPU核数(每个Core可以承载2~4个Partition),如果是从HDFS文件创建,默认为文件的Block数。

  (2)每一个分区都有一个计算函数(a function for computing each split)。每个分区都会有计算函数,Spark的RDD的计算函数是以分片为基本单位的,每个RDD都会实现compute函数,对具体的分片进行计算,RDD中的分片是并行的,所以是分布式并行计算。有一点非常重要,就是由于RDD有前后依赖关系,遇到宽依赖关系,例如,遇到reduceBykey等宽依赖操作的算子,Spark将根据宽依赖划分Stage,Stage内部通过Pipeline操作,通过Block Manager获取相关的数据,因为具体的split要从外界读数据,也要把具体的计算结果写入外界,所以用了一个管理器,具体的split都会映射成BlockManager的Block,而具体split会被函数处理,函数处理的具体形式是以任务的形式进行的。

  (3)依赖于其他RDD的列表(a list of dependencies on other RDDs)。RDD的依赖关系,由于RDD每次转换都会生成新的RDD,所以RDD会形成类似流水线的前后依赖关系,当然,宽依赖就不类似于流水线了,宽依赖后面的RDD具体的数据分片会依赖前面所有的RDD的所有的数据分片,这时数据分片就不进行内存中的Pipeline,这时一般是跨机器的。因为有前后的依赖关系,所以当有分区数据丢失的时候,Spark会通过依赖关系重新计算,算出丢失的数据,而不是对RDD所有的分区进行重新计算。RDD之间的依赖有两种:窄依赖(Narrow Dependency)、宽依赖(Wide Dependency)。RDD是Spark的核心数据结构,通过RDD的依赖关系形成调度关系。通过对RDD的操作形成整个Spark程序。

    RDD有Narrow Dependency和Wide Dependency两种不同类型的依赖,其中的Narrow Dependency指的是每一个parent RDD的Partition最多被child RDD的一个Partition所使用,而Wide Dependency指的是多个child RDD的Partition会依赖于同一个parent RDD的Partition。可以从两个方面来理解RDD之间的依赖关系:一方面是该RDD的parent RDD是什么;另一方面是依赖于parent RDD的哪些Partitions;根据依赖于parent RDD的Partitions的不同情况,Spark将Dependency分为宽依赖和窄依赖两种。Spark中宽依赖指的是生成的RDD的每一个partition都依赖于父RDD的所有partition,宽依赖典型的操作有groupByKey、sortByKey等,宽依赖意味着shuffle操作,这是Spark划分Stage边界的依据,Spark中宽依赖支持两种Shuffle Manager,即HashShuffleManager和SortShuffleManager,前者是基于Hash的Shuffle机制,后者是基于排序的Shuffle机制。Spark 2.2现在的版本中已经没有Hash Shuffle的方式。

  (4)key-value数据类型的RDD分区器(-Optionally,a Partitioner for key-value RDDS),控制分区策略和分区数。每个key-value形式的RDD都有Partitioner属性,它决定了RDD如何分区。当然,Partition的个数还决定每个Stage的Task个数。RDD的分片函数,想控制RDD的分片函数的时候可以分区(Partitioner)传入相关的参数,如HashPartitioner、RangePartitioner,它本身针对key-value的形式,如果不是key-value的形式,它就不会有具体的Partitioner。Partitioner本身决定了下一步会产生多少并行的分片,同时,它本身也决定了当前并行(parallelize)Shuffle输出的并行数据,从而使Spark具有能够控制数据在不同节点上分区的特性,用户可以自定义分区策略,如Hash分区等。Spark提供了“partitionBy”运算符,能通过集群对RDD进行数据再分配来创建一个新的RDD。

  (5)每个分区都有一个优先位置列表(-Optionally,a list of preferred locations to compute each split on)。它会存储每个Partition的优先位置,对于一个HDFS文件来说,就是每个Partition块的位置。观察运行spark集群的控制台会发现Spark的具体计算,具体分片前,它已经清楚地知道任务发生在什么节点上,也就是说,任务本身是计算层面的、代码层面的,代码发生运算之前已经知道它要运算的数据在什么地方,有具体节点的信息。这就符合大数据中数据不动代码动的特点。数据不动代码动的最高境界是数据就在当前节点的内存中。这时有可能是memory级别或Alluxio级别的,Spark本身在进行任务调度时候,会尽可能将任务分配到处理数据的数据块所在的具体位置。据Spark的RDD.Scala源码函数getPreferredLocations可知,每次计算都符合完美的数据本地性。
RDD类源码文件中的4个方法和一个属性对应上述阐述的RDD的5大特性。RDD.scala的源码如下:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

/**

 * :: DeveloperApi ::

 * Implemented by subclasses to compute a given partition. 通过子类实现给定分区的计算

 */

@DeveloperApi

def compute(split: Partition, context: TaskContext): Iterator[T]

/**

 * Implemented by subclasses to return the set of partitions in this RDD. This method will only

 * be called once, so it is safe to implement a time-consuming computation in it.

 * 通过子类实现,返回一个RDD分区列表,这个方法只被调用一次,它是安全的执行一次耗时计算

 *

 * 数组中的分区必须符合以下属性设置

 * The partitions in this array must satisfy the following property:

 *   `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`

 */

protected def getPartitions: Array[Partition]

/**

 * 返回对父RDD的依赖列表,这个方法仅只被调用一次,它是安全的执行一次耗时计算

 * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only

 * be called once, so it is safe to implement a time-consuming computation in it.

 */

protected def getDependencies: Seq[Dependency[_]] = deps

/**

 * 可选的,指定优先位置,输入参数是spilt分片,输出结果是一组优先的节点位置

 * Optionally overridden by subclasses to specify placement preferences.

 */

protected def getPreferredLocations(split: Partition): Seq[String] = Nil

/**

 * Optionally overridden by subclasses to specify how they are partitioned.

 * 可选的,通过子类实现,指定如何分区

 */

@transient val partitioner: Option[Partitioner] = None

  其中,TaskContext是读取或改变执行任务的环境,用org.apache.spark.TaskContext.get()可返回当前可用的TaskContext,可以调用内部的函数访问正在运行任务的环境信息。Partitioner是一个对象,定义了如何在key-Value类型的RDD元素中用Key分区,从0到numPartitions-1区间内映射每一个Key到Partition ID。Partition是一个RDD的分区标识符。Partition.scala的源码如下。  


1

2

3

4

5

6

7

8

9

10

11

12

13

14

/**

 * An identifier for a partition in an RDD.

 */

trait Partition extends Serializable {

  /**

   * Get the partition‘s index within its parent RDD

   */

  def index: Int

  // A better default implementation of HashCode

  override def hashCode(): Int = index

  override def equals(other: Any): Boolean = super.equals(other)

}

原文地址:https://www.cnblogs.com/ninglinglong/p/11827361.html

时间: 2024-11-10 05:27:06

Spark之RDD的定义及五大特性的相关文章

Spark的RDD原理以及2.0特性的介绍

转载自:http://www.tuicool.com/articles/7VNfyif 王联辉,曾在腾讯,Intel 等公司从事大数据相关的工作.2013 年 - 2016 年先后负责腾讯 Yarn 集群和 Spark 平台的运营与研发.曾负责 Intel Hadoop 发行版的 Hive 及 HBase 版本研发.参与过百度用户行为数据仓库的建设和开发,以及淘宝数据魔方和淘宝指数的数据开发工作.给 Spark 社区贡献了 25+ 个 patch,接受的重要特性有 python on yarn-

Spark之RDD弹性特性

RDD作为弹性分布式数据集,它的弹性具体体现在以下七个方面. 1.自动进行内存和磁盘数据存储的切换 Spark会优先把数据放到内存中,如果内存实在放不下,会放到磁盘里面,不但能计算内存放下的数据,也能计算内存放不下的数据.如果实际数据大于内存,则要考虑数据放置策略和优化算法.当应用程序内存不足时,Spark应用程序将数据自动从内存存储切换到磁盘存储,以保障其高效运行. 2.基于Lineage(血统)的高效容错机制 Lineage是基于Spark RDD的依赖关系来完成的(依赖分为窄依赖和宽依赖两

(1)spark核心RDD的概念解析、创建、以及相关操作

spark核心之RDD 什么是RDD RDD指的是弹性分布式数据集(Resilient Distributed Dataset),它是spark计算的核心.尽管后面我们会使用DataFrame.Dataset进行编程,但是它们的底层依旧是依赖于RDD的.我们来解释一下RDD(Resilient Distributed Dataset)的这几个单词含义. 弹性:在计算上具有容错性,spark是一个计算框架,如果某一个节点挂了,可以自动进行计算之间血缘关系的跟踪 分布式:很好理解,hdfs上数据是跨

【Spark】RDD操作详解1——Transformation和Actions概况

Spark算子的作用 下图描述了Spark在运行转换中通过算子对RDD进行转换. 算子是RDD中定义的函数,可以对RDD中的数据进行转换和操作. 输入:在Spark程序运行中,数据从外部数据空间(如分布式存储:textFile读取HDFS等,parallelize方法输入Scala集合或数据)输入Spark,数据进入Spark运行时数据空间,转化为Spark中的数据块,通过BlockManager进行管理. 运行:在Spark数据输入形成RDD后便可以通过变换算子,如filter等,对数据进行操

【Spark】RDD操作详解3——键值型Transformation算子

Transformation处理的数据为Key-Value形式的算子大致可以分为:输入分区与输出分区一对一.聚集.连接操作. 输入分区与输出分区一对一 mapValues mapValues:针对(Key,Value)型数据中的Value进行Map操作,而不对Key进行处理. 方框代表RDD分区.a=>a+2代表只对( V1, 1)数据中的1进行加2操作,返回结果为3. 源码: /** * Pass each value in the key-value pair RDD through a m

Spark核心—RDD初探

本文目的 ? 最近在使用Spark进行数据清理的相关工作,初次使用Spark时,遇到了一些挑(da)战(ken).感觉需要记录点什么,才对得起自己.下面的内容主要是关于Spark核心-RDD的相关的使用经验和原理介绍,作为个人备忘,也希望对读者有用. ? 为什么选择Spark ? 原因如下 代码复用:使用Scala高级语言操作Spark,灵活方便,面向对象,函数编程的语言特性可以全部拿来.Scala基本上可以无缝集成java及其相关库.最重要的是,可以封装组件,沉淀工作,提高工作效率.之前用hi

Spark的RDD检查点实现分析

概述 在<深入理解Spark:核心思想与源码分析>一书中只是简单介绍了下RDD的checkpoint,对本书是个遗憾.所以此文的目的旨在查漏补缺,完善本书的内容. Spark的RDD执行完成之后会保存检查点,便于当整个作业运行失败重新运行时候,从检查点恢复之前已经运行成功的RDD结果,这样就会大大减少重新计算的成本,提高任务恢复效率和执行效率,节省Spark各个计算节点的资源.本文着重分析检查点的代码实现,更深入理解其原理.在<深入理解Spark:核心思想与源码分析>一书的第5章中

Spark发行笔记8:解读Spark Streaming RDD的全生命周期

本节主要内容: 一.DStream与RDD关系的彻底的研究 二.StreamingRDD的生成彻底研究 Spark Streaming RDD思考三个关键的问题: RDD本身是基本对象,根据一定时间定时产生RDD的对象,随着时间的积累,不对其管理的话会导致内存会溢出,所以在BatchDuration时间内执行完RDD操作后,需对RDD进行管理. 1.DStream生成RDD的过程,DStream到底是怎么生成RDD的? 2.DStream和RDD到底什么关系? 3.运行之后怎么对RDD处理? 所

Spark学习之路 (三)Spark之RDD[转]

RDD的概述 什么是RDD? RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变.可分区.里面的元素可并行计算的集合.RDD具有数据流模型的特点:自动容错.位置感知性调度和可伸缩性.RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度. RDD的属性 (1)一组分片(Partition),即数据集的基本组成单位.对于RDD来说,每个分片都会被一个计算任务处