【Spark】RDD操作详解4——Action算子

本质上在Actions算子中通过SparkContext执行提交作业的runJob操作,触发了RDD DAG的执行。

根据Action算子的输出空间将Action算子进行分类:无输出、 HDFS、 Scala集合和数据类型。

无输出

foreach

对RDD中的每个元素都应用f函数操作,不返回RDD和Array,而是返回Uint。

图中,foreach算子通过用户自定义函数对每个数据项进行操作。 本例中自定义函数为println,控制台打印所有数据项。

源码:

  /**
   * Applies a function f to all elements of this RDD.
   */
  def foreach(f: T => Unit) {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
  }

HDFS

(1)saveAsTextFile

函数将数据输出,存储到HDFS的指定目录。将RDD中的每个元素映射转变为(Null,x.toString),然后再将其写入HDFS。

图中,左侧的方框代表RDD分区,右侧方框代表HDFS的Block。 通过函数将RDD的每个分区存储为HDFS中的一个Block。

源码:

  /**
   * Save this RDD as a text file, using string representations of elements.
   */
  def saveAsTextFile(path: String) {
    // https://issues.apache.org/jira/browse/SPARK-2075
    //
    // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit
    // Ordering for it and will use the default `null`. However, it‘s a `Comparable[NullWritable]`
    // in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an
    // Ordering for `NullWritable`. That‘s why the compiler will generate different anonymous
    // classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+.
    //
    // Therefore, here we provide an explicit Ordering `null` to make sure the compiler generate
    // same bytecodes for `saveAsTextFile`.
    val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
    val textClassTag = implicitly[ClassTag[Text]]
    val r = this.mapPartitions { iter =>
      val text = new Text()
      iter.map { x =>
        text.set(x.toString)
        (NullWritable.get(), text)
      }
    }
    RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
  }

  /**
   * Save this RDD as a compressed text file, using string representations of elements.
   */
  def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) {
    // https://issues.apache.org/jira/browse/SPARK-2075
    val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
    val textClassTag = implicitly[ClassTag[Text]]
    val r = this.mapPartitions { iter =>
      val text = new Text()
      iter.map { x =>
        text.set(x.toString)
        (NullWritable.get(), text)
      }
    }
    RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)
  }

(2)saveAsObjectFile

saveAsObjectFile将分区中的每10个元素组成一个Array,然后将这个Array序列化,映射为(Null,BytesWritable(Y))的元素,写入HDFS为SequenceFile的格式。

图中,左侧方框代表RDD分区,右侧方框代表HDFS的Block。 通过函数将RDD的每个分区存储为HDFS上的一个Block。

源码:

  /**
   * Save this RDD as a SequenceFile of serialized objects.
   */
  def saveAsObjectFile(path: String) {
    this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
      .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
      .saveAsSequenceFile(path)
  }

Scala集合和数据类型

(1)collect

collect相当于toArray,toArray已经过时不推荐使用,collect将分布式的RDD返回为一个单机的scala Array数组。 在这个数组上运用scala的函数式操作。

图中,左侧方框代表RDD分区,右侧方框代表单机内存中的数组。通过函数操作,将结果返回到Driver程序所在的节点,以数组形式存储。

源码:

  /**
   * Return an array that contains all of the elements in this RDD.
   */
  def collect(): Array[T] = {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }

(2)collectAsMap

collectAsMap对(K,V)型的RDD数据返回一个单机HashMap。对于重复K的RDD元素,后面的元素覆盖前面的元素。

图中,左侧方框代表RDD分区,右侧方框代表单机数组。数据通过collectAsMap函数返回给Driver程序计算结果,结果以HashMap形式存储。

源码:

  /**
   * Return the key-value pairs in this RDD to the master as a Map.
   *
   * Warning: this doesn‘t return a multimap (so if you have multiple values to the same key, only
   *          one value per key is preserved in the map returned)
   */
  def collectAsMap(): Map[K, V] = {
    val data = self.collect()
    val map = new mutable.HashMap[K, V]
    map.sizeHint(data.length)
    data.foreach { pair => map.put(pair._1, pair._2) }
    map
  }

