RDD.scala (source code)

Methods covered: map, flatMap, filter, distinct, repartition, coalesce, sample, randomSplit, randomSampleWithRange, takeSample, union, ++, sortBy, intersection

map source code

/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

flatMap source code

/**
 *  Return a new RDD by first applying a function to all elements of this
 *  RDD, and then flattening the results.
 */
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}

filter source code

/**
 * Return a new RDD containing only the elements that satisfy a predicate.
 */
def filter(f: T => Boolean): RDD[T] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[T, T](
    this,
    (context, pid, iter) => iter.filter(cleanF),
    preservesPartitioning = true)
}
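
A minimal usage sketch of the three operators above, assuming an existing SparkContext named sc (for example, the one created by spark-shell):

// assumes an existing SparkContext named sc (e.g. from spark-shell)
val nums = sc.parallelize(1 to 5)                 // RDD[Int]: 1, 2, 3, 4, 5
val doubled = nums.map(_ * 2)                     // 2, 4, 6, 8, 10
val expanded = nums.flatMap(x => Seq(x, x * 10))  // 1, 10, 2, 20, ...
val evens = nums.filter(_ % 2 == 0)               // 2, 4
println(evens.collect().mkString(", "))
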
distinct source code

/**
 * Return a new RDD containing the distinct elements in this RDD.
 */
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}

/**
 * Return a new RDD containing the distinct elements in this RDD.
 */
def distinct(): RDD[T] = withScope {
  distinct(partitions.length)
}
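
As the listing shows, distinct goes through the pair-RDD path: each element becomes a (x, null) key, duplicates collapse in reduceByKey, and the keys are taken back out. A hedged sketch of the equivalence, again assuming an existing sc:

// assumes an existing SparkContext named sc
val data = sc.parallelize(Seq(1, 2, 2, 3, 3, 3))
val viaDistinct = data.distinct()                                    // 1, 2, 3 (order not guaranteed)
val byHand = data.map(x => (x, null)).reduceByKey((x, y) => x).map(_._1)
println(viaDistinct.collect().sorted.sameElements(byHand.collect().sorted))  // true
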
repartition source code

/**
 * Return a new RDD that has exactly numPartitions partitions.
 *
 * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
 * a shuffle to redistribute data.
 *
 * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
 * which can avoid performing a shuffle.
 */
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}

coalesce source code

/**
 * Return a new RDD that is reduced into `numPartitions` partitions.
 *
 * This results in a narrow dependency, e.g. if you go from 1000 partitions
 * to 100 partitions, there will not be a shuffle, instead each of the 100
 * new partitions will claim 10 of the current partitions.
 *
 * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
 * this may result in your computation taking place on fewer nodes than
 * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
 * you can pass shuffle = true. This will add a shuffle step, but means the
 * current upstream partitions will be executed in parallel (per whatever
 * the current partitioning is).
 *
 * Note: With shuffle = true, you can actually coalesce to a larger number
 * of partitions. This is useful if you have a small number of partitions,
 * say 100, potentially with a few partitions being abnormally large. Calling
 * coalesce(1000, shuffle = true) will result in 1000 partitions with the
 * data distributed using a hash partitioner.
 */
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
    : RDD[T] = withScope {
  if (shuffle) {
    /** Distributes elements evenly across output partitions, starting from a random partition. */
    val distributePartition = (index: Int, items: Iterator[T]) => {
      var position = (new Random(index)).nextInt(numPartitions)
      items.map { t =>
        // Note that the hash code of the key will just be the key itself. The HashPartitioner
        // will mod it with the number of total partitions.
        position = position + 1
        (position, t)
      }
    } : Iterator[(Int, T)]

    // include a shuffle step so that our upstream tasks are still distributed
    new CoalescedRDD(
      new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
      new HashPartitioner(numPartitions)),
      numPartitions).values
  } else {
    new CoalescedRDD(this, numPartitions)
  }
}
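
A small sketch contrasting the two, assuming an existing sc: repartition always shuffles, while coalesce with shuffle = false only merges existing partitions and therefore cannot increase their number.

// assumes an existing SparkContext named sc
val rdd = sc.parallelize(1 to 100, numSlices = 8)
println(rdd.partitions.length)                                // 8
println(rdd.repartition(16).partitions.length)                // 16 (shuffle)
println(rdd.coalesce(2).partitions.length)                    // 2  (narrow dependency, no shuffle)
println(rdd.coalesce(16).partitions.length)                   // still 8: cannot grow without a shuffle
println(rdd.coalesce(16, shuffle = true).partitions.length)   // 16
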
sample source code

/**
 * Return a sampled subset of this RDD.
 *
 * @param withReplacement can elements be sampled multiple times (replaced when sampled out)
 * @param fraction expected size of the sample as a fraction of this RDD's size
 *  without replacement: probability that each element is chosen; fraction must be [0, 1]
 *  with replacement: expected number of times each element is chosen; fraction must be >= 0
 * @param seed seed for the random number generator
 */
def sample(
    withReplacement: Boolean,
    fraction: Double,
    seed: Long = Utils.random.nextLong): RDD[T] = withScope {
  require(fraction >= 0.0, "Negative fraction value: " + fraction)
  if (withReplacement) {
    new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
  } else {
    new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
  }
}
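
A usage sketch, assuming an existing sc. As the Scaladoc notes, fraction is an expected size, not an exact count:

// assumes an existing SparkContext named sc
val pool = sc.parallelize(1 to 1000)
val withoutReplacement = pool.sample(withReplacement = false, fraction = 0.1, seed = 42L)
val withReplacement    = pool.sample(withReplacement = true,  fraction = 2.0, seed = 42L)
println(withoutReplacement.count())   // roughly 100, not exactly
println(withReplacement.count())      // roughly 2000
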
randomSplit source code

/**
 * Randomly splits this RDD with the provided weights.
 *
 * @param weights weights for splits, will be normalized if they don't sum to 1
 * @param seed random seed
 *
 * @return split RDDs in an array
 */
def randomSplit(
    weights: Array[Double],
    seed: Long = Utils.random.nextLong): Array[RDD[T]] = withScope {
  val sum = weights.sum
  val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
  normalizedCumWeights.sliding(2).map { x =>
    randomSampleWithRange(x(0), x(1), seed)
  }.toArray
}
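
A common use is a train/test split; a sketch assuming an existing sc. Because the weights are normalized, Array(8, 2) behaves the same as Array(0.8, 0.2):

// assumes an existing SparkContext named sc
val all = sc.parallelize(1 to 10000)
val Array(train, test) = all.randomSplit(Array(0.8, 0.2), seed = 11L)
println(train.count())   // roughly 8000
println(test.count())    // roughly 2000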

randomSampleWithRange source code

/**
 * Internal method exposed for Random Splits in DataFrames. Samples an RDD given a probability
 * range.
 * @param lb lower bound to use for the Bernoulli sampler
 * @param ub upper bound to use for the Bernoulli sampler
 * @param seed the seed for the Random number generator
 * @return A random sub-sample of the RDD without replacement.
 */
private[spark] def randomSampleWithRange(lb: Double, ub: Double, seed: Long): RDD[T] = {
  this.mapPartitionsWithIndex( { (index, partition) =>
    val sampler = new BernoulliCellSampler[T](lb, ub)
    sampler.setSeed(seed + index)
    sampler.sample(partition)
  }, preservesPartitioning = true)
}

union source code

/**
 * Return the union of this RDD and another one. Any identical elements will appear multiple
 * times (use `.distinct()` to eliminate them).
 */
def union(other: RDD[T]): RDD[T] = withScope {
  if (partitioner.isDefined && other.partitioner == partitioner) {
    new PartitionerAwareUnionRDD(sc, Array(this, other))
  } else {
    new UnionRDD(sc, Array(this, other))
  }
}

++ source code

/**
 * Return the union of this RDD and another one. Any identical elements will appear multiple
 * times (use `.distinct()` to eliminate them).
 */
def ++(other: RDD[T]): RDD[T] = withScope {
  this.union(other)
}
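
union (and its alias ++) keeps duplicates, as the Scaladoc says; a sketch assuming an existing sc:

// assumes an existing SparkContext named sc
val a = sc.parallelize(Seq(1, 2, 3))
val b = sc.parallelize(Seq(3, 4, 5))
println((a union b).collect().mkString(", "))                  // 1, 2, 3, 3, 4, 5
println((a ++ b).distinct().collect().sorted.mkString(", "))   // 1, 2, 3, 4, 5
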
sortBy source code

/**
 * Return this RDD sorted by the given key function.
 */
def sortBy[K](
    f: (T) => K,
    ascending: Boolean = true,
    numPartitions: Int = this.partitions.length)
    (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
  this.keyBy[K](f)
      .sortByKey(ascending, numPartitions)
      .values
}
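
sortBy is just keyBy + sortByKey + values, so any key function whose key type has an Ordering works; a sketch assuming an existing sc:

// assumes an existing SparkContext named sc
val words = sc.parallelize(Seq("spark", "rdd", "scala", "api"))
val byLengthDesc = words.sortBy(w => w.length, ascending = false)
println(byLengthDesc.collect().mkString(", "))   // spark, scala, then the length-3 words in no particular order
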
intersection source code

/**
 * Return the intersection of this RDD and another one. The output will not contain any duplicate
 * elements, even if the input RDDs did.
 *
 * Note that this method performs a shuffle internally.
 */
def intersection(other: RDD[T]): RDD[T] = withScope {
  this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
      .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
      .keys
}

/**
 * Return the intersection of this RDD and another one. The output will not contain any duplicate
 * elements, even if the input RDDs did.
 *
 * Note that this method performs a shuffle internally.
 *
 * @param partitioner Partitioner to use for the resulting RDD
 */
def intersection(
    other: RDD[T],
    partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  this.map(v => (v, null)).cogroup(other.map(v => (v, null)), partitioner)
      .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
      .keys
}

/**
 * Return the intersection of this RDD and another one. The output will not contain any duplicate
 * elements, even if the input RDDs did.  Performs a hash partition across the cluster
 *
 * Note that this method performs a shuffle internally.
 *
 * @param numPartitions How many partitions to use in the resulting RDD
 */
def intersection(other: RDD[T], numPartitions: Int): RDD[T] = withScope {
  intersection(other, new HashPartitioner(numPartitions))
}
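
intersection deduplicates both sides through the cogroup, as the listings show; a sketch assuming an existing sc:

// assumes an existing SparkContext named sc
val left  = sc.parallelize(Seq(1, 1, 2, 3))
val right = sc.parallelize(Seq(2, 3, 3, 4))
println(left.intersection(right).collect().sorted.mkString(", "))       // 2, 3
println(left.intersection(right, numPartitions = 4).partitions.length)  // 4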

glom source code

/**
 * Return an RDD created by coalescing all elements within each partition into an array.
 */
def glom(): RDD[Array[T]] = withScope {
  new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
}
cartesian source code

/**
 * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
 * elements (a, b) where a is in `this` and b is in `other`.
 */
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
  new CartesianRDD(sc, this, other)
}
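
glom materializes each partition as one array, and cartesian pairs every element of one RDD with every element of another; a sketch assuming an existing sc:

// assumes an existing SparkContext named sc
val xs = sc.parallelize(1 to 6, numSlices = 3)
println(xs.glom().collect().map(_.mkString("[", ",", "]")).mkString(" "))  // [1,2] [3,4] [5,6]
val ys = sc.parallelize(Seq("a", "b"))
println(xs.cartesian(ys).count())   // 6 * 2 = 12 pairs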

groupBy source code

/**
 * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
 * mapping to that key. The ordering of elements within each group is not guaranteed, and
 * may even differ each time the resulting RDD is evaluated.
 *
 * Note: This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
 * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
 */
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
  groupBy[K](f, defaultPartitioner(this))
}

/**
 * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
 * mapping to that key. The ordering of elements within each group is not guaranteed, and
 * may even differ each time the resulting RDD is evaluated.
 *
 * Note: This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
 * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
 */
def groupBy[K](
    f: T => K,
    numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
  groupBy(f, new HashPartitioner(numPartitions))
}

/**
 * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
 * mapping to that key. The ordering of elements within each group is not guaranteed, and
 * may even differ each time the resulting RDD is evaluated.
 *
 * Note: This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
 * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
 */
def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
    : RDD[(K, Iterable[T])] = withScope {
  val cleanF = sc.clean(f)
  this.map(t => (cleanF(t), t)).groupByKey(p)
}
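
All three overloads end in groupByKey, which ships every value across the shuffle; for plain aggregations, reduceByKey is usually cheaper, exactly as the Scaladoc warns. A sketch assuming an existing sc:

// assumes an existing SparkContext named sc
val nums = sc.parallelize(1 to 10)
val grouped = nums.groupBy(n => n % 2)                  // RDD[(Int, Iterable[Int])]
grouped.collect().foreach { case (k, vs) => println(s"$k -> ${vs.mkString(",")}") }

// equivalent sum per group, but without materializing the groups
val summed = nums.map(n => (n % 2, n)).reduceByKey(_ + _)
println(summed.collect().mkString(", "))                // (0,30), (1,25)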

pipe source code

/**
 * Return an RDD created by piping elements to a forked external process.
 */
def pipe(command: String): RDD[String] = withScope {
  new PipedRDD(this, command)
}

/**
 * Return an RDD created by piping elements to a forked external process.
 */
def pipe(command: String, env: Map[String, String]): RDD[String] = withScope {
  new PipedRDD(this, command, env)
}

/**
 * Return an RDD created by piping elements to a forked external process.
 * The print behavior can be customized by providing two functions.
 *
 * @param command command to run in forked process.
 * @param env environment variables to set.
 * @param printPipeContext Before piping elements, this function is called as an opportunity
 *                         to pipe context data. Print line function (like out.println) will be
 *                         passed as printPipeContext's parameter.
 * @param printRDDElement Use this function to customize how to pipe elements. This function
 *                        will be called with each RDD element as the 1st parameter, and the
 *                        print line function (like out.println()) as the 2nd parameter.
 *                        An example of pipe the RDD data of groupBy() in a streaming way,
 *                        instead of constructing a huge String to concat all the elements:
 *                        def printRDDElement(record:(String, Seq[String]), f:String=>Unit) =
 *                          for (e <- record._2){f(e)}
 * @param separateWorkingDir Use separate working directories for each task.
 * @return the result RDD
 */
def pipe(
    command: Seq[String],
    env: Map[String, String] = Map(),
    printPipeContext: (String => Unit) => Unit = null,
    printRDDElement: (T, String => Unit) => Unit = null,
    separateWorkingDir: Boolean = false): RDD[String] = withScope {
  new PipedRDD(this, command, env,
    if (printPipeContext ne null) sc.clean(printPipeContext) else null,
    if (printRDDElement ne null) sc.clean(printRDDElement) else null,
    separateWorkingDir)
}
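
pipe streams each element to an external process's stdin (one element per line) and returns the process's stdout lines as a new RDD[String]. A sketch assuming an existing sc and a Unix-like rev command available on every executor:

// assumes an existing SparkContext named sc and a Unix-like `rev` command on every executor
val words = sc.parallelize(Seq("spark", "rdd"))
val reversed = words.pipe("rev")
println(reversed.collect().mkString(", "))   // kraps, ddr
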
mapPartitions source code

/**
 * Return a new RDD by applying a function to each partition of this RDD.
 *
 * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
 * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
 */
def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
    preservesPartitioning)
}

mapPartitionsWithIndex source code

/**
 * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
 * of the original partition.
 *
 * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
 * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
 */
def mapPartitionsWithIndex[U: ClassTag](
    f: (Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
    preservesPartitioning)
}
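
mapPartitions runs the function once per partition rather than once per element, which is handy for per-partition setup such as opening a connection; mapPartitionsWithIndex additionally passes the partition index. A sketch assuming an existing sc:

// assumes an existing SparkContext named sc
val nums = sc.parallelize(1 to 8, numSlices = 4)

// one result per partition: the partition's sum
val partitionSums = nums.mapPartitions(iter => Iterator(iter.sum))
println(partitionSums.collect().mkString(", "))      // 3, 7, 11, 15

// tag each element with the partition it lives in
val tagged = nums.mapPartitionsWithIndex((idx, iter) => iter.map(x => (idx, x)))
println(tagged.collect().mkString(", "))             // (0,1), (0,2), (1,3), ...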


mapPartitionsWithContext source code

/**
 * :: DeveloperApi ::
 * Return a new RDD by applying a function to each partition of this RDD. This is a variant of
 * mapPartitions that also passes the TaskContext into the closure.
 *
 * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
 * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
 */
@DeveloperApi
@deprecated("use TaskContext.get", "1.2.0")
def mapPartitionsWithContext[U: ClassTag](
    f: (TaskContext, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  val func = (context: TaskContext, index: Int, iter: Iterator[T]) => cleanF(context, iter)
  new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
}
mapPartitionsWithSplit source code

/**
 * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
 * of the original partition.
 */
@deprecated("use mapPartitionsWithIndex", "0.7.0")
def mapPartitionsWithSplit[U: ClassTag](
    f: (Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  mapPartitionsWithIndex(f, preservesPartitioning)
}
mapWith source code

/**
 * Maps f over this RDD, where f takes an additional parameter of type A.  This
 * additional parameter is produced by constructA, which is called in each
 * partition with the index of that partition.
 */
@deprecated("use mapPartitionsWithIndex", "1.0.0")
def mapWith[A, U: ClassTag]
    (constructA: Int => A, preservesPartitioning: Boolean = false)
    (f: (T, A) => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  val cleanA = sc.clean(constructA)
  mapPartitionsWithIndex((index, iter) => {
    val a = cleanA(index)
    iter.map(t => cleanF(t, a))
  }, preservesPartitioning)
}

flatMapWith source code

/**
 * FlatMaps f over this RDD, where f takes an additional parameter of type A.  This
 * additional parameter is produced by constructA, which is called in each
 * partition with the index of that partition.
 */
@deprecated("use mapPartitionsWithIndex and flatMap", "1.0.0")
def flatMapWith[A, U: ClassTag]
    (constructA: Int => A, preservesPartitioning: Boolean = false)
    (f: (T, A) => Seq[U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  val cleanA = sc.clean(constructA)
  mapPartitionsWithIndex((index, iter) => {
    val a = cleanA(index)
    iter.flatMap(t => cleanF(t, a))
  }, preservesPartitioning)
}


foreachWith source code

/**
 * Applies f to each element of this RDD, where f takes an additional parameter of type A.
 * This additional parameter is produced by constructA, which is called in each
 * partition with the index of that partition.
 */
@deprecated("use mapPartitionsWithIndex and foreach", "1.0.0")
def foreachWith[A](constructA: Int => A)(f: (T, A) => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  val cleanA = sc.clean(constructA)
  mapPartitionsWithIndex { (index, iter) =>
    val a = cleanA(index)
    iter.map(t => {cleanF(t, a); t})
  }
}

filterWith source code

/**
 * Filters this RDD with p, where p takes an additional parameter of type A.  This
 * additional parameter is produced by constructA, which is called in each
 * partition with the index of that partition.
 */
@deprecated("use mapPartitionsWithIndex and filter", "1.0.0")
def filterWith[A](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = withScope {
  val cleanP = sc.clean(p)
  val cleanA = sc.clean(constructA)
  mapPartitionsWithIndex((index, iter) => {
    val a = cleanA(index)
    iter.filter(t => cleanP(t, a))
  }, preservesPartitioning = true)
}
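
All of the deprecated *With operators above reduce to mapPartitionsWithIndex: build the per-partition value A from the index, then map/flatMap/filter/foreach inside the iterator. A hedged sketch of the filterWith-style pattern written the non-deprecated way, assuming an existing sc:

// assumes an existing SparkContext named sc
val nums = sc.parallelize(1 to 10, numSlices = 2)

// old style (deprecated): nums.filterWith(index => index)((x, idx) => x % 2 == idx)
// equivalent with mapPartitionsWithIndex:
val filtered = nums.mapPartitionsWithIndex(
  (index, iter) => {
    val a = index                       // whatever constructA(index) would have produced
    iter.filter(x => x % 2 == a)
  },
  preservesPartitioning = true)
println(filtered.collect().mkString(", "))   // 2, 4, 7, 9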

To be continued.


				