spark基础知识三

主要围绕spark的底层核心抽象RDD和原理进行理解。主要包括以下几个方面

  1. RDD弹性分布式数据集的依赖关系
  2. RDD弹性分布式数据集的lineage血统机制
  3. RDD弹性分布式数据集的缓存机制
  4. spark任务的DAG有向无环图的构建
  5. spark任务如何划分stage
  6. spark任务的提交和调度流程

1. RDD的依赖关系

  • RDD和它依赖的父RDD的关系有两种不同的类型
  • 窄依赖(narrow dependency)和宽依赖(wide dependency)
    • 窄依赖

      • 窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用

        • 总结:窄依赖我们形象的比喻为独生子女
        哪些算子操作是窄依赖:
            map/flatMap/filter/union等等
            所有的窄依赖不会产生shuffle
    • 宽依赖
      • 宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition

        • 总结:宽依赖我们形象的比喻为超生
        哪些算子操作是宽依赖:
            reduceByKey/sortByKey/groupBy/groupByKey/join等等
            所有的宽依赖会产生shuffle
    • 补充说明
      由上图可知,join分为宽依赖和窄依赖,如果RDD有相同的partitioner,那么将不会引起shuffle,这种join是窄依赖,反之就是宽依赖

2. lineage(血统)

  • RDD只支持粗粒度转换

    • 即只记录单个块上执行的单个操作。
  • 将创建RDD的一系列Lineage(即血统)记录下来,以便恢复丢失的分区
  • RDD的Lineage会记录RDD的元数据信息和转换行为,lineage保存了RDD的依赖关系,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。

3. RDD的缓存机制

3.1 什么是rdd的缓存

可以把一个rdd的数据缓存起来,后续有其他的job需要用到该rdd的结果数据,可以直接从缓存中获取得到,避免了重复计算。缓存是加快后续对该数据的访问操作。

3.2 如何对rdd设置缓存

  • RDD通过persist方法或cache方法可以将前面的计算结果缓存。

    • 但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

  • 通过查看源码发现cache最终也是调用了persist方法,默认的存储级别都是仅在内存存储一份,Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的。

  • 使用演示
val rdd1=sc.textFile("/words.txt")
val rdd2=rdd1.flatMap(_.split(" "))
rdd2.cache
rdd2.collect
?
val rdd3=rdd2.map((_,1))
rdd3.persist(缓存级别)
rdd3.collect

3.3 cache和persist区别

  • 重点

    • 例如

      • 简述下如何对RDD设置缓存,以及它们的区别是什么?
        对RDD设置缓存成可以调用rdd的2个方法: 一个是cache,一个是persist
    调用上面2个方法都可以对rdd的数据设置缓存,但不是立即就触发缓存执行,后面需要有action,才会触发缓存的执行。
    ?
    cache方法和persist方法区别:
        cache:   默认是把数据缓存在内存中,其本质就是调用persist方法;
        persist:可以把数据缓存在内存或者是磁盘,有丰富的缓存级别,这些缓存级别都被定义在StorageLevel这个object中。

3.4 什么时候设置缓存

  • 1、某个rdd的数据后期被使用了多次

如上图所示的计算逻辑:
(1)当第一次使用rdd2做相应的算子操作得到rdd3的时候,就会从rdd1开始计算,先读取HDFS上的文件,然后对rdd1 做对应的算子操作得到rdd2,再由rdd2计算之后得到rdd3。同样为了计算得到rdd4,前面的逻辑会被重新计算。
(2)默认情况下多次对一个rdd执行算子操作, rdd都会对这个rdd及之前的父rdd全部重新计算一次。 这种情况在实际开发代码的时候会经常遇到,但是我们一定要避免一个rdd重复计算多次,否则会导致性能急剧降低。
?
总结:
可以把多次使用到的rdd,也就是公共rdd进行持久化,避免后续需要,再次重新计算,提升效率。

  • 2、为了获取得到一个rdd的结果数据,经过了大量的算子操作或者是计算逻辑比较复杂

    • 总之某个rdd的数据来之不易
val rdd2=rdd1.flatMap(函数).map(函数).reduceByKey(函数).xxx.xxx.xxx.xxx.xxx

