spark 笔记 6: RDD

了解RDD之前,必读UCB的论文,个人认为这是最好的资料,没有之一。
http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf  
A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,* partitioned collection of elements that can be operated on in parallel. This class contains the
* basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,* Internally, each 
* [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value* pairs, such as `groupByKey` and `join`;* [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of* Doubles; and* [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that* can be saved as SequenceFiles.* These operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]* through implicit conversions when you `import org.apache.spark.SparkContext._`.
*RDD is characterized by five main properties:**  - A list of partitions*  - A function for computing each split*  - A list of dependencies on other RDDs*  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)*  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for*    an HDFS file)
* All of the scheduling and execution in Spark is done based on these methods, allowing each RDD* to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for* reading data from a new storage system) by overriding these functions. Please refer to the* [[http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf Spark paper]] for more details* on RDD internals.*/
abstract class RDD[T: ClassTag](    @transient private var sc: SparkContext,    @transient private var deps: Seq[Dependency[_]]  ) extends Serializable with Logging {
RDD是spark中最基础的数据表达形式,它的compute方法用来产生partition。由子类实现。
/** * :: DeveloperApi :: * Implemented by subclasses to compute a given partition. */@DeveloperApidef compute(split: Partition, context: TaskContext): Iterator[T]
RDD的persist是一个主要的功能,它负责将RDD以某个存储级别保留给后续的计算流程使用,是的迭代计算高效。
/** * Set this RDD‘s storage level to persist its values across operations after the first time * it is computed. This can only be used to assign a new storage level if the RDD does not * have a storage level set yet.. */def persist(newLevel: StorageLevel): this.type = {  // TODO: Handle changes of StorageLevel  if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {    throw new UnsupportedOperationException(      "Cannot change storage level of an RDD after it was already assigned a level")  }  sc.persistRDD(this)  // Register the RDD with the ContextCleaner for automatic GC-based cleanup  sc.cleaner.foreach(_.registerRDDForCleanup(this))  storageLevel = newLevel  this}
RDD可以设置本地化优先策略,这是在使用Hadoop做存储时提高性能的主要手段。
/** * Get the preferred locations of a partition (as hostnames), taking into account whether the * RDD is checkpointed. */final def preferredLocations(split: Partition): Seq[String] = {  checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {    getPreferredLocations(split)  }}
RDD可以转化为其他的RDD,map/flatMap/filter是三个最常用的转化方式
// Transformations (return a new RDD)
/** * Return a new RDD by applying a function to all elements of this RDD. */def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))

/** *  Return a new RDD by first applying a function to all elements of this *  RDD, and then flattening the results. */def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] =  new FlatMappedRDD(this, sc.clean(f))

/** * Return a new RDD containing only the elements that satisfy a predicate. */def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))
注意,大部分时候RDD是推迟计算的,也就是在做transformation时,其实只是记录“如何做”,而真正的转化,是等到“Actions”来出发的。这样做的优势是使得串行化成为可能,这是spark性能高于hadoop的主要原因之一。
// Actions (launch a job to return a value to the user program)
/** * Applies a function f to all elements of this RDD. */def foreach(f: T => Unit) {  val cleanF = sc.clean(f)  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))}