(3)reduceByKeyLocally

实现的是先reduce再collectAsMap的功能,先对RDD的整体进行reduce操作,然后再收集所有结果返回为一个HashMap。

源码:

  /**
   * Merge the values for each key using an associative reduce function, but return the results
   * immediately to the master as a Map. This will also perform the merging locally on each mapper
   * before sending results to a reducer, similarly to a "combiner" in MapReduce.
   */
  def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = {

    if (keyClass.isArray) {
      throw new SparkException("reduceByKeyLocally() does not support array keys")
    }

    val reducePartition = (iter: Iterator[(K, V)]) => {
      val map = new JHashMap[K, V]
      iter.foreach { pair =>
        val old = map.get(pair._1)
        map.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
      }
      Iterator(map)
    } : Iterator[JHashMap[K, V]]

    val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
      m2.foreach { pair =>
        val old = m1.get(pair._1)
        m1.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
      }
      m1
    } : JHashMap[K, V]

    self.mapPartitions(reducePartition).reduce(mergeMaps)
  }

(4)lookup

Lookup函数对(Key,Value)型的RDD操作,返回指定Key对应的元素形成的Seq。这个函数处理优化的部分在于,如果这个RDD包含分区器,则只会对应处理K所在的分区,然后返回由(K,V)形成的Seq。如果RDD不包含分区器,则需要对全RDD元素进行暴力扫描处理,搜索指定K对应的元素。

图中,左侧方框代表RDD分区,右侧方框代表Seq,最后结果返回到Driver所在节点的应用中。

源码:

  /**
   * Return the list of values in the RDD for key `key`. This operation is done efficiently if the
   * RDD has a known partitioner by only searching the partition that the key maps to.
   */
  def lookup(key: K): Seq[V] = {
    self.partitioner match {
      case Some(p) =>
        val index = p.getPartition(key)
        val process = (it: Iterator[(K, V)]) => {
          val buf = new ArrayBuffer[V]
          for (pair <- it if pair._1 == key) {
            buf += pair._2
          }
          buf
        } : Seq[V]
        val res = self.context.runJob(self, process, Array(index), false)
        res(0)
      case None =>
        self.filter(_._1 == key).map(_._2).collect()
    }
  }

(5)count

count返回整个RDD的元素个数。

图中,返回数据的个数为5。一个方块代表一个RDD分区。

源码:

  /**
   * Return the number of elements in the RDD.
   */
  def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

(6)top

top可返回最大的k个元素。

相近函数说明:

  • top返回最大的k个元素。
  • take返回最小的k个元素。
  • takeOrdered返回最小的k个元素, 并且在返回的数组中保持元素的顺序。
  • first相当于top( 1) 返回整个RDD中的前k个元素, 可以定义排序的方式Ordering[T]。返回的是一个含前k个元素的数组。

源码:

  /**
   * Returns the top k (largest) elements from this RDD as defined by the specified
   * implicit Ordering[T]. This does the opposite of [[takeOrdered]]. For example:
   * {{{
   *   sc.parallelize(Seq(10, 4, 2, 12, 3)).top(1)
   *   // returns Array(12)
   *
   *   sc.parallelize(Seq(2, 3, 4, 5, 6)).top(2)
   *   // returns Array(6, 5)
   * }}}
   *
   * @param num k, the number of top elements to return
   * @param ord the implicit ordering for T
   * @return an array of top elements
   */
  def top(num: Int)(implicit ord: Ordering[T]): Array[T] = takeOrdered(num)(ord.reverse)

(7)reduce

reduce函数相当于对RDD中的元素进行reduceLeft函数的操作。