3.5 清除缓存数据

  • 1、自动清除

    一个application应用程序结束之后,对应的缓存数据也就自动清除
  • 2、手动清除
    调用rdd的unpersist方法

4. RDD的checkpoint机制

4.1 checkpoint概念

  • 我们可以对rdd的数据进行缓存,保存在内存或者是磁盘中。

    • 后续就可以直接从内存或者磁盘中获取得到,但是它们不是特别安全。
    • cache
      它是直接把数据保存在内存中,后续操作起来速度比较快,直接从内存中获取得到。但这种方式很不安全,由于服务器挂掉或者是进程终止,会导致数据的丢失。
    • persist
      它可以把数据保存在本地磁盘中,后续可以从磁盘中获取得到该数据,但它也不是特别安全,由于系统管理员一些误操作删除了,或者是磁盘损坏,也有可能导致数据的丢失。
  • checkpoint(检查点)
    它是提供了一种相对而言更加可靠的数据持久化方式。它是把数据保存在分布式文件系统,
    比如HDFS上。这里就是利用了HDFS高可用性,高容错性(多副本)来最大程度保证数据的安全性。

4.2 如何设置checkpoint

  • 1、在hdfs上设置一个checkpoint目录

    sc.setCheckpointDir("hdfs://node1:9000/checkpoint") 
  • 2、对需要做checkpoint操作的rdd调用checkpoint方法
    val rdd1=sc.textFile("/words.txt")
    rdd1.checkpoint
    val rdd2=rdd1.flatMap(_.split(" ")) 
  • 3、最后需要有一个action操作去触发任务的运行
    rdd2.collect

4.3 cache、persist、checkpoint三者区别

  • cache和persist

    • cache默认数据缓存在内存中
    • persist可以把数据保存在内存或者磁盘中
    • 后续要触发 cache 和 persist 持久化操作,需要有一个action操作
    • 它不会开启其他新的任务,一个action操作就对应一个job
    • 它不会改变rdd的依赖关系,程序运行完成后对应的缓存数据就自动消失
  • checkpoint
    • 可以把数据持久化写入到hdfs上
    • 后续要触发checkpoint持久化操作,需要有一个action操作,后续会开启新的job执行checkpoint操作
    • 它会改变rdd的依赖关系,后续数据丢失了不能够在通过血统进行数据的恢复。
    • 程序运行完成后对应的checkpoint数据就不会消失
   sc.setCheckpointDir("/checkpoint")
   val rdd1=sc.textFile("/words.txt")
   rdd1.cache
   rdd1.checkpoint
   val rdd2=rdd1.flatMap(_.split(" "))
   rdd2.collect

   checkpoint操作要执行需要有一个action操作,一个action操作对应后续的一个job。该job执行完成之后,它会再次单独开启另外一个job来执行 rdd1.checkpoint操作。

   对checkpoint在使用的时候进行优化,在调用checkpoint操作之前,可以先来做一个cache操作,缓存对应rdd的结果数据,后续就可以直接从cache中获取到rdd的数据写入到指定checkpoint目录中

5. DAG有向无环图生成

5.1 DAG是什么

  • DAG(Directed Acyclic Graph)叫做有向无环图(有方向,无闭环,代表着数据的流向),原始的RDD通过一系列的转换就形成了DAG。
  • 下图是基于单词统计逻辑得到的DAG有向无环图

6. DAG划分stage

6.1 stage是什么

  • 一个Job会被拆分为多组Task,每组任务被称为一个stage
  • stage表示不同的调度阶段,一个spark job会对应产生很多个stage
    • stage类型一共有2种

      • ShuffleMapStage

        • 最后一个shuffle之前的所有变换叫ShuffleMapStage

          • 它对应的task是shuffleMapTask
      • ResultStage
        • 最后一个shuffle之后的操作叫ResultStage,它是最后一个Stage。

          • 它对应的task是ResultTask

6.2 为什么要划分stage