/** * Return an array that contains all of the elements in this RDD. */def collect(): Array[T] = {  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)  Array.concat(results: _*)}
/** * Reduces the elements of this RDD using the specified commutative and * associative binary operator. */def reduce(f: (T, T) => T): T = {  val cleanF = sc.clean(f)  val reducePartition: Iterator[T] => Option[T] = iter => {    if (iter.hasNext) {      Some(iter.reduceLeft(cleanF))    } else {      None    }  }  var jobResult: Option[T] = None  val mergeResult = (index: Int, taskResult: Option[T]) => {    if (taskResult.isDefined) {      jobResult = jobResult match {        case Some(value) => Some(f(value, taskResult.get))        case None => taskResult      }    }  }  sc.runJob(this, reducePartition, mergeResult)  // Get the final result out of our Option, or throw an exception if the RDD was empty  jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))}
/** * Return the number of elements in the RDD. */def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
/** * Returns the top K (largest) elements from this RDD as defined by the specified * implicit Ordering[T]. This does the opposite of [[takeOrdered]]. For example: * {{{ *   sc.parallelize(Seq(10, 4, 2, 12, 3)).top(1) *   // returns Array(12) * *   sc.parallelize(Seq(2, 3, 4, 5, 6)).top(2) *   // returns Array(6, 5) * }}} * * @param num the number of top elements to return * @param ord the implicit ordering for T * @return an array of top elements */def top(num: Int)(implicit ord: Ordering[T]): Array[T] = takeOrdered(num)(ord.reverse)
RDD的checkpoint功能意义也很重大,因为它会将RDD存到可靠存储设备,所以在这个RDD之前的历史记录就可以不用记录了(因为这个RDD已经是可靠的,不需要更老的历史了)。对于RDD以来很长的应用,选择合适的checkpiont显得格外重要。
/** * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint * directory set with SparkContext.setCheckpointDir() and all references to its parent * RDDs will be removed. This function must be called before any job has been * executed on this RDD. It is strongly recommended that this RDD is persisted in * memory, otherwise saving it on a file will require recomputation. */def checkpoint() {  if (context.checkpointDir.isEmpty) {    throw new Exception("Checkpoint directory has not been set in the SparkContext")  } else if (checkpointData.isEmpty) {    checkpointData = Some(new RDDCheckpointData(this))    checkpointData.get.markForCheckpoint()  }}
这个调试函数会打印绝大部分的RDD的状态和信息。
/** A description of this RDD and its recursive dependencies for debugging. */def toDebugString: String = {
RDD的转换示意图:





来自为知笔记(Wiz)

时间: 2024-10-19 20:07:57

spark 笔记 6: RDD的相关文章

Spark笔记:RDD基本操作(上)

本文主要是讲解spark里RDD的基础操作.RDD是spark特有的数据模型,谈到RDD就会提到什么弹性分布式数据集,什么有向无环图,本文暂时不去展开这些高深概念,在阅读本文时候,大家可以就把RDD当作一个数组,这样的理解对我们学习RDD的API是非常有帮助的.本文所有示例代码都是使用scala语言编写的. Spark里的计算都是操作RDD进行,那么学习RDD的第一个问题就是如何构建RDD,构建RDD从数据来源角度分为两类:第一类是从内存里直接读取数据,第二类就是从文件系统里读取,当然这里的文件

spark 笔记 2: Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing

http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf ucb关于spark的论文,对spark中核心组件RDD最原始.本质的理解,没有比这个更好的资料了.必读. Abstract RDDs provide a restricted form of shared memory, based on coarse grained transformations rather than fine-grained updates to s

spark 中的RDD编程 -以下基于Java api

1.RDD介绍:     RDD,弹性分布式数据集,即分布式的元素集合.在spark中,对所有数据的操作不外乎是创建RDD.转化已有的RDD以及调用RDD操作进行求值.在这一切的背后,Spark会自动将RDD中的数据分发到集群中,并将操作并行化. Spark中的RDD就是一个不可变的分布式对象集合.每个RDD都被分为多个分区,这些分区运行在集群中的不同节点上.RDD可以包含Python,Java,Scala中任意类型的对象,甚至可以包含用户自定义的对象. 用户可以使用两种方法创建RDD:读取一个

(版本定制)第18课:Spark Streaming中空RDD处理及流处理程序优雅的停止

本期内容: 1. Spark Streaming中RDD为空处理 2. Streaming Context程序停止方式 Spark Streaming运用程序是根据我们设定的Batch Duration来产生RDD,产生的RDD存在partitons数据为空的情况,但是还是会执行foreachPartition,会获取计算资源,然后计算一下,这种情况就会浪费 集群计算资源,所以需要在程序运行的时候进行过滤,参考如下代码: package com.dt.spark.sparkstreamingim

spark版本定制十八:Spark Streaming中空RDD处理及流处理程序优雅的停止

本期内容: 1.Spark Streaming中RDD的空处理 2.StreamingContext程序的停止 一.Spark Streaming中RDD的空处理 案例代码: Scala代码: package com.dt.spark.sparkstreaming import org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext} /** * 使用Scala开发集群运行的Sp

Spark算子:RDD基本转换操作(1)–map、flatMap、distinct

Spark算子:RDD基本转换操作(1)–map.flatMap.distinct 关键字:Spark算子.Spark RDD基本转换.map.flatMap.distinct map 将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素. 输入分区与输出分区一对一,即:有多少个输入分区,就有多少个输出分区. hadoop fs -cat /tmp/lxw1234/1.txt hello world hello spark hello hive //读取HDFS文件到RDD sca

Spark的核心RDD(Resilient Distributed Datasets弹性分布式数据集)

Spark的核心RDD (Resilient Distributed Datasets弹性分布式数据集)  原文链接:http://www.cnblogs.com/yjd_hycf_space/p/7681585.html 铺垫 在hadoop中一个独立的计算,例如在一个迭代过程中,除可复制的文件系统(HDFS)外没有提供其他存储的概念,这就导致在网络上进行数据复制而增加了大量的消耗,而对于两个的MapReduce作业之间数据共享只有一个办法,就是将其写到一个稳定的外部存储系统,如分布式文件系统

Spark笔记:复杂RDD的API的理解(上)

本篇接着讲解RDD的API,讲解那些不是很容易理解的API,同时本篇文章还将展示如何将外部的函数引入到RDD的API里使用,最后通过对RDD的API深入学习,我们还讲讲一些和RDD开发相关的scala语法. 1)  aggregate(zeroValue)(seqOp,combOp)  该函数的功能和reduce函数一样,也是对数据进行聚合操作,不过aggregate可以返回和原RDD不同的数据类型,使用时候还要提供初始值. 我们来看看下面的用法,代码如下: val rddInt: RDD[In

Spark笔记整理(五):Spark RDD持久化、广播变量和累加器

[TOC] Spark RDD持久化 RDD持久化工作原理 Spark非常重要的一个功能特性就是可以将RDD持久化在内存中.当对RDD执行持久化操作时,每个节点都会将自己操作的RDD的partition持久化到内存中,并且在之后对该RDD的反复使用中,直接使用内存缓存的partition.这样的话,对于针对一个RDD反复执行多个操作的场景,就只要对RDD计算一次即可,后面直接使用该RDD,而不需要反复计算多次该RDD. 巧妙使用RDD持久化,甚至在某些场景下,可以将spark应用程序的性能提升1