Spark RDD基本概念、宽窄依赖、转换行为操作

目录

  • RDD概述

    • RDD的内部代码
    • 案例
    • 小总结
  • 转换、行动算子
  • 宽、窄依赖
  • Reference

本文介绍一下rdd的基本属性概念、rdd的转换/行动操作、rdd的宽/窄依赖。

RDD:Resilient Distributed Dataset 弹性分布式数据集,是Spark中的基本抽象。

RDD表示可以并行操作的元素的不变分区集合

RDD提供了许多基本的函数(map、filter、reduce等)供我们进行数据处理。

RDD概述

通常来说,每个RDD有5个主要的属性组成:

  • 分区列表

    RDD是由多个分区组成的,分区是逻辑上的概念。RDD的计算是以分区为单位进行的。

  • 用于计算每个分区的函数

    作用于每个分区数据的计算函数。

  • 对其他RDD的依赖关系列表

    RDD中保存了对于父RDD的依赖,根据依赖关系组成了Spark的DAG(有向无环图),实现了spark巧妙、容错的编程模型

  • 针对键值型RDD的分区器

    分区器针对键值型RDD而言的,将key传入分区器获取唯一的分区id。在shuffle中,分区器有很重要的体现。

  • 对每个分区进行计算的首选位置列表

    根据数据本地性的特性,获取计算的首选位置列表,尽可能的把计算分配到靠近数据的位置,减少数据的网络传输。

RDD的内部代码

先看看基本概念的代码:
//创建此RDD的SparkContext
def sparkContext: SparkContext = sc
// 唯一的id
val id: Int = sc.newRddId()
// rdd友善的名字
@transient var name: String = _
// 分区器
val partitioner: Option[Partitioner] = None
// 获取依赖列表
// dependencies和partitions中都用到了checkpointRDD,如果进行了checkpoint,checkpointRDD表示进行checkpoint后的rdd
final def dependencies: Seq[Dependency[_]] = {
    // 一对一的窄依赖
    checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
        if (dependencies_ == null) {
            dependencies_ = getDependencies
        }
        dependencies_
    }
}
// 获取分区列表
final def partitions: Array[Partition] = {
    checkpointRDD.map(_.partitions).getOrElse {
        if (partitions_ == null) {
            partitions_ = getPartitions
            partitions_.zipWithIndex.foreach { case (partition, index) =>
                require(partition.index == index,
                        s"partitions($index).partition == ${partition.index}, but it should equal $index")
            }
        }
        partitions_
    }
}
// 获取分区的首选位置
final def preferredLocations(split: Partition): Seq[String] = {
    checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
        getPreferredLocations(split)
    }
}
// 对应到每个分区的计算函数
def compute(split: Partition, context: TaskContext): Iterator[T]

主要就是围绕上面5个重要属性的一些操作

常用的函数/算子
// 返回仅包含满足过滤条件的元素的新RDD。
def filter(f: T => Boolean): RDD[T] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[T, T](
        this,
        (context, pid, iter) => iter.filter(cleanF),
        preservesPartitioning = true)
}
// 通过将函数应用于此RDD的所有元素来返回新的RDD。
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
// 首先向该RDD的所有元素应用函数,然后将结果展平,以返回新的RDD。
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}

我们可以发现几乎每个算子都会以当前RDD和对应的计算函数创建新的RDD,每个子RDD都持有父RDD的引用。

这就印证了RDD的不变性,也表明了RDD的计算是通过对RDD进行转换实现的。

案例

val words = Seq("hello spark", "hello scala", "hello java")
val rdd = sc.makeRDD(words)
rdd
    .flatMap(_.split(" "))
    .map((_, 1))
    .reduceByKey(_ + _)
    .foreach(println(_))

上面是一个简单的RDD的操作,我们先调用makeRDD创建了一个RDD,之后对rdd进行一顿算子调用。

首先调用flatMap,flatMap内部会以当前rdd和我们传入的_.split(" ")构建新的MapPartitionsRDD;

之后map,map以上步生成的MapPartitionsRDD和我们传入的(_, 1)构造新的MapPartitionsRDD;

之后reduceByKey,reduceByKey构造新的RDD;

走到foreach,foreach是行动操作,触发计算,输出。