根据RDD之间依赖关系的不同将DAG划分成不同的Stage(调度阶段)
对于窄依赖,partition的转换处理在一个Stage中完成计算
对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,
?
由于划分完stage之后,在同一个stage中只有窄依赖,没有宽依赖,可以实现流水线计算,
stage中的每一个分区对应一个task,在同一个stage中就有很多可以并行运行的task。

6.3 如何划分stage

  • 划分stage的依据就是宽依赖
(1) 首先根据rdd的算子操作顺序生成DAG有向无环图,接下里从最后一个rdd往前推,创建一个新的stage,把该rdd加入到该stage中,它是最后一个stage。
?
(2) 在往前推的过程中运行遇到了窄依赖就把该rdd加入到本stage中,如果遇到了宽依赖,就从宽依赖切开,那么最后一个stage也就结束了。
?
(3) 重新创建一个新的stage,按照第二个步骤继续往前推,一直到最开始的rdd,整个划分stage也就结束了

6.4 stage与stage之间的关系

划分完stage之后,每一个stage中有很多可以并行运行的task,后期把每一个stage中的task封装在一个taskSet集合中,最后把一个一个的taskSet集合提交到worker节点上的executor进程中运行。
?
rdd与rdd之间存在依赖关系,stage与stage之前也存在依赖关系,前面stage中的task先运行,运行完成了再运行后面stage中的task,也就是说后面stage中的task输入数据是前面stage中task的输出结果数据。

7. spark的任务调度

(1) Driver端运行客户端的main方法,构建SparkContext对象,在SparkContext对象内部依次构建DAGScheduler和TaskScheduler
?
(2) 按照rdd的一系列操作顺序,来生成DAG有向无环图
?
(3) DAGScheduler拿到DAG有向无环图之后,按照宽依赖进行stage的划分。每一个stage内部有很多可以并行运行的task,最后封装在一个一个的taskSet集合中,然后把taskSet发送给TaskScheduler
?
(4)TaskScheduler得到taskSet集合之后,依次遍历取出每一个task提交到worker节点上的executor进程中运行。
?
(5)所有task运行完成,整个任务也就结束了

8. spark的运行架构

(1) Driver端向资源管理器Master发送注册和申请计算资源的请求
?
(2) Master通知对应的worker节点启动executor进程(计算资源)
?
(3) executor进程向Driver端发送注册并且申请task请求
?
(4) Driver端运行客户端的main方法,构建SparkContext对象,在SparkContext对象内部依次构建DAGScheduler和TaskScheduler
?
(5) 按照客户端代码洪rdd的一系列操作顺序,生成DAG有向无环图
?
(6) DAGScheduler拿到DAG有向无环图之后,按照宽依赖进行stage的划分。每一个stage内部有很多可以并行运行的task,最后封装在一个一个的taskSet集合中,然后把taskSet发送给TaskScheduler
?
(7)TaskScheduler得到taskSet集合之后,依次遍历取出每一个task提交到worker节点上的executor进程中运行
?
(8)所有task运行完成,Driver端向Master发送注销请求,Master通知Worker关闭executor进程,Worker上的计算资源得到释放,最后整个任务也就结束了。

9. 基于wordcount程序剖析spark任务的提交、划分、调度流程

原文地址:https://www.cnblogs.com/lojun/p/11632695.html

时间: 2024-11-08 21:24:10

spark基础知识三的相关文章

最全的Spark基础知识解答

最全的Spark基础知识解答 时间:2016-12-12 12:00:50      阅读:19      评论:0      收藏:0      [点我收藏] 原文:http://www.cnblogs.com/sanyuanempire/p/6163732.html 一. Spark基础知识 1.Spark是什么? UCBerkeley AMPlab所开源的类HadoopMapReduce的通用的并行计算框架. dfsSpark基于mapreduce算法实现的分布式计算,拥有HadoopMa

20_Shell语言———VIM编辑器基础知识三之窗口属性定制、配置文件及查找替换功能

Vim编辑器可以让用户按照需求来定制一些使用属性. 一.窗口属性定义 1)显示行号 行号不是内容,只是用来帮助用户确认文本所在的行.在vim编辑器中,如果要显示行号,可以在末行模式下输入: set number 如果想关闭,则可以在功能名称前面加上no,即: set nonumber 命令可以被简写,如set number 可以简写为 set nu:set nonumber 可以简写为 set nonu. 注意,上述设定仅对当前vim的进程有效,一旦当前进程关闭,这些设定就会失效,如果要使设定永

