【Spark】RDD操作详解2——值型Transformation算子

处理数据类型为Value型的Transformation算子可以根据RDD变换算子的输入分区与输出分区关系分为以下几种类型:

1)输入分区与输出分区一对一型

2)输入分区与输出分区多对一型

3)输入分区与输出分区多对多型

4)输出分区为输入分区子集型

5)还有一种特殊的输入与输出分区一对一的算子类型:Cache型。 Cache算子对RDD分区进行缓存

输入分区与输出分区一对一型

(1)map

将原来RDD的每个数据项通过map中的用户自定义函数f映射转变为一个新的元素。源码中的map算子相当于初始化一个RDD,新RDD叫作MapPartitionsRDD(this,sc.clean(f))。

图中,每个方框表示一个RDD分区,左侧的分区经过用户自定义函数f:T->U映射为右侧的新的RDD分区。但是实际只有等到Action算子触发后,这个f函数才会和其他函数在一个Stage中对数据进行运算。 V1输入f转换输出V’ 1。

源码:

  /**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def map[U: ClassTag](f: T => U): RDD[U] = {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

(2)flatMap

将原来RDD中的每个元素通过函数f转换为新的元素,并将生成的RDD的每个集合中的元素合并为一个集合。 内部创建FlatMappedRDD(this,sc.clean(f))。

图中,小方框表示RDD的一个分区,对分区进行flatMap函数操作,flatMap中传入的函数为f:T->U,T和U可以是任意的数据类型。将分区中的数据通过用户自定义函数f转换为新的数据。外部大方框可以认为是一个RDD分区,小方框代表一个集合。 V1、 V2、 V3在一个集合作为RDD的一个数据项,转换为V’ 1、 V’ 2、 V’ 3后,将结合拆散,形成为RDD中的数据项。

源码:

  /**
   *  Return a new RDD by first applying a function to all elements of this
   *  RDD, and then flattening the results.
   */
  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
  }

(3)mapPartitions

mapPartitions函数获取到每个分区的迭代器,在函数中通过这个分区整体的迭代器对整个分区的元素进行操作。 内部实现是生成MapPartitionsRDD。

图中,用户通过函数f(iter) => iter.filter(_>=3)对分区中的所有数据进行过滤,>=3的数据保留。一个方块代表一个RDD分区,含有1、 2、 3的分区过滤只剩下元素3。

源码:

  /**
   * Return a new RDD by applying a function to each partition of this RDD.
   *
   * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
   * should be `false` unless this is a pair RDD and the input function doesn‘t modify the keys.
   */
  def mapPartitions[U: ClassTag](
      f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
    val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(iter)
    new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
  }S

(4)glom

glom函数将每个分区形成一个数组,内部实现是返回的RDD[Array[T]]。

图中,方框代表一个分区。 该图表示含有V1、 V2、 V3的分区通过函数glom形成一个数组Array[(V1),(V2),(V3)]。

源码:

  /**S
   * Return an RDD created by coalescing all elements within each partition into an array.
   */
  def glom(): RDD[Array[T]] = {
    new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
  }

输入分区与输出分区多对一型

(1)union

使用union函数时需要保证两个RDD元素的数据类型相同,返回的RDD数据类型和被合并的RDD元素数据类型相同,并不进行去重操作,保存所有元素。如果想去重,可以使用distinct()。++符号相当于uion函数操作。

图中,左侧的大方框代表两个RDD,大方框内的小方框代表RDD的分区。右侧大方框代表合并后的RDD,大方框内的小方框代表分区。含有V1,V2…U4的RDD和含有V1,V8…U8的RDD合并所有元素形成一个RDD。V1、V1、V2、V8形成一个分区,其他元素同理进行合并。

源码:

  /**
   * Return the union of this RDD and another one. Any identical elements will appear multiple
   * times (use `.distinct()` to eliminate them).
   */
  def union(other: RDD[T]): RDD[T] = {
    if (partitioner.isDefined && other.partitioner == partitioner) {
      new PartitionerAwareUnionRDD(sc, Array(this, other))
    } else {
      new UnionRDD(sc, Array(this, other))
    }
  }

(2)certesian

对两个RDD内的所有元素进行笛卡尔积操作。操作后,内部实现返回CartesianRDD。

左侧的大方框代表两个RDD,大方框内的小方框代表RDD的分区。右侧大方框代表合并后的RDD,大方框内的小方框代表分区。

大方框代表RDD,大方框中的小方框代表RDD分区。 例如,V1和另一个RDD中的W1、 W2、 Q5进行笛卡尔积运算形成(V1,W1)、(V1,W2)、(V1,Q5)。

源码:

  /**
   * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
   * elements (a, b) where a is in `this` and b is in `other`.
   */
  def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)

输入分区与输出分区多对多型

groupBy