小总结

  • RDD内部的计算除action算子以外,其他算子都是懒执行,不会触发计算,只是进行RDD的转换。
  • RDD的计算是基于分区为单位计算的,我们传进去的函数,作用于分区进行计算

转换、行动算子

从上面知道RDD是懒执行的,只有遇到行动算子才执行计算。

转换操作:在内部对根据父RDD创建新的RDD,不执行计算

行动操作:内部会调用sc.runJob,提交作业、划分阶段、执行作业。

一些常见的行动操作

foreach、foreachPartition、collect、reduce、count

除行动操作外,都是转换操作

宽、窄依赖

宽窄依赖是shuffle划分调度的重要依据。

先看看spark中与依赖有关的几个类(一层一层继承关系):

Dependency依赖的顶级父类
    NarrowDependency 窄依赖
        OneToOneDependency 表示父RDD和子RDD分区之间的一对一依赖关系的窄依赖
        RangeDependency 表示父RDD和子RDD中分区范围之间的一对一依赖关系的窄依赖
    ShuffleDependency 宽依赖

先说宽窄依赖的概念:

窄依赖:父RDD的每个分区只被一个子RDD分区使用

宽依赖:父RDD的每个分区都有可能被多个子RDD分区使用

其实就是父RDD的一个分区会被传到几个子RDD分区的区别。如果被传到一个子RDD分区,就可以不需要移动数据(移动计算);如果被传到多个子RDD分区,就需要进行数据的传输。

接下来看看Dependency内部的一些属性及方法:

// 依赖对应的rdd,其实就是当前rdd的父rdd。宽依赖和窄依赖都有这个属性
def rdd: RDD[T]
// 获取子分区对应的父分区(窄依赖的方法)
def getParents(partitionId: Int): Seq[Int]

// 以下是宽依赖的属性及方法
// 对应键值RDD的分区器
val partitioner: Partitioner
// 在数据传输时的序列化方法
val serializer: Serializer = SparkEnv.get.serializer
// 键的排序方式
val keyOrdering: Option[Ordering[K]] = None
// 一组用于聚合数据的功能
val aggregator: Option[Aggregator[K, V, C]] = None
// 是否需要map端预聚合
val mapSideCombine: Boolean = false
// 当前宽依赖的id
val shuffleId: Int = _rdd.context.newShuffleId()
// 向管理员注册一个shuffle,并获取一个句柄,以将其传递给任务
val shuffleHandle: ShuffleHandle =  _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, _rdd.partitions.length, this)
一些常见的宽窄依赖

窄依赖:map、filter、union、mapPartitions、join(当分区器是HashPartitioner)

宽依赖:sortByKey、join(分区器不是HashPartitioner时)

最后说一下reduceByKey,顺便说一下为什么当分区器HashPartitioner时就是窄依赖。

reduceByKey是用来将key分组后,执行我们传入的函数。

它是窄依赖,它内部默认会使用HashPartitioner分区。

同一个key进去HashPartitioner得到的分区id是一样的,这样进行计算前后同一个key得到的分区都一样,父RDD的分区就只被子RDD的一个分区依赖,就不需要移动数据。

所以join、reduceByKey在分区器是HashPartitioner时是窄依赖。

end. 个人理解,如有偏差,欢迎交流指正。

Reference

  • 《图解Spark核心技术与案例实战》
  • 宽窄依赖:https://www.jianshu.com/p/5c2301dfa360

扶我起来,我还能学。




个人公众号:码农峰,定时推送行业资讯,持续发布原创技术文章,欢迎大家关注。

原文地址:https://www.cnblogs.com/upupfeng/p/12344963.html

时间: 2024-08-07 04:13:57

Spark RDD基本概念、宽窄依赖、转换行为操作的相关文章

Spark RDD使用详解1--RDD原理

RDD简介 在集群背后,有一个非常重要的分布式数据架构,即弹性分布式数据集(Resilient Distributed Dataset,RDD).RDD是Spark的最基本抽象,是对分布式内存的抽象使用,实现了以操作本地集合的方式来操作分布式数据集的抽象实现.RDD是Spark最核心的东西,它表示已被分区,不可变的并能够被并行操作的数据集合,不同的数据集格式对应不同的RDD实现.RDD必须是可序列化的.RDD可以cache到内存中,每次对RDD数据集的操作之后的结果,都可以存放到内存中,下一个操