reduceLeft先对两个元素

  /**
   * Reduces the elements of this RDD using the specified commutative and
   * associative binary operator.
   */
  def reduce(f: (T, T) => T): T = {
    val cleanF = sc.clean(f)
    val reducePartition: Iterator[T] => Option[T] = iter => {
      if (iter.hasNext) {
        Some(iter.reduceLeft(cleanF))
      } else {
        None
      }
    }
    var jobResult: Option[T] = None
    val mergeResult = (index: Int, taskResult: Option[T]) => {
      if (taskResult.isDefined) {
        jobResult = jobResult match {
          case Some(value) => Some(f(value, taskResult.get))
          case None => taskResult
        }
      }
    }
    sc.runJob(this, reducePartition, mergeResult)
    // Get the final result out of our Option, or throw an exception if the RDD was empty
    jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
  }

(8)fold

fold和reduce的原理相同,但是与reduce不同,相当于每个reduce时,迭代器取的第一个元素是zeroValue。

图中,通过用户自定义函数进行fold运算,图中的一个方框代表一个RDD分区。

源码:

  /**
   * Aggregate the elements of each partition, and then the results for all the partitions, using a
   * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
   * modify t1 and return it as its result value to avoid object allocation; however, it should not
   * modify t2.
   */
  def fold(zeroValue: T)(op: (T, T) => T): T = {
    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
    val cleanOp = sc.clean(op)
    val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
    val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
    sc.runJob(this, foldPartition, mergeResult)
    jobResult
  }

(9)aggregate

aggregate先对每个分区的所有元素进行aggregate操作,再对分区的结果进行fold操作。

aggreagate与fold和reduce的不同之处在于,aggregate相当于采用归并的方式进行数据聚集,这种聚集是并行化的。 而在fold和reduce函数的运算过程中,每个分区中需要进行串行处理,每个分区串行计算完结果,结果再按之前的方式进行聚集,并返回最终聚集结果。

图中,通过用户自定义函数对RDD 进行aggregate的聚集操作,图中的每个方框代表一个RDD分区。

源码:

  /**
   * Aggregate the elements of each partition, and then the results for all the partitions, using
   * given combine functions and a neutral "zero value". This function can return a different result
   * type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U
   * and one operation for merging two U‘s, as in scala.TraversableOnce. Both of these functions are
   * allowed to modify and return their first argument instead of creating a new U to avoid memory
   * allocation.
   */
  def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
    val cleanSeqOp = sc.clean(seqOp)
    val cleanCombOp = sc.clean(combOp)
    val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
    val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
    sc.runJob(this, aggregatePartition, mergeResult)
    jobResult
  }

转载请注明作者Jason Ding及其出处

GitCafe博客主页(http://jasonding1354.gitcafe.io/)

Github博客主页(http://jasonding1354.github.io/)

CSDN博客(http://blog.csdn.net/jasonding1354)

简书主页(http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)

Google搜索jasonding1354进入我的博客主页

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-08-07 02:31:26

【Spark】RDD操作详解4——Action算子的相关文章

【Spark】RDD操作详解2——值型Transformation算子

处理数据类型为Value型的Transformation算子可以根据RDD变换算子的输入分区与输出分区关系分为以下几种类型: 1)输入分区与输出分区一对一型 2)输入分区与输出分区多对一型 3)输入分区与输出分区多对多型 4)输出分区为输入分区子集型 5)还有一种特殊的输入与输出分区一对一的算子类型:Cache型. Cache算子对RDD分区进行缓存 输入分区与输出分区一对一型 (1)map 将原来RDD的每个数据项通过map中的用户自定义函数f映射转变为一个新的元素.源码中的map算子相当于初

【Spark】RDD操作详解1——Transformation和Actions概况

Spark算子的作用 下图描述了Spark在运行转换中通过算子对RDD进行转换. 算子是RDD中定义的函数,可以对RDD中的数据进行转换和操作. 输入:在Spark程序运行中,数据从外部数据空间(如分布式存储:textFile读取HDFS等,parallelize方法输入Scala集合或数据)输入Spark,数据进入Spark运行时数据空间,转化为Spark中的数据块,通过BlockManager进行管理. 运行:在Spark数据输入形成RDD后便可以通过变换算子,如filter等,对数据进行操

【Spark】RDD操作具体解释4——Action算子