计算机科学基础知识(三)静态库和静态链接

三.将relocatable object file静态链接成可执行文件 将relocatable object file链接成可执行文件分成两步,第一步是符号分析(symbol resolution),第二步是符号重新定位(Relocation).本章主要描述这两个过程,为了完整性,静态库的概念也会在本章提及. 1.为什么会提出静态库的概念? 程序逻辑有共同的需求,例如数学库.字符串库等,如果每个程序员在撰写这些代码逻辑的时候都需要自己重新写那么该是多么麻烦的事情,而且容易出错,如果有现成的,

Dapper基础知识三

在下刚毕业工作,之前实习有用到Dapper?这几天新项目想用上Dapper,在下比较菜鸟,这块只是个人对Dapper的一种总结. Dapper,当项目在开发的时候,在没有必要使用依赖注入的时候,如何做到对项目的快速开发这里对Dapper做一个小的进阶. 结合上一篇的博客,就可以使用了. public class Demo { public string name { get; set; } public string DapperTest { get; set; } } public class

spark基础知识

1.Spark是什么? UCBerkeley AMPlab所开源的类HadoopMapReduce的通用的并行计算框架. dfsSpark基于mapreduce算法实现的分布式计算,拥有HadoopMapReduce所具有的优点;但不同于MapReduce的是Job中间输出和结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的map reduce的算法. 2.Spark与Hadoop的对比(Spark的优势) 1.Spark的中间数据放到内存

【基础知识三】线性模型

一.基本形式 通过属性的线性组合来进行预测, 许多非线性模型可以在线性模型的基础上,引入层级结构或高维映射而得. 二.线性回归 最小二乘法:求解ω和b: 多元线性回归:样本由多个属性描述,即x为多维向量: 若矩阵不满秩产生多个解,解决方法:引入正则化项: 三.对数/逻辑线性回归 广义线性模型: g(.)条件:连续且充分光滑(单调可微) 为了预测值连续,引入Sigmoid函数 得到, 极大似然估计:求解ω和b 四.线性判别分析LDA 也叫"Fisher判别" 将样例投影到一条直线上,使同

Spark入门三部曲之第一步Spark基础知识

Spark运行环境 Spark 是Scala写的, 运行在JVM上.所以运行环境是Java6或者以上. 如果想要使用 Python API,需要安装Python 解释器2.6版本或者以上. 目前Spark(1.2.0版本) 与Python 3不兼容. Spark下载 下载地址:http://spark.apache.org/downloads.html,选择Pre-built for Hadoop 2.4 and later 这个包,点击直接下载,这会下载一个spark-1.2.0-bin-ha

KnockoutJS基础知识(三)

对于knockoutJS来讲,模板绑定和Mapping插件绑定是十分重要的功能,虽然模板绑定在我工作中用的及其少,但模板绑定的重要性不可忽视,在其他前端框架中,如Angular.Vue等等,模板存在的意义十分重要,Mapping插件使得我们能够脱离手工绑定,及其方便我们快速绑定达到预期效果. KnockoutJS模型绑定更多用法:https://knockoutjs.com/documentation/template-binding.html 本文地址:https://www.cnblogs.

3. K线基础知识三

1. 阴线 证券市场上指开盘价高于收盘价的K线,K线图上一般用淡蓝色标注,表示股价下跌,当收盘价低于开盘价,也就是股价走势呈下降趋势时,我们称这种形态的K线为阴线. 中间部分实体为蓝色,此时,上影线的长度表示最高价和开盘价之间的价差.实体的长短代表开盘价比收盘价高出的幅度.下影线的长度则有收盘价和最高价之间的价差大小所决定. 2. 小阴星 小阴星的分时走势图与小阳星相似,只是收盘价格略低于开盘价格.表明行情疲软,发展方向不明. 3. 小阴线 表示空方呈打压态势,但力度不大. 4. 光脚阴线 光脚