spark-DAG,宽窄依赖,Stage,Shuffle

spark-DAG图 DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列的转换就就形成了DAG,RDD之间的依赖关系形成了DAG图,而根据RDD之间的依赖关系的不同将DAG划分成不同的Stage. 宽窄依赖 窄依赖:父RDD和子RDD partition之间的关系是一对一的.或者父RDD一个partition只对应一个子RDD的partition情况下的父RDD和子RDD partition关系是多对一的.不会有shuffle的产生.父RDD的一个分区去

Spark RDD 宽窄依赖

RDD 宽窄依赖 RDD之间有一系列的依赖关系, 可分为窄依赖和宽依赖 窄依赖 从 RDD 的 parition 角度来看 父 RRD 的 parition 和 子 RDD 的 parition 之间的关系是一对一的 (或 者是多对一的). 不会有 shuffle 产生 宽依赖 父 RRD 的 parition 和 子 RDD 的 parition 之间的关系是一对多的 会产生shuffle 理解图 对stage(阶段)划分的影响 DAGSchedular 根据依赖类型切割RDD划分stage,

Apache Spark RDD之RDD的转换

RDD的转换 Spark会根据用户提交的计算逻辑中的RDD的转换和动作来生成RDD之间的依赖关系,同时这个计算链也就生成了逻辑上的DAG.接下来以“Word Count”为例,详细描述这个DAG生成的实现过程. Spark Scala版本的Word Count程序如下: 1: val file = spark.textFile("hdfs://...") 2: val counts = file.flatMap(line => line.split(" "))

RDD的缓存,依赖,spark提交任务流程

1.RDD的缓存 Spark速度非常快的原因之一,就是在不同操作中可以在内存中持久化或缓存个数据集.当持久化某个RDD后,每一个节点都将把计算的分片结果保存在内存中,并在对此RDD或衍生出的RDD进行的其他动作中重用.这使得后续的动作变得更加迅速.RDD相关的持久化和缓存,是Spark最重要的特征之一.可以说,缓存是Spark构建迭代式算法和快速交互式查询的关键. RDD缓存方式 RDD通过persist方法或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发

(1)spark核心RDD的概念解析、创建、以及相关操作

spark核心之RDD 什么是RDD RDD指的是弹性分布式数据集(Resilient Distributed Dataset),它是spark计算的核心.尽管后面我们会使用DataFrame.Dataset进行编程,但是它们的底层依旧是依赖于RDD的.我们来解释一下RDD(Resilient Distributed Dataset)的这几个单词含义. 弹性:在计算上具有容错性,spark是一个计算框架,如果某一个节点挂了,可以自动进行计算之间血缘关系的跟踪 分布式:很好理解,hdfs上数据是跨

spark 源码分析之一 -- RDD的四种依赖关系

RDD的四种依赖关系 RDD四种依赖关系,分别是 ShuffleDependency.PrunDependency.RangeDependency和OneToOneDependency四种依赖关系.如下图所示:org.apache.spark.Dependency有两个一级子类,分别是 ShuffleDependency 和 NarrowDependency.其中,NarrowDependency 是一个抽象类,它有三个实现类,分别是OneToOneDependency.RangeDepende

Spark RDD 的宽依赖和窄依赖 -- (视频笔记)

窄依赖 narrow dependency map,filter,union , join(co-partitioned)制定了父RDD中的分片具体交给哪个唯一的子RDD 并行的,RDD分片是独立的. 只依赖相同ID的分片 range分片 one to dependency range dependency 内部可以previously computed partition 可以将计算合并,可以极大的提升效率,编写的时候可能是多个函数,执行的时候合并成一个函数,极大的减少了零碎内存或磁盘资源.

Apache Spark RDD(Resilient Distributed Datasets)论文

Spark RDD(Resilient Distributed Datasets)论文 概要 1: 介绍 2: Resilient Distributed Datasets(RDDs) 2.1 RDD 抽象 2.2 Spark 编程接口 2.2.1 例子 – 监控日志数据挖掘 2.3 RDD 模型的优势 2.4 不适合用 RDDs 的应用 3 Spark 编程接口 3.1 Spark 中 RDD 的操作 3.2 举例应用 3.2.1 线性回归 3.2.2 PageRank 4 表达 RDDs 5