本质上在Actions算子中通过SparkContext运行提交作业的runJob操作,触发了RDD DAG的运行. 依据Action算子的输出空间将Action算子进行分类:无输出. HDFS. Scala集合和数据类型. 无输出 foreach 对RDD中的每一个元素都应用f函数操作,不返回RDD和Array,而是返回Uint. 图中.foreach算子通过用户自己定义函数对每一个数据项进行操作. 本例中自己定义函数为println,控制台打印全部数据项. 源代码: /** * Applie

Spark RDD使用详解1--RDD原理

RDD简介 在集群背后,有一个非常重要的分布式数据架构,即弹性分布式数据集(Resilient Distributed Dataset,RDD).RDD是Spark的最基本抽象,是对分布式内存的抽象使用,实现了以操作本地集合的方式来操作分布式数据集的抽象实现.RDD是Spark最核心的东西,它表示已被分区,不可变的并能够被并行操作的数据集合,不同的数据集格式对应不同的RDD实现.RDD必须是可序列化的.RDD可以cache到内存中,每次对RDD数据集的操作之后的结果,都可以存放到内存中,下一个操

Spark RDD API详解(一)(转)

RDD是什么? RDD是Spark中的抽象数据结构类型,任何数据在Spark中都被表示为RDD.从编程的角度来看,RDD可以简单看成是一个数组.和普通数组 的区别是,RDD中的数据是分区存储的,这样不同分区的数据就可以分布在不同的机器上,同时可以被并行处理.因此,Spark应用程序所做的无非是把需要 处理的数据转换为RDD,然后对RDD进行一系列的变换和操作从而得到结果.本文为第一部分,将介绍Spark RDD中与Map和Reduce相关的API中. 如何创建RDD? RDD可以从普通数组创建出

Spark RDD API详解(一) Map和Reduce

本文由cmd markdown编辑,原始链接:https://www.zybuluo.com/jewes/note/35032 RDD是什么? RDD是Spark中的抽象数据结构类型,任何数据在Spark中都被表示为RDD.从编程的角度来看,RDD可以简单看成是一个数组.和普通数组 的区别是,RDD中的数据是分区存储的,这样不同分区的数据就可以分布在不同的机器上,同时可以被并行处理.因此,Spark应用程序所做的无非是把需要 处理的数据转换为RDD,然后对RDD进行一系列的变换和操作从而得到结果

【Spark】RDD操作详解3——键值型Transformation算子

Transformation处理的数据为Key-Value形式的算子大致可以分为:输入分区与输出分区一对一.聚集.连接操作. 输入分区与输出分区一对一 mapValues mapValues:针对(Key,Value)型数据中的Value进行Map操作,而不对Key进行处理. 方框代表RDD分区.a=>a+2代表只对( V1, 1)数据中的1进行加2操作,返回结果为3. 源码: /** * Pass each value in the key-value pair RDD through a m

Java路径操作详解

1.基本概念的理解 绝对路径:绝对路径就是你的主页上的文件或目录在硬盘上真正的路径,(URL和物理路径)例如: C:\xyz\test.txt 代表了test.txt文件的绝对路径.http://www.sun.com/index.htm也代表了一个 URL绝对路径. 相对路径:相对与某个基准目录的路径.包含Web的相对路径(HTML中的相对目录),例如:在 Servlet中,"/"代表Web应用的根目录.和物理路径的相对表示.例如:". /" 代表当前目录, &q

Windows DIB文件操作详解-4.使用DIB Section

前面讲了为了提高DIB的显示性能和效率,我们将DIB转换成DDB,但是这又遇到一个问题,如果我想操作DIB的数据的话,显然是不能使用DDB:一是因为DIB转DDB时发生了颜色转换,再就是DDB无法直接提取指定像素点的数据.那么我们怎么办呢,Windows使用一种折中的方式来达到这一目标(既提高了显示效率和性能,又可以直接操作像素点). 1.DIB Section存储和显示 Windows使用DIB块(DIB Section)来存储DIB数据,其内存结构示意图如下 其实,和我们自己读入DIB数据到