七、rdd究竟是什么

RDD是个抽象类,定义了诸如map()、reduce()等方法,但实际上继承RDD的派生类一般只要实现两个方法:

  • def getPartitions: Array[Partition]
  • def compute(thePart: Partition, context: TaskContext): NextIterator[T]

getPartitions()用来告知怎么将input分片;

compute()用来输出每个Partition的所有行(行是我给出的一种不准确的说法,应该是被函数处理的一个单元);

◆ RDD的特点:

  1. 它是在集群节点上的不可变的、已分区的集合对象。
  2. 通过并行转换的方式来创建如(map, filter, join, etc)。
  3. 失败自动重建。
  4. 可以控制存储级别(内存、磁盘等)来进行重用。
  5. 必须是可序列化的。
  6. 是静态类型的。

a、分区

b、依赖(lineage)

c、函数

d、最佳位置(数据本地化)

e、分区策略

◆ RDD的好处

  1. RDD只能从持久存储或通过Transformations操作产生,相比于分布式共享内存(DSM)可以更高效实现容错,对于丢失部分数据分区只需根据它的lineage就可重新计算出来,而不需要做特定的Checkpoint。
  2. RDD的不变性,可以实现类Hadoop MapReduce的推测式执行。
  3. RDD的数据分区特性,可以通过数据的本地性来提高性能,这与Hadoop MapReduce是一样的。
  4. RDD都是可序列化的,在内存不足时可自动降级为磁盘存储,把RDD存储于磁盘上,这时性能会有大的下降但不会差于现在的MapReduce。

◆ RDD的存储与分区

  1. 用户可以选择不同的存储级别存储RDD以便重用。
  2. 当前RDD默认是存储于内存,但当内存不足时,RDD会spill到disk。
  3. RDD在需要进行分区把数据分布于集群中时会根据每条记录Key进行分区(如Hash 分区),以此保证两个数据集在Join时能高效。

◆ RDD的内部表示

在RDD的内部实现中每个RDD都可以使用5个方面的特性来表示:

  1. 分区列表(数据块列表)
  2. 计算每个分片的函数(根据父RDD计算出此RDD)
  3. 对父RDD的依赖列表
  4. 对key-value RDD的Partitioner【可选】
  5. 每个数据分片的预定义地址列表(如HDFS上的数据块的地址)【可选】

◆ RDD的存储级别

RDD根据useDisk、useMemory、deserialized、replication四个参数的组合提供了11种存储级别:


  1. val NONE = new StorageLevel(false, false, false)
  2. val DISK_ONLY = new StorageLevel(true, false, false)
  3. val DISK_ONLY_2 = new StorageLevel(true, false, false, 2)
  4. val MEMORY_ONLY = new StorageLevel(false, true, true)
  5. val MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2)
  6. val MEMORY_ONLY_SER = new StorageLevel(false, true, false)
  7. val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2)
  8. val MEMORY_AND_DISK = new StorageLevel(true, true, true)
  9. val MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2)
  10. val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false)
  11. val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2)

◆ RDD定义了各种操作,不同类型的数据由不同的RDD类抽象表示,不同的操作也由RDD进行抽实现。

RDD的生成

◆ RDD有两种创建方式:

1、从Hadoop文件系统(或与Hadoop兼容的其它存储系统)输入(例如HDFS)创建。

2、从父RDD转换得到新RDD。

◆ 下面来看一从Hadoop文件系统生成RDD的方式,如:val file = spark.textFile("hdfs://..."),file变量就是RDD(实际是HadoopRDD实例),生成的它的核心代码如下:


  1. // SparkContext根据文件/目录及可选的分片数创建RDD, 这里我们可以看到Spark与Hadoop MapReduce很像
  2. // 需要InputFormat, Key、Value的类型,其实Spark使用的Hadoop的InputFormat, Writable类型。
  3. def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
  4. hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable],
  5. classOf[Text], minSplits) .map(pair => pair._2.toString) }
  6. // 根据Hadoop配置,及InputFormat等创建HadoopRDD
  7. new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)

◆ 对RDD进行计算时,RDD从HDFS读取数据时与Hadoop MapReduce几乎一样的:

RDD的转换与操作

◆ 对于RDD可以有两种计算方式:转换(返回值还是一个RDD)与操作(返回值不是一个RDD)。