将元素通过函数生成相应的Key,数据就转化为Key-Value格式,之后将Key相同的元素分为一组。

val cleanF = sc.clean(f)中sc.clean函数将用户函数预处理;

this.map(t => (cleanF(t), t)).groupByKey(p)对数据map进行函数操作,再对groupByKey进行分组操作。其中,p中确定了分区个数和分区函数,也就决定了并行化的程度。

图中,方框代表一个RDD分区,相同key的元素合并到一个组。 例如,V1,V2合并为一个Key-Value对,其中key为“ V” ,Value为“ V1,V2” ,形成V,Seq(V1,V2)。

源码:

  /**
   * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
   * mapping to that key. The ordering of elements within each group is not guaranteed, and
   * may even differ each time the resulting RDD is evaluated.
   *
   * Note: This operation may be very expensive. If you are grouping in order to perform an
   * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
   * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
   */
  def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =
    groupBy[K](f, defaultPartitioner(this))

  /**
   * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
   * mapping to that key. The ordering of elements within each group is not guaranteed, and
   * may even differ each time the resulting RDD is evaluated.
   *
   * Note: This operation may be very expensive. If you are grouping in order to perform an
   * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
   * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
   */
  def groupBy[K](f: T => K, numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =
    groupBy(f, new HashPartitioner(numPartitions))

  /**
   * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
   * mapping to that key. The ordering of elements within each group is not guaranteed, and
   * may even differ each time the resulting RDD is evaluated.
   *
   * Note: This operation may be very expensive. If you are grouping in order to perform an
   * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
   * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
   */
  def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
      : RDD[(K, Iterable[T])] = {
    val cleanF = sc.clean(f)
    this.map(t => (cleanF(t), t)).groupByKey(p)
  }

输出分区为输入分区子集型

(1)filter

filter的功能是对元素进行过滤,对每个元素应用f函数,返回值为true的元素在RDD中保留,返回为false的将过滤掉。 内部实现相当于生成FilteredRDD(this,sc.clean(f))。

图中,每个方框代表一个RDD分区。 T可以是任意的类型。通过用户自定义的过滤函数f,对每个数据项进行操作,将满足条件,返回结果为true的数据项保留。 例如,过滤掉V2、 V3保留了V1,将区分命名为V1’。

源码:

  /**
   * Return a new RDD containing only the elements that satisfy a predicate.
   */
  def filter(f: T => Boolean): RDD[T] = {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[T, T](
      this,
      (context, pid, iter) => iter.filter(cleanF),
      preservesPartitioning = true)
  }

(2)distinct

distinct将RDD中的元素进行去重操作。

图中,每个方框代表一个分区,通过distinct函数,将数据去重。 例如,重复数据V1、 V1去重后只保留一份V1。

源码:

  /**
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)

  /**
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(): RDD[T] = distinct(partitions.size)

(3)subtract

subtract相当于进行集合的差操作,RDD 1去除RDD 1和RDD 2交集中的所有元素。

图中,左侧的大方框代表两个RDD,大方框内的小方框代表RDD的分区。右侧大方框代表合并后的RDD,大方框内的小方框代表分区。V1在两个RDD中均有,根据差集运算规则,新RDD不保留,V2在第一个RDD有,第二个RDD没有,则在新RDD元素中包含V2。

源码:

  /**
   * Return an RDD with the elements from `this` that are not in `other`.
   *
   * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
   * RDD will be <= us.
   */
  def subtract(other: RDD[T]): RDD[T] =
    subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.size)))

  /**
   * Return an RDD with the elements from `this` that are not in `other`.
   */
  def subtract(other: RDD[T], numPartitions: Int): RDD[T] =
    subtract(other, new HashPartitioner(numPartitions))

  /**
   * Return an RDD with the elements from `this` that are not in `other`.
   */
  def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = {
    if (partitioner == Some(p)) {
      // Our partitioner knows how to handle T (which, since we have a partitioner, is
      // really (K, V)) so make a new Partitioner that will de-tuple our fake tuples
      val p2 = new Partitioner() {
        override def numPartitions = p.numPartitions
        override def getPartition(k: Any) = p.getPartition(k.asInstanceOf[(Any, _)]._1)
      }
      // Unfortunately, since we‘re making a new p2, we‘ll get ShuffleDependencies
      // anyway, and when calling .keys, will not have a partitioner set, even though
      // the SubtractedRDD will, thanks to p2‘s de-tupled partitioning, already be
      // partitioned by the right/real keys (e.g. p).
      this.map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys
    } else {
      this.map(x => (x, null)).subtractByKey(other.map((_, null)), p).keys
    }
  }

(4)sample

sample将RDD这个集合内的元素进行采样,获取所有元素的子集。用户可以设定是否有放回的抽样、百分比、随机种子,进而决定采样方式。

参数说明:

withReplacement=true, 表示有放回的抽样;

