大数据技术之_19_Spark学习_02_Spark Core 应用解析小结

1、RDD 全称 弹性分布式数据集 Resilient Distributed Dataset
它就是一个 class。

abstract class RDD[T: ClassTag](    @transient private var _sc: SparkContext,    @transient private var deps: Seq[Dependency[_]]  ) extends Serializable with Logging {

继承了 Serializable 和具有 Logging 的特质,为什么要Serializable?答:因为不同的 RDD 之间需要进行转化(序列化:数据转化成二进制,反序列:化二进制转化为数据)。

2、RDD 其实是 spark 为了减少用户对于不同数据结构之间的差异而提供的数据封装,为用户提供了很多数据处理的操作。

3、RDD 三个特点
  3.1、不可分,在 RDD 上调用转换算子,会生成一个新的 RDD,不会更改原 RDD 的数据结构。
  3.2、可分区,RDD 的数据可以根据配置分成多个分区,每个分区都被一个 Task 任务去处理,可以认为分区数就是并行度。
  3.3、弹性:
    3.3.1、存储的弹性,RDD 的数据可以在内存和磁盘进行自动切换,对用户透明。
    3.3.2、计算的弹性,RDD 的计算之间会有重试机制,避免由于网络等原因导致的任务失败。
    3.3.3、容错的弹性,RDD 可以通过血统机制来进行 RDD 的恢复。
    3.3.4、分区的弹性,可以根据需求来动态改变 RDD 分区的分区数,也就是动态改变了并行度。

4、Spark 到底做了什么?


简言之:从外部空间将数据加载到 Spark,对数据进行转换、缓存最后将数据通过行动操作保存到外部空间。

5、RDD 两种处理数据的方式
RDD 有两种处理数据的方式,一种叫转换操作【一个 RDD 调用该方法后返回一个 RDD】,另外一种叫行动操作【一个 RDD 调用该方法后返回一个标量或者直接将数据保存到外部空间】。

6、RDD 是懒执行的,如果没有行动操作出现,所有的转换操作都不会执行。

转换操作:1、def map[U: ClassTag](f: T => U): RDD[U]      映射,将一种类型的数据转换成为另外一种类型的数据。2、def filter(f: T => Boolean): RDD[T]          返回满足条件的数据。3、def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]     将一个数据结构转换成为一个可迭代的数据结构,然后将数据压平。

4、def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]                    对于每一个分区执行一次函数,它的执行效率要比 map 高。5、def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]    类似于 mapPartitions,但 func 带有一个整数参数表示分片的索引值。

6、def sample(withReplacement: Boolean,fraction: Double,seed: Long = Utils.random.nextLong): RDD[T]     对 RDD 进行采样,主要用于观察大数据集的分布情况。

7、def union(other: RDD[T]): RDD[T]             和另外一个 RDD 取并集。8、def intersection(other: RDD[T]): RDD[T]      和另外一个 RDD 取交集。9、def distinct(numPartitions: Int)             对原 RDD 进行去重后返回一个新的 RDD。

10、def partitionBy(partitioner: Partitioner): RDD[(K, V)]      对 KV 结构 RDD 进行重新分区。11、def reduceByKey(func: (V, V) => V): RDD[(K, V)]             返回值 V 的数据类型必须和输入一样。先预聚合再聚集。12、def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]     将相同 Key 的 value 进行聚集。

13、def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]    (1) 后面三个函数的逻辑是针对某一个 Key 的聚集来起作用。    (2) createCombiner 每个分区都有,当遇到新 Key 的时候调用,产生一个新的数据结构。    (3) mergeValue 每个分区都有,当遇到旧 Key 的时候调用,将当前数据合并到数据结构中。    (4) mergeCombiners 这个是全局所有,合并所有分区中过来的数据。

14、def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]是 combineBykey 的简化操作,zeroValue 类似于 createCombiner, seqOp 类似于 mergeValue, combOp 类似于 mergeCombiner。

15、def foldByKey(zeroValue: V, partitioner: Partitioner) (func: (V, V) => V): RDD[(K, V)]      注意:V 的类型不能改变。

16、def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]  对 KV 结构 RDD 进行排序(默认升序),K 必须实现 trait Ordering[T],复写 compare 方法,返回一个按照 key 进行排序的 (K,V) 的 RDD。17、def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]  sortBy 使用 func 产生的 Key 来做比较。

18、def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]     和另外的 RDD 进行 JOIN。19、def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]      类似于两个 RDD 分别做 groupByKey 然后再 全JOIN。

20、def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]      笛卡尔积。21、def pipe(command: String): RDD[String]      对于每个分区,支持使用外部脚本比如 shell、perl 等处理分区内的数据。

22、def coalesce(numPartitions: Int, shuffle: Boolean = false,partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T]      改变分区数。23、def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]       重新分区,所有数据全部网络混洗。24、def repartitionAndSortWithinPartitions(partitioner)     在重新分区的过程中会进行排序,如果重新分区后还要进行 sortBy 或者 sorkByKey 操作,那么直接使用该算子。性能比 repartition 要高。

25、def glom(): RDD[Array[T]]       将每一个分区中的所有数据转换为一个 Array 数组,形成新的 RDD。

26、def mapValues[U](f: V => U): RDD[(K, U)]        只对 KV 结构中 value 数据进行映射。value 可以改变类型。

27、def subtract(other: RDD[T]): RDD[T]  求差集

----------------------------------------------------------------------------------------------------------

行动操作:1、def reduce(f: (T, T) => T): T        规约某个 RDD2、collect()    将数据返回到 Driver,是以数组的形式返回数据集的所有元素(简单测试用,生产环境中不用)3、count()      返回 RDD 中的元素个数4、first()      返回第一个元素5、take(n)      返回前 n 个元素6、takeSample(withReplacement, num, [seed])      采样,返回 Array 数组7、takeOrdered (n)      返回排序后的前几个元素,如果需要倒序,那么可以利用重写 Ordering 来做

8、aggregate (zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U)  9、fold(zeroValue)(func)    aggregate 的简化操作

10、saveAsTextFile(path)        以文本的方式保存到HDFS兼容的文件系统11、saveAsSequenceFile(path)    以 SequenceFile 形式来存文件12、saveAsObjectFile(path)      以 ObjectFile 来存文件

13、countByKey()        返回 Map 结构,获取每一个 key 的数量14、foreach(func)       在数据集上的每一个元素运行 func 函数

7、向 RDD 操作传递函数注意
  传递函数的时候需要注意:如果你的 RDD 转换操作中的函数使用到了类的方法或者变量,那么你需要注意该类可能需要能够序列化。即该 class 需要继承 java.io.Serializable 接口,或者可以将属性赋值为本地变量来防止整个对象的传输。

8、RDD的依赖关系
  窄依赖(narrow dependency):子的父依赖只有一个,出度1。
  宽依赖(wide dependency):子的父依赖有多个,出度大于2。
  RDD 之间的前后依赖关系有宽依赖和窄依赖之分,主要通过依赖的不同来划分 Stage(阶段)。
  区别:是否要进行 shuffle 阶段(即合并分区的过程)。

9、RDD 的任务切分
  Application:一个能够打成 jar 包的 Spark 程序就是一个应用。里面应该有一个 SparkContext。
  Job:一个应用中每一个 Action 操作所涉及到的所有转换叫一个 Job。
  Stage:一个 Job 根据 RDD 之间的宽窄依赖关系划分为多个 Stage,Stage 之间是根据依赖关系来逐个执行的。
  Task: 一个 Stage 运行的时候,RDD 的每一个分区都会被一个 Task 去处理,也可以认为是并行度。

10、RDD 的运行规划
  写代码我们都是从前往后写,划分 Stage 是从后往前划分,步骤如下:
  (1)首先先把所有代码划分成为一个 Stage,然后该 Stage 入栈。
  (2)从最后的代码往前走,如果发现 RDD 之间的依赖关系是宽依赖,那么将宽依赖前面的所有代码划分为第二个 Stage,然后该 Stage 入栈。
  (3)根据2规则继续往前走,直到代码开头。

11、RDD 持久化
  RDD 持久化:每一个节点都将把计算的分片结果保存在内存中,并在对此 RDD 或衍生出的 RDD 进行的其他动作中重用。(防止重新计算浪费资源,因为 RDD 在没有持久化的时候默认计算的分片结果是不保存的,如果需要那么就要根据血统关系来重新计算。)
  持久化也是懒执行的,持久化有两个操作:persist(StorageLevel),persist() 默认把数据以序列化的形式缓存在 jvm 的堆空间中;另外一个是 cache,cache 就相当于 MEMORY_ONLY 的 persist。
  使用步骤:

// 设置缓存级别:MEMORY_ONLY, MEMORY_ONLY_SERdata.persist(StorageLevel.DISK_ONLY)// 清除缓存data.unpersist// data.unpersist(blocking=true)持久化级别按照:存储的位置(磁盘、内存、非堆内存)、是否序列化、存储的份数(1,2)进行划分

12、RDD 检查点机制
  检查点也是一种 RDD 的持久化机制,只不过检查点将 RDD 的数据放在非易失存储上,比如 HDFS,存放之后会将 RDD 的依赖关系删除,主要是因为检查点机制认为该 RDD 不会丢失。
如何用呢?步骤如下:
  (1)通过 sc.setCheckPointDir("hdfs://hadoop102:9000/checkpoint") 来设置一个 HDFS 兼容的文件系统目录
  (2)通过在RDD.checkPoint() 来启用检查点
  (3)RDD 创建之初就要启用检查点,否则不成功
  注意:整个 checkpoint 的读取是用户透明的(即用户看不到,是后台执行的)。

13、键值对 RDD 的数据分区
  hash 分区:对于给定的 key,计算其 hashCode,并除于分区的个数取余,容易造成数据倾斜。
  range 分区:采用的是水塘抽样算法,将将一定范围内的数映射到某一个分区内,避免了一个数据倾斜的状态。
  主要有 Hash 和 Range 两种,Range 分区通过水塘抽样算法来保证每一个分区的数据都比较均匀。
  可以通过继承 Partitoner 来实现自定义的分区器,复写2个方法。
  scala 获取分区数的元素:res3.mapPartitionsWithIndex((index, iter) => Iterator(index + "---" + iter.mkString(" , "))).collect

14、RDD 累加器
  RDD 累加器:线程安全,不是针对某个节点或者某个 RDD 的,它的对象是整个 Spark,类似于 hadoop 的累加器。
  RDD 累加器是提供一个类似于共享变量的东西,能够在 Driver 的数据空间定义,然后在 Executor 的数据空间进行更新,然后在 Driver 的数据空间进行正确访问的机制。
  14.1、使用系统默认提供的累加器(没啥用),步骤如下:
  (1)申请一个 val blanklines = sc.accumulator(0)
  (2)在转换操作中使用累加器要注意,可以回出现重复累加的情况,所以最好是在行动操作中使用。使用的时候不能够访问只能够更新,方法是:blanklines.add()。
  (3)在 Driver 程序中去访问,方法是: blanklines.value。
  14.2、自定义累加器,主要操作如下:
  (1)需要继承 AccumulatorV2 这个虚拟类,然后提供类型参数:1) 增加值的类型参数,2) 输出值的类型参数。然后重写5个方法。
  (2)需要创建一个 SparkContext
  (3)需要创建一个自定义累加器实例
  (4)需要通过 SparkContext 去注册你的累加器, sc.register(accum, "logAccum")
  (5)需要在转换或者行动操作中使用累加器。
  (6)在Driver中输出累加器的结果。

val conf = new SparkConf().setMaster("local[*]")setAppName("LogAccumulator")val sc = new SparkContext(conf)val accum = new LogAccumulatorsc.register(accum, "logAccum")

accum.add(x)

15、广播变量
  (1)如果转换操作中使用到了 Driver 程序中定义的变量,如果该变量不是通过广播变量来进行声明的,那么每一个分区都会拷贝该变量一份,会造成大量的网络数据传输。(广播传输,带宽浪费严重!)
  (2)如果使用广播变量来声明该共享变量,那么只会在每一个 Executor 中存在一次拷贝。(因为每一个 Executor 中有成千上万个分区!)
  (3)广播变量主要用来分发只读的小数据集,比如 300M 左右。
  (4)怎么使用广播变量呢?步骤如下:
    1. 先声明 val broadcastVar = sc.broadcast(Array(1, 2, 3))
    2. 再访问 broadcastVar.value

16、Spark Core 数据读取与存储的主要方式
  (1)文本文件的输入输出:textFile 和 saveAsTextFile,注意:在写出 text 文件的时候,每一个 partition 会单独写出,文件系统支持所有和 Hadoop 文件系统兼容的文件系统。
  (2)JSON 文件或者 CSV 文件:
    这种有格式的文件的输入和输出还是通过文本文件的输入和输出来支持的,Spark Core 没有内置对 JSON 文件和 CSV 文件的解析和反解析功能,这个解析功能是需要用户自己根据需求来定制的。
    注意:JSON 文件的读取如果需要多个 partition 来读,那么 JSON 文件一般一行是一个 json。如果你的 JSON 是跨行的,那么需要整体读入所有数据,并整体解析。
  (3)Sequence 文件:Spark 有专门用来读取 SequenceFile 文件的接口。可以直接使用 sequenceFile[keyClass, valueClass](path) 进行读取。注意:针对于 HDFS 中的文件 block 数为 1,那么 Spark 设定了最小的读取 partition 数为 2。如果 HDFS 中的文件 block 数为大于 1,比如 block 数为 5,那么 Spark 的读取 partition 数为 5。(因为 Spark 本质上属于内存计算层,它的输入输出很大一部分依赖于 HDFS 文件系统。)
  (4)ObjectFile 文件:sc.saveAsObjectFile 输出的是对象的形式
    1. 对于 ObjectFile 它的读取和保存使用了读取和保存 SequenceFile 的 API,也最终调用了 hadoop 的 API。
    2. ObjectFile 的读取使用 objectFile 进行。
    3. ObjectFile 的输出直接使用 saveAsObjectFile 来进行输出。
    4. 需要注意的是:在读取 ObjectFile 的时候需要指定对象的类型,而并不是 K-V 的类型。
  (5)HadoopAPI 的读取和输入:
    读取:newApiHadoopFile 和 newApiHadoopRDD 两个方法,最终都是调用 newApiHadoopRDD 来进行实现。
    输出:saveAsNewApiHadoopFile 和 saveAsNewApiHadoopDataset 两个方法,最终都是调用 saveAsNewApiHadoopDataset 这个方法进行实现。
  (6)关系型数据库的输入输出:JdbcRDD 里面包括了驱动,数据库用户名/密码
    1. 对于关系型数据库的读取使用 JdbcRDD 来进行实现,JdbcRDD 需要依次传入 sparkContext、获取 JDBC Connection 的无参方法、查询数据库的 SQL 语句,id 的下界、id 的上界、分区数、提供解析 ResultSet 的函数。
    2. 对于关系型数据库的输出,直接采用 jdbc 执行 insert 语句或者 update 语句进行实现。

原文地址:https://www.cnblogs.com/chenmingjun/p/10777205.html

时间: 2024-08-06 19:31:59

大数据技术之_19_Spark学习_02_Spark Core 应用解析小结的相关文章

大数据技术之_19_Spark学习_02_Spark Core 应用解析+ RDD 概念 + RDD 编程 + 键值对 RDD + 数据读取与保存主要方式 + RDD 编程进阶 + Spark Core 实例练习

第1章 RDD 概念1.1 RDD 为什么会产生1.2 RDD 概述1.2.1 什么是 RDD1.2.2 RDD 的属性1.3 RDD 弹性1.4 RDD 特点1.4.1 分区1.4.2 只读1.4.3 依赖1.4.4 缓存1.4.5 CheckPoint第2章 RDD 编程2.1 RDD 编程模型2.2 RDD 创建2.2.1 由一个已经存在的 Scala 集合创建,即集合并行化(测试用)2.2.2 由外部存储系统的数据集创建(开发用)2.3 RDD 编程2.3.1 Transformatio

大数据技术之_19_Spark学习_06_Spark 源码解析小结

========== Spark 通信架构 ========== 1.spark 一开始使用 akka 作为网络通信框架,spark 2.X 版本以后完全抛弃 akka,而使用 netty 作为新的网络通信框架.最主要原因:spark 对 akka 没有维护,需要 akka 更新,spark 的发展受到了 akka 的牵制,akka 版本之间无法通信,即 akka 兼容性问题.2.RpcEnv:RPC 上下文环境,每个 Rpc 端点运行时依赖的上下文环境称之为 RpcEnv.类似于 SparkC

大数据技术之_19_Spark学习_05_Spark GraphX 应用解析小结

========== Spark GraphX 概述 ==========1.Spark GraphX是什么?  (1)Spark GraphX 是 Spark 的一个模块,主要用于进行以图为核心的计算还有分布式图的计算.  (2)GraphX 他的底层计算也是 RDD 计算,它和 RDD 共用一种存储形态,在展示形态上可以以数据集来表示,也可以图的形式来表示. 2.Spark GraphX 有哪些抽象?(1)顶点.   顶点的表示用 RDD[(VertexId, VD)] 来表示,(Verte

大数据技术之_19_Spark学习_05_Spark GraphX 应用解析 + Spark GraphX 概述、解析 + 计算模式 + Pregel API + 图算法参考代码 + PageRank 实例

第1章 Spark GraphX 概述1.1 什么是 Spark GraphX1.2 弹性分布式属性图1.3 运行图计算程序第2章 Spark GraphX 解析2.1 存储模式2.1.1 图存储模式2.1.2 GraphX 存储模式2.2 vertices.edges 以及 triplets2.2.1 vertices2.2.2 edges2.2.3 triplets2.3 图的构建2.3.1 构建图的方法2.3.2 构建图的过程2.4 计算模式2.4.1 BSP 计算模式2.4.2 图操作一

大数据技术之_16_Scala学习_11_客户信息管理系统+并发编程模型 Akka+Akka 网络编程-小黄鸡客服案例+Akka 网络编程-Spark Master Worker 进程通讯项目

第十五章 客户信息管理系统15.1 项目的开发流程15.2 项目的需求分析15.3 项目的界面15.4 项目的设计-程序框架图15.5 项目的功能实现15.5.1 完成 Customer 类15.5.2 完成显示主菜单和退出软件功能15.5.3 完成显示客户列表的功能15.5.4 完成添加客户的功能15.5.5 完成删除客户的功能15.5.6 完善退出确认功能15.5.7 完善删除确认功能15.5.8 完成修改客户的功能第十六章 并发编程模型 Akka16.1 Akka 的介绍16.2 Acto

大数据技术之_20_Elasticsearch学习_01_概述 + 快速入门 + Java API 操作 + 创建、删除索引 + 新建、搜索、更新删除文档 + 条件查询 + 映射操作

一 概述1.1 什么是搜索?1.2 如果用数据库做搜索会怎么样?1.3 什么是全文检索和 Lucene?1.4 什么是 Elasticsearch?1.5 Elasticsearch 的适用场景1.6 Elasticsearch 的特点1.7 Elasticsearch 的核心概念1.7.1 近实时1.7.2 Cluster(集群)1.7.3 Node(节点)1.7.4 Index(索引 --> 数据库)1.7.5 Type(类型 --> 表)1.7.6 Document(文档 -->

大数据技术之_03_Hadoop学习_02_入门_Hadoop运行模式+【本地运行模式+伪分布式运行模式+完全分布式运行模式(开发重点)】+Hadoop编译源码(面试重点)+常见错误及解决方案

第4章 Hadoop运行模式4.1 本地运行模式4.1.1 官方Grep案例4.1.2 官方WordCount案例4.2 伪分布式运行模式4.2.1 启动HDFS并运行MapReduce程序4.2.2 启动YARN并运行MapReduce程序4.2.3 配置历史服务器4.2.4 配置日志的聚集4.2.5 配置文件说明4.3 完全分布式运行模式(开发重点)4.3.1 虚拟机准备4.3.2 编写集群分发脚本xsync4.3.3 集群配置4.3.4 集群单点启动4.3.5 SSH无密登录配置4.3.6

大数据技术之_08_Hive学习_01_Hive入门+Hive安装、配置和使用+Hive数据类型

第1章 Hive入门1.1 什么是Hive1.2 Hive的优缺点1.2.1 优点1.2.2 缺点1.3 Hive架构原理1.4 Hive和数据库比较1.4.1 查询语言1.4.2 数据存储位置1.4.3 数据更新1.4.4 索引1.4.5 执行1.4.6 执行延迟1.4.7 可扩展性1.4.8 数据规模第2章 Hive安装.配置和使用2.1 Hive安装地址2.2 Hive安装部署2.3 将本地文件导入Hive案例2.4 MySql安装2.4.1 安装包准备2.4.2 安装MySql服务器2.

大数据技术之_08_Hive学习_04_压缩和存储(Hive高级)+ 企业级调优(Hive优化)

第8章 压缩和存储(Hive高级)8.1 Hadoop源码编译支持Snappy压缩8.1.1 资源准备8.1.2 jar包安装8.1.3 编译源码8.2 Hadoop压缩配置8.2.1 MR支持的压缩编码8.2.2 压缩参数配置8.3 开启Map输出阶段压缩8.4 开启Reduce输出阶段压缩8.5 文件存储格式8.5.1 列式存储和行式存储8.5.2 TextFile格式8.5.3 Orc格式8.5.4 Parquet格式8.5.5 主流文件存储格式对比实验8.6 存储和压缩结合8.6.1 修