◆ 转换(Transformations) (如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是说从一个RDD转换生成另一个RDD的操作不是马上执行,Spark在遇到Transformations操作时只会记录需要这样的操作,并不会去执行,需要等到有Actions操作的时候才会真正启动计算过程进行计算。

◆ 操作(Actions) (如:count, collect, save等),Actions操作会返回结果或把RDD数据写到存储系统中。Actions是触发Spark启动计算的动因。

时间: 2024-12-28 21:03:46

七、rdd究竟是什么的相关文章

http中的socket是怎么一回事

首先我们先看一下socket的定义:是在传输层和应用层中间的一个抽象层,是实现网络通信的接口. 那么什么是传输层,什么是应用层呢?网络又是怎样通信的呢?为了弄清这两个问题,我们需要弄清一个概念TCP/IP四层模型和OSI七层模型. OSI: Open System Interconnection的缩写,意为开放式系统互联. TCP/IP(Transmission Control Protocol/Internet Protocol)即传输控制协议/网间协议,定义了主机如何连入因特网及数据如何在它

Spark3000门徒第七课Spark运行原理及RDD解密总结

今晚听了王家林老师的第七课Spark运行原理及RDD解密,课后作业是:spark基本原理,我的总结如下: 1 spark是分布式 基于内存 特别适合于迭代计算的计算框架 2 mapReduce就两个阶段map和reduce,而spark是不断地迭代计算,更加灵活更加强大,容易构造复杂算法. 3 spark不能取代hive,hive做数据仓库存储,spark sql只是取代hive的计算引擎 4 spark中间数据可以在内存也可以在磁盘 5 partition是一个数据集合 6 注意:初学者执行多

(七十一)关于UITableView退出崩溃的问题和滚动究竟部的方法

[TableView退出崩溃的问题] 近期在使用TableView时偶然发如今TableView中数据较多时,假设在滚动过程中退出TableView到上一界面.会引起程序的崩溃.经过网上查阅和思考我发现这样的情况出如今一个UIView控制器拥有一个TableView,TableView无法在UIView销毁前全然销毁,从而继续调用dataSource,而这时候UIView已经不可用了,会引发野指针错误. 避免方法非常easy,仅仅须要在UIView的dealloc方法中把dataSource设为

Spark技术内幕:究竟什么是RDD

RDD是Spark最基本,也是最根本的数据抽象.http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf 是关于RDD的论文.如果觉得英文阅读太费时间,可以看这篇译文:http://shiyanjun.cn/archives/744.html 本文也是基于这篇论文和源码,分析RDD的实现. 第一个问题,RDD是什么?Resilient Distributed Datasets(RDD,) 弹性分布式数据集.RDD是只读的.分区记录的

spark2.x由浅入深深到底系列七之RDD python api详解一

学习spark任何技术之前,请先正确理解spark,可以参考:正确理解spark 以下对RDD的三种创建方式.单类型RDD基本的transformation api.采样Api以及pipe操作进行了python api方面的阐述 一.RDD的三种创建方式 从稳定的文件存储系统中创建RDD,比如local fileSystem或者hdfs等,如下: """ 创建RDD的方法: 1: 从一个稳定的存储系统中,比如hdfs文件, 或者本地文件系统 """

第七篇:Spark SQL 源码分析之Physical Plan 到 RDD的具体实现

/** Spark SQL源码分析系列文章*/ 接上一篇文章Spark SQL Catalyst源码分析之Physical Plan,本文将介绍Physical Plan的toRDD的具体实现细节: 我们都知道一段sql,真正的执行是当你调用它的collect()方法才会执行Spark Job,最后计算得到RDD. [java] view plain copy lazy val toRdd: RDD[Row] = executedPlan.execute() Spark Plan基本包含4种操作

spark源码阅读笔记RDD(七) RDD的创建、读取和保存

Spark支持很多输入和输出源,同时还支持内建RDD.Spark本身是基于Hadoop的生态圈,它可以通过 Hadoop MapReduce所使用的InpoutFormat和OutputFormat接口访问数据.而且大部分的文件格式和存储系统 (HDFS,Hbase,S3等)都支持这种接口.Spark常见的数据源如下: (1) 文件格式和文件系统,也就是我们经常用的TXT,JSON,CSV等這些文件格式 (2)SparkSQL中的结构化数据源 (3)数据库与键值存储(Hbase和JDBC源) 当

七、spark核心数据集RDD

简介 spark RDD操作具体参考官网:http://spark.apache.org/docs/latest/rdd-programming-guide.html#overview RDD全称叫做Resilient Distributed Datasets,直译为弹性分布式数据集,是spark中非常重要的概念. 首先RDD是一个数据的集合,这个数据集合被划分成了许多的数据分区,而这些分区被分布式地存储在不同的物理机器当中,如图: 我们反过来想一下,RDD就是很多物理数据块的逻辑抽象.不仅如此

郑和七下西洋究竟为了啥?揭秘惊人真相

 http://weheartit.com/sipinan/collections/64544617-2014-12-25/ http://weheartit.com/jixieken/collections/64544633-2014-12-25/ http://weheartit.com/kefugan/collections/64544632-2014-12-25/ http://weheartit.com/sipinan/collections/64544680-2014-12-25