withReplacement=false, 表示无放回的抽样。

每个方框是一个RDD分区。通过sample函数,采样50%的数据。V1、V2、U1、U2、U3、U4采样出数据V1和U1、U2,形成新的RDD。

源码:

  /**
   * Return a sampled subset of this RDD.
   */
  def sample(withReplacement: Boolean,
      fraction: Double,
      seed: Long = Utils.random.nextLong): RDD[T] = {
    require(fraction >= 0.0, "Negative fraction value: " + fraction)
    if (withReplacement) {
      new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
    } else {
      new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
    }
  }

(5)takeSample

takeSample()函数和上面的sample函数是一个原理,但是不使用相对比例采样,而是按设定的采样个数进行采样,同时返回结果不再是RDD,而是相当于对采样后的数据进行collect(),返回结果的集合为单机的数组。

图中,左侧的方框代表分布式的各个节点上的分区,右侧方框代表单机上返回的结果数组。通过takeSample对数据采样,设置为采样一份数据,返回结果为V1。

源码:

  /**
   * Return a fixed-size sampled subset of this RDD in an array
   *
   * @param withReplacement whether sampling is done with replacement
   * @param num size of the returned sample
   * @param seed seed for the random number generator
   * @return sample of specified size in an array
   */
  def takeSample(withReplacement: Boolean,
      num: Int,
      seed: Long = Utils.random.nextLong): Array[T] = {
    val numStDev =  10.0

    if (num < 0) {
      throw new IllegalArgumentException("Negative number of elements requested")
    } else if (num == 0) {
      return new Array[T](0)
    }

    val initialCount = this.count()
    if (initialCount == 0) {
      return new Array[T](0)
    }

    val maxSampleSize = Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt
    if (num > maxSampleSize) {
      throw new IllegalArgumentException("Cannot support a sample size > Int.MaxValue - " +
        s"$numStDev * math.sqrt(Int.MaxValue)")
    }

    val rand = new Random(seed)
    if (!withReplacement && num >= initialCount) {
      return Utils.randomizeInPlace(this.collect(), rand)
    }

    val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount,
      withReplacement)

    var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()

    // If the first sample didn‘t turn out large enough, keep trying to take samples;
    // this shouldn‘t happen often because we use a big multiplier for the initial size
    var numIters = 0
    while (samples.length < num) {
      logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters")
      samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
      numIters += 1
    }

    Utils.randomizeInPlace(samples, rand).take(num)
  }

Cache型

(1)cache

cache将RDD元素从磁盘缓存到内存,相当于persist(MEMORY_ONLY)函数的功能。

图中,每个方框代表一个RDD分区,左侧相当于数据分区都存储在磁盘,通过cache算子将数据缓存在内存。

源码:

  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
  def cache(): this.type = persist()

(2)persist

persist函数对RDD进行缓存操作。数据缓存在哪里由StorageLevel枚举类型确定。

有几种类型的组合,DISK代表磁盘,MEMORY代表内存,SER代表数据是否进行序列化存储。StorageLevel是枚举类型,代表存储模式,如,MEMORY_AND_DISK_SER代表数据可以存储在内存和磁盘,并且以序列化的方式存储。 其他同理。

图中,方框代表RDD分区。 disk代表存储在磁盘,mem代表存储在内存。 数据最初全部存储在磁盘,通过persist(MEMORY_AND_DISK)将数据缓存到内存,但是有的分区无法容纳在内存,例如:图3-18中将含有V1,V2,V3的RDD存储到磁盘,将含有U1,U2的RDD仍旧存储在内存。

源码:

  /**
   * Set this RDD‘s storage level to persist its values across operations after the first time
   * it is computed. This can only be used to assign a new storage level if the RDD does not
   * have a storage level set yet..
   */
  def persist(newLevel: StorageLevel): this.type = {
    // TODO: Handle changes of StorageLevel
    if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
      throw new UnsupportedOperationException(
        "Cannot change storage level of an RDD after it was already assigned a level")
    }
    sc.persistRDD(this)
    // Register the RDD with the ContextCleaner for automatic GC-based cleanup
    sc.cleaner.foreach(_.registerRDDForCleanup(this))
    storageLevel = newLevel
    this
  }

  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
  def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

转载请注明作者Jason Ding及其出处

GitCafe博客主页(http://jasonding1354.gitcafe.io/)

Github博客主页(http://jasonding1354.github.io/)

CSDN博客(http://blog.csdn.net/jasonding1354)

简书主页(http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)

Google搜索jasonding1354进入我的博客主页

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-10-12 22:46:25

【Spark】RDD操作详解2——值型Transformation算子的相关文章

【Spark】RDD操作具体解释2——值型Transformation算子

处理数据类型为Value型的Transformation算子能够依据RDD变换算子的输入分区与输出分区关系分为下面几种类型: 1)输入分区与输出分区一对一型 2)输入分区与输出分区多对一型 3)输入分区与输出分区多对多型 4)输出分区为输入分区子集型 5)另一种特殊的输入与输出分区一对一的算子类型:Cache型. Cache算子对RDD分区进行缓存 输入分区与输出分区一对一型 (1)map 将原来RDD的每一个数据项通过map中的用户自己定义函数f映射转变为一个新的元素. 源代码中的map算子相

【Spark】RDD操作详解1——Transformation和Actions概况

Spark算子的作用 下图描述了Spark在运行转换中通过算子对RDD进行转换. 算子是RDD中定义的函数,可以对RDD中的数据进行转换和操作. 输入:在Spark程序运行中,数据从外部数据空间(如分布式存储:textFile读取HDFS等,parallelize方法输入Scala集合或数据)输入Spark,数据进入Spark运行时数据空间,转化为Spark中的数据块,通过BlockManager进行管理. 运行:在Spark数据输入形成RDD后便可以通过变换算子,如filter等,对数据进行操

【Spark】RDD操作详解4——Action算子

本质上在Actions算子中通过SparkContext执行提交作业的runJob操作,触发了RDD DAG的执行. 根据Action算子的输出空间将Action算子进行分类:无输出. HDFS. Scala集合和数据类型. 无输出 foreach 对RDD中的每个元素都应用f函数操作,不返回RDD和Array,而是返回Uint. 图中,foreach算子通过用户自定义函数对每个数据项进行操作. 本例中自定义函数为println,控制台打印所有数据项. 源码: /** * Applies a f

Spark RDD API详解(一)(转)

RDD是什么? RDD是Spark中的抽象数据结构类型,任何数据在Spark中都被表示为RDD.从编程的角度来看,RDD可以简单看成是一个数组.和普通数组 的区别是,RDD中的数据是分区存储的,这样不同分区的数据就可以分布在不同的机器上,同时可以被并行处理.因此,Spark应用程序所做的无非是把需要 处理的数据转换为RDD,然后对RDD进行一系列的变换和操作从而得到结果.本文为第一部分,将介绍Spark RDD中与Map和Reduce相关的API中. 如何创建RDD? RDD可以从普通数组创建出

Spark RDD API详解(一) Map和Reduce

本文由cmd markdown编辑,原始链接:https://www.zybuluo.com/jewes/note/35032 RDD是什么? RDD是Spark中的抽象数据结构类型,任何数据在Spark中都被表示为RDD.从编程的角度来看,RDD可以简单看成是一个数组.和普通数组 的区别是,RDD中的数据是分区存储的,这样不同分区的数据就可以分布在不同的机器上,同时可以被并行处理.因此,Spark应用程序所做的无非是把需要 处理的数据转换为RDD,然后对RDD进行一系列的变换和操作从而得到结果

Spark RDD使用详解1--RDD原理

RDD简介 在集群背后,有一个非常重要的分布式数据架构,即弹性分布式数据集(Resilient Distributed Dataset,RDD).RDD是Spark的最基本抽象,是对分布式内存的抽象使用,实现了以操作本地集合的方式来操作分布式数据集的抽象实现.RDD是Spark最核心的东西,它表示已被分区,不可变的并能够被并行操作的数据集合,不同的数据集格式对应不同的RDD实现.RDD必须是可序列化的.RDD可以cache到内存中,每次对RDD数据集的操作之后的结果,都可以存放到内存中,下一个操

【Spark】RDD操作详解3——键值型Transformation算子

Transformation处理的数据为Key-Value形式的算子大致可以分为:输入分区与输出分区一对一.聚集.连接操作. 输入分区与输出分区一对一 mapValues mapValues:针对(Key,Value)型数据中的Value进行Map操作,而不对Key进行处理. 方框代表RDD分区.a=>a+2代表只对( V1, 1)数据中的1进行加2操作,返回结果为3. 源码: /** * Pass each value in the key-value pair RDD through a m

Yii 框架里数据库操作详解-[增加、查询、更新、删除的方法 &#39;AR模式&#39;]

public function getMinLimit () {        $sql = "...";        $result = yii::app()->db->createCommand($sql);        $query = $result->queryAll();         return array (                $query [0] ['max'],         );    } $connection=Yii::

笔记-[1]-DOM的节点操作详解.

DOM:文档对象模型 操作DOM基本就是操作DOM的元素节点. 节点的属性: 1:objElement.childNodes  :获取该元素对象的的节点集合,有length长度属性,在标准浏览器下使用,会辩认文本节点的节点,一般不用,有其他的更好的属性(children) 2:objElement.children :     获取该元素对象的的节点集合,有length长度属性,在ie8下和其他标准浏览器兼容,只获取元素节点. 3:obj.nodeType   :获取对象的节点类型,1为元素节点