Spark分析之Dependency

在Spark中,每一个RDD是对于数据集在某一状态下的表现形式,比如说:map、filter、group by等都算一次操作,这个状态有可能是从前一状态转换而来的;

因此换句话说一个RDD可能与之前的RDD(s)有依赖关系;RDD之间存在依赖关系;

根据依赖关系的不同,可以将RDD分成两种不同的类型:宽依赖和窄依赖

窄依赖:一个父RDD的partition至多被子RDD的某个partition使用一次

宽依赖:一个父RDD的partition会被子RDD的partition使用多次,需要shuffle操作

图中方框描述:外面的大方框是一个RDD,里面的小方块是RDD中的partition,多个partition组成一个RDD

窄依赖

定义:一个父RDD的partition至多被子RDD的某个partition使用一次

不需要shuffle,partition范围不会改变,一个partition经过transform后还是一个partition,虽然内容发生了变化;可以进行pipeline计算,快速完成;

在某个节点上可以一次性全部计算完所有的父partition(pipeline流水式的计算方式):

a.map().filter().reduceByKey() 这样多步操作一次性计算完毕,而不需要第一步执行完后保存起来,第二步再去读取再计算再存储。。。。。。

窄依赖可以在单节点上完成运算,非常高效。

容错:某个partition挂了,快速将丢失的partition并行计算出来

容错和计算速度都比宽依赖强。

窄依赖又分为两种:

OneToOneDependency:一对一的依赖,一父一子,最典型的是map/filter。

RangeDependency:一定范围的RDD直接对应,最典型的是Union。

  parent RDD的某个分区的partitions对应到child RDD中某个区间的partitions;
  union:多个parent RDD合并到一个chind RDD,故每个parent RDD都对应到child RDD中的一个区间;
  注意:union不会把多个partition合并成一个partition,而是简单的把多个RDD的partitions放到一个RDD中,partition不会发生变化。

宽依赖

定义:一个父RDD的partition会被子RDD的partition使用多次;只能前面的算好后才能进行后续的计算;只有等到父partition的所有数据都传输到各个节点后才能计算(经典的mapreduce场景)

容错:某个partition挂了,要计算前面所有的父partition,代价很大

spark是把map部分的数据计算完成后物化到map端的磁盘上,挂了之后直接从磁盘中读取即可。

class ShuffleDependency[K, V](
    @transient rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializerClass: String = null)
  extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {
  val shuffleId: Int = rdd.context.newShuffleId()
}

首先:需要基于PairRDD,因为一般需要依据key进行shuffle,所以数据结构往往是key-value;
其次:由于需要shuffle,所以就需要给出partitioner;
然后:shuffle不像map可以在local运行,往往需要网络传输或存储,所以需要serializerClass;
最后:每个shuffle需要分配一个全局的id,context.newShuffleId()的实现就是把全局id累加;

Spark分析之Dependency

时间: 2024-11-09 08:54:31

Spark分析之Dependency的相关文章

Window7 开发 Spark 分析 Nginx 日志

通过上文 Window7 开发 Spark 应用 ,展示了如何开发一个Spark应用,但文中使用的测试数据都是自己手动录入的. 所以本文讲解一下如何搭建一个开发闭环,本里使用了Nginx日志采集分析为例,分析页面访问最多的10个,404页面的10. 如果把这些开发成果最终展示到一个web网页中,在这篇文章中就不描述了,本博其他文章给出的示例已经足够你把Spark的应用能力暴露到Web中. 版本信息 OS: Window7 JAVA:1.8.0_181 Hadoop:3.2.1 Spark: 3.

Spark分析之DAGScheduler

DAGScheduler的主要功能1.接收用户提交的job;2.将job根据类型划分为不同的stage,并在每一个stage内产生一系列的task,并封装成TaskSet;3.向TaskScheduler提交TaskSet; 以如下示例描述Job提交过程: val sc = new SparkContext("local[2]", "WordCount", System.getenv("SPARK_HOME"), Seq(System.geten

Spark分析之Job Scheduling Process

经过前面文章的SparkContext.DAGScheduler.TaskScheduler分析,再从总体上了解Spark Job的调度流程 1.SparkContext将job的RDD DAG图提交给DAGScheduler: 2.DAGScheduler将job分解成Stage DAG,将每个Stage的Task封装成TaskSet提交给TaskScheduler:窄依赖以pipeline方式执行,效率高: 3.TaskScheduler将TaskSet中的一个个Task提交到集群中去运行:

Spark分析之Master

override def preStart() { logInfo("Starting Spark master at " + masterUrl) webUi.bind() //绑定WEBUI masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort context.system.scheduler.schedule(0 millis, WORKER_TIM

Spark分析之Master、Worker以及Application三者之间如何建立连接

Master.preStart(){ webUi.bind() context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut) //定时任务检测是否有DEAD WORKER需要移除 case CheckForWorkerTimeOut => { timeOutDeadWorkers() } /** Check for, and remove, any timed-out

Spark分析之Worker

override def preStart() { webUi = new WorkerWebUI(this, workDir, Some(webUiPort)) webUi.bind() //创建并绑定UI registerWithMaster() //注册到Master } def tryRegisterAllMasters() { for (masterUrl <- masterUrls) { logInfo("Connecting to master " + master

Spark分析之Standalone运行过程分析

一.集群启动过程--启动Master $SPARK_HOME/sbin/start-master.sh start-master.sh脚本关键内容: spark-daemon.sh start org.apache.spark.deploy.master.Master 1 --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT 日志信息:$SPARK_HOME/logs/ 14/0

[大数据从入门到放弃系列教程]第一个spark分析程序

文章施工中,由于部分网站会在我还没有写完就抓取到这篇文章,导致你看到的内容不完整,请点击这里: 或者复制访问 http://www.cnblogs.com/blog5277/p/8580007.html 来查看更完整的内容 [大数据从入门到放弃系列教程]第一个spark分析程序 原文链接:http://www.cnblogs.com/blog5277/p/8580007.html 原文作者:博客园--曲高终和寡 *********************分割线******************

Spark 分析网站排名热度

需求: / 解决是一个各个子模块内的热度排名--> 排名得用sortBy ---> (可能就是简单的排序,或者是二次排序) ---> // 前面有一个wordCount---> 算出次数出来  --> 考虑什么作为key //算的一个网站下面,每个子模块下面的网页热度前2名 : // 算的一个网站下面,每个子模块下面的网页热度前2名 --> 每个子模块下面的网页的次数的前2名 // 在实际开发中,真正代码时间可能只占20-30% ,其他时间都在理解需求,想思路 impo