Spark Knowledge Series 03 - RDDs, Accumulators, Broadcasts

This article walks through the RDD operators most commonly used in practical Spark programming.

RDDs, Accumulators, Broadcasts

RDD

RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. For example, map is a transformation that passes each element of the dataset through a function and returns a new RDD of the results, while reduce is an action that aggregates all elements of the RDD with some function and returns the final result to the driver program (there is also a parallel reduceByKey that returns a distributed dataset).

All transformations in Spark are lazy: they do not compute their results right away. Instead, they only remember the transformations applied to some base dataset (e.g. a file). The transformations are computed only when an action requires a result to be returned to the driver program. This design lets Spark run more efficiently. For example, a dataset created by map and consumed by reduce only needs to return the result of the reduce to the driver, rather than the much larger mapped dataset.
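
A minimal sketch of this laziness (the input path is made up for illustration): the two transformations below only build up a lineage, and nothing is computed until the reduce action is called.

    val lines = sc.textFile("hdfs:///tmp/input.txt")   // the file is not read yet
    val lengths = lines.map(_.length)                  // transformation: still nothing computed
    val total = lengths.reduce(_ + _)                  // action: triggers reading and computing the whole pipeline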

By default, each transformed RDD may be recomputed every time you run an action on it. However, you can also persist an RDD in memory using the persist (or cache) method, in which case Spark keeps the data around on the cluster so that the next query can access it much faster. There is also support for persisting RDDs on disk, or replicating them across multiple nodes.
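
A minimal sketch of persisting, reusing the hypothetical lines RDD from the sketch above; MEMORY_AND_DISK is just one of several available storage levels.

    import org.apache.spark.storage.StorageLevel

    val words = lines.flatMap(_.split(" "))
    words.persist(StorageLevel.MEMORY_AND_DISK)   // words.cache() is shorthand for persist(MEMORY_ONLY)
    words.count()                                 // the first action materializes and caches the partitions
    words.take(10)                                // later actions reuse the cached data instead of recomputing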

Commonly Used Transformations

Transformation Meaning
map(func) Return a new distributed dataset formed by passing each element of the source through a function func.
filter(func) Return a new dataset formed by selecting those elements of the source on which func returns true.
flatMap(func) Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
mapPartitions(func) Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
mapPartitionsWithIndex(func) Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.
sample(withReplacement, fraction, seed) Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.
union(otherDataset) Return a new dataset that contains the union of the elements in the source dataset and the argument.
intersection(otherDataset) Return a new RDD that contains the intersection of elements in the source dataset and the argument.
distinct([numTasks])) Return a new dataset that contains the distinct elements of the source dataset.
groupByKey([numTasks]) When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance.
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.
reduceByKey(func, [numTasks]) When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
sortByKey([ascending], [numTasks]) When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
join(otherDataset, [numTasks]) When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
cogroup(otherDataset, [numTasks]) When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.
cartesian(otherDataset) When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
pipe(command, [envVars]) Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings.
coalesce(numPartitions) Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
repartition(numPartitions) Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
repartitionAndSortWithinPartitions(partitioner) Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.

Examples of Commonly Used Transformations

map(func)

  • Meaning:

    Returns a new RDD by applying a function to every element of this RDD; each input element produces exactly one output element.

  • Signature:

    def map[U: ClassTag](f: T => U): RDD[U]

  • Example:

    Below, b converts RDD[String] into RDD[Int], and c converts RDD[String] into RDD[(String, Int)].

    scala> val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
    a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:25
    
    scala> val b = a.map(_.length)
    b: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at map at <console>:27
    
    scala> b.collect()
    res1: Array[Int] = Array(3, 6, 6, 3, 8)
    
    scala> val c= a.map(x=>(x,x.length))
    c: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[5] at map at <console>:27
    
    scala> c.collect()
    res2: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))

flatMap(func)

  • Meaning:

    Returns a new RDD by first applying a function to every element of this RDD and then flattening the results; one input element may map to one or more output elements.

  • Signature:

    def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

  • Example:

    Below, the element dog is first mapped to (d,1), (o,1), (g,1), and the results for all elements are then flattened into a single RDD.

    scala> val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)

    a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[6] at parallelize at

    scala> val d=a.flatMap(x=>{
         |   for (i<-0 until x.length) yield (x.charAt(i),1)
         | })
    d: org.apache.spark.rdd.RDD[(Char, Int)] = MapPartitionsRDD[7] at flatMap at <console>:27
    
    scala> d.collect()
    res3: Array[(Char, Int)] = Array((d,1), (o,1), (g,1), (s,1), (a,1), (l,1), (m,1), (o,1), (n,1), (s,1), (a,1), (l,1), (m,1), (o,1), (n,1), (r,1), (a,1), (t,1), (e,1), (l,1), (e,1), (p,1), (h,1), (a,1), (n,1), (t,1))

mapPartitions(func)

  • Meaning:

    Similar to map, but runs separately on each partition (block) of the RDD, so when running on an RDD of type T, func must be of type Iterator<T> => Iterator<U>.

  • Signature:
    def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
  • Example:

    The example below pairs adjacent elements within each partition; from the result we can infer that (1,2,3), (4,5,6) and (7,8,9) each sit in their own partition.

    scala> val e = sc.parallelize(1 to 9, 3)
    e: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:25
    
    scala> def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
         |   var res = List[(T, T)]()
         |   var pre = iter.next
         |   while (iter.hasNext)
         |   {
         |     val cur = iter.next;
         |     res .::= (pre, cur)
         |     pre = cur;
         |   }
         |   res.iterator
         | }
    myfunc: [T](iter: Iterator[T])Iterator[(T, T)]
    
    scala> e.mapPartitions(myfunc).collect
    res4: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))

mapPartitionsWithIndex(func)

  • Meaning:

    Similar to mapPartitions, but func is also given an integer value representing the index of the partition, so when running on an RDD of type T, func must be of type (Int, Iterator<T>) => Iterator<U>.

  • Signature:

    def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

  • Example:

    mapPartitionsWithIndex lets us see exactly how the data is distributed across the partitions; see the output below.

    scala> val e = sc.parallelize(1 to 9, 3)
    e: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:25
    
    scala>
    
    scala> def myfunc2(index: Int, iter: Iterator[Int]) : Iterator[String] = {
         |   iter.map(x => index + "," + x)
         | }
    myfunc2: (index: Int, iter: Iterator[Int])Iterator[String]
    
    scala> e.mapPartitionsWithIndex(myfunc2).collect()
    res5: Array[String] = Array(0,1, 0,2, 0,3, 1,4, 1,5, 1,6, 2,7, 2,8, 2,9)  

groupByKey([numTasks])

  • Meaning:

    When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.

  • Signature:
    def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
    def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
  • Example:

    Group the words by their length.

    val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2)
    val b = a.keyBy(_.length)
    b.groupByKey.collect
    res11: Array[(Int, Seq[String])] = Array((4,ArrayBuffer(lion)), (6,ArrayBuffer(spider)), (3,ArrayBuffer(dog, cat)), (5,ArrayBuffer(tiger, eagle)))

reduceByKey(func)

  • Meaning:

    When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V, V) => V. As with groupByKey, the number of reduce tasks can be configured through an optional second argument.

    At runtime the values are first combined within each partition (a map-side combine) before being shuffled across the network.

  • Signature:
    def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
    def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
    def reduceByKey(func: (V, V) => V): RDD[(K, V)]
  • Example:
    val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2)
    val b = a.map(x => (x.length, x))
    b.reduceByKey(_ + _).collect
    res19: Array[(Int, String)] = Array((4,lion), (6,spider), (3,dogcat), (5,tigereagle))
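
To make the map-side combine mentioned above more concrete, here is a minimal word-count sketch (the input strings are made up): reduceByKey pre-aggregates the counts inside each partition before shuffling, whereas groupByKey would send every individual (word, 1) pair across the network.

    val text = sc.parallelize(Seq("spark and spark", "and more spark"), 2)
    val counts = text.flatMap(_.split(" "))
                     .map(word => (word, 1))
                     .reduceByKey(_ + _)   // partial sums are computed per partition before the shuffle
    counts.collect()                       // e.g. Array((spark,3), (and,2), (more,1))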

aggregateByKey(zeroValue)(seqOp, combOp)

  • Meaning:

    When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. The aggregated value type may differ from the input value type, while unnecessary allocations are avoided. As with groupByKey, the number of reduce tasks can be configured through an optional second argument.

  • Signature:
    def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)]
    def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)]
    def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)]

Note that the first function, (U, V) => U, merges data within a partition, while the second function, (U, U) => U, merges the results across partitions.

  • Example:
    // First use mapPartitionsWithIndex to see how the data is distributed across partitions, which makes the results below easier to follow
    val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
    def myfunc(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
      iter.map(x => "[partID:" +  index + ", val: " + x + "]")
    }
    pairRDD.mapPartitionsWithIndex(myfunc).foreach(println)
    /**
     * 0:(cat,2),(cat,5),(mouse,4)
     * 1:(mouse,2),(dog,12),(cat,12)
     * */
    // First take the per-key maximum within each partition, then add the per-partition results together
    pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
    //    res3: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))
    
    pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect
    //    res4: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))

combineByKey

  • Meaning:

    Combines the values for each key using a set of user-defined aggregation functions, turning an RDD[(K, V)] into an RDD[(K, C)].

  • Signature:
    def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      numPartitions: Int): RDD[(K, C)]
    
     def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null): RDD[(K, C)]
    
    def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)]  

The key parameters are the three functions createCombiner, mergeValue, and mergeCombiners, which together drive the conversion from values of type V to combined results of type C.

The Spark source code describes these three parameters as follows:

     @param createCombiner function to create the initial value of the aggregation.
     @param mergeValue function to merge a new value into the aggregation result.
     @param mergeCombiners function to merge outputs from multiple mergeValue function.

Because the aggregation iterates over every element (key-value pair) in a partition, each key falls into one of two cases: it has already been seen in that partition or it has not. If the key has not been seen yet, createCombiner is invoked; otherwise the existing combiner is updated via mergeValue.

Aggregation across partitions then uses mergeCombiners.

  • Example:

    Group the words by the numeric key they are zipped with.

    scala>     val a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
    a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[27] at parallelize at <console>:24
    
    scala>     val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
    b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[28] at parallelize at <console>:24
    
    scala>     val c = b.zip(a)
    c: org.apache.spark.rdd.RDD[(Int, String)] = ZippedPartitionsRDD2[29] at zip at <console>:28
    
    scala> c.collect
    res24: Array[(Int, String)] = Array((1,dog), (1,cat), (2,gnu), (2,salmon), (2,rabbit), (1,turkey), (2,wolf), (2,bear), (2,bee))
    
    scala>     val d = c.combineByKey(List(_), (x:List[String], y:String) => y :: x, (x:List[String], y:List[String]) => x ::: y)
    d: org.apache.spark.rdd.RDD[(Int, List[String])] = ShuffledRDD[30] at combineByKey at <console>:30
    
    scala> d.collect
    res25: Array[(Int, List[String])] = Array((1,List(cat, dog, turkey)), (2,List(gnu, rabbit, salmon, bee, bear, wolf)))
    
    scala>

    Compute the average score per person.

    scala> import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.SparkSession
    
    scala>     type ScoreCollector = (Int, Double)
    defined type alias ScoreCollector
    
    scala>     type PersonScores = (String, (Int, Double))
    defined type alias PersonScores
    
    scala>     val initialScores = Array(("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0), ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0))
    initialScores: Array[(String, Double)] = Array((Fred,88.0), (Fred,95.0), (Fred,91.0), (Wilma,93.0), (Wilma,95.0), (Wilma,98.0))
    
    scala>     val wilmaAndFredScores = sc.parallelize(initialScores).cache()
    wilmaAndFredScores: org.apache.spark.rdd.RDD[(String, Double)] = ParallelCollectionRDD[0] at parallelize at <console>:27
    
    scala>     val createScoreCombiner = (score: Double) => (1, score)
    createScoreCombiner: Double => (Int, Double) = <function1>
    
    scala>     val scoreCombiner = (collector: ScoreCollector, score: Double) => {
         |       val (numberScores, totalScore) = collector
         |       (numberScores + 1, totalScore + score)
         |     }
    scoreCombiner: (ScoreCollector, Double) => (Int, Double) = <function2>
    
    scala>     val scoreMerger = (collector1: ScoreCollector, collector2: ScoreCollector) => {
         |       val (numScores1, totalScore1) = collector1
         |       val (numScores2, totalScore2) = collector2
         |       (numScores1 + numScores2, totalScore1 + totalScore2)
         |     }
    scoreMerger: (ScoreCollector, ScoreCollector) => (Int, Double) = <function2>
    
    scala>
    
    scala>     val scores = wilmaAndFredScores.combineByKey(createScoreCombiner, scoreCombiner, scoreMerger)
    scores: org.apache.spark.rdd.RDD[(String, (Int, Double))] = ShuffledRDD[1] at combineByKey at <console>:37
    
    scala> scores.collect
    res0: Array[(String, (Int, Double))] = Array((Wilma,(3,286.0)), (Fred,(3,274.0)))
    
    scala> val averagingFunction = (personScore: PersonScores) => {
         |       val (name, (numberScores, totalScore)) = personScore
         |       (name, totalScore / numberScores)
         |     }
    averagingFunction: PersonScores => (String, Double) = <function1>
    
    scala>     val averageScores = scores.collectAsMap().map(averagingFunction)
    averageScores: scala.collection.Map[String,Double] = Map(Fred -> 91.33333333333333, Wilma -> 95.33333333333333)

Note that groupByKey, reduceByKey, aggregateByKey, and foldByKey are all implemented by calling combineByKey (combineByKeyWithClassTag); see the org.apache.spark.rdd.PairRDDFunctions class for the details.
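
As a rough illustration of that relationship (a sketch, not the actual Spark source), reduceByKey(f) behaves like a combineByKey whose createCombiner is the identity function and whose mergeValue and mergeCombiners are both f:

    val pairs = sc.parallelize(Seq(("cat", 2), ("cat", 5), ("dog", 4)), 2)

    // reduceByKey(_ + _) expressed through combineByKey
    val viaCombine = pairs.combineByKey(
      (v: Int) => v,                  // createCombiner: first value seen for a key in a partition
      (c: Int, v: Int) => c + v,      // mergeValue: fold further values into the partition-local result
      (c1: Int, c2: Int) => c1 + c2   // mergeCombiners: merge results from different partitions
    )
    viaCombine.collect()              // e.g. Array((dog,4), (cat,7))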

foldByKey(zeroValue)(func)

  • Meaning:

    Folds the values of an RDD[(K, V)] by key. The zeroValue is first applied to the values (once per key within each partition), and the fold function is then applied to the resulting values.

  • Signature:
    def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
    def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
    def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
  • Example:

    scala> val aa = sc.parallelize(List( ("cat",2), ("mouse", 2),("cat", 3), ("dog", 4), ("mouse", 2), ("cat", 1)),2 )
    aa: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[18] at parallelize at

    scala> def myfunc(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
         |   iter.map(x => "[partID:" +  index + ", val: " + x + "]")
         | }
    myfunc: (index: Int, iter: Iterator[(String, Int)])Iterator[String]
    
    scala> aa.mapPartitionsWithIndex(myfunc).foreach(println)
    [partID:1, val: (dog,4)]
    [partID:0, val: (cat,2)]
    [partID:1, val: (mouse,2)]
    [partID:0, val: (mouse,2)]
    [partID:1, val: (cat,1)]
    [partID:0, val: (cat,3)]
    
    scala> aa.foldByKey(0)(_+_).collect()
    res10: Array[(String, Int)] = Array((dog,4), (cat,6), (mouse,4))
    
    scala> aa.foldByKey(2)(_+_).collect()
    res11: Array[(String, Int)] = Array((dog,6), (cat,10), (mouse,8))
    
    scala>
    
    scala> val bb = sc.parallelize(List( ("cat",2), ("mouse", 2),("cat", 3), ("dog", 4), ("mouse", 2), ("cat", 1)),3 )
    bb: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[22] at parallelize at <console>:25
    
    scala> bb.mapPartitionsWithIndex(myfunc).foreach(println)
    [partID:0, val: (cat,2)]
    [partID:2, val: (mouse,2)]
    [partID:1, val: (cat,3)]
    [partID:2, val: (cat,1)]
    [partID:0, val: (mouse,2)]
    [partID:1, val: (dog,4)]
    
    scala> bb.foldByKey(2)(_+_).collect()
    res13: Array[(String, Int)] = Array((cat,12), (mouse,8), (dog,6))
    
    scala> aa.foldByKey(0)(_*_).collect()
    res14: Array[(String, Int)] = Array((dog,0), (cat,0), (mouse,0))
    
    scala> aa.foldByKey(1)(_*_).collect()
    res15: Array[(String, Int)] = Array((dog,4), (cat,6), (mouse,4))

Looking first at how aa is distributed: there are two partitions, with partition 0 holding ((cat,2), (mouse,2), (cat,3)) and partition 1 holding ((dog,4), (mouse,2), (cat,1)). The computation then proceeds as follows.

After the zeroValue is applied, (cat,2) in partition 0 becomes (cat,4) while (cat,3) in the same partition is left unchanged, and (cat,1) in partition 1 becomes (cat,3); partition 0 therefore contributes 4 + 3 = 7 and partition 1 contributes 3, so cat ends up as 10. In other words, the zeroValue of 2 is not added to every element, but only once per key within each partition. The results for bb can be analysed in the same way.
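
One way to see this behaviour is that foldByKey(zeroValue)(func) is essentially aggregateByKey(zeroValue)(func, func): the zero value only takes part on the per-partition (seqOp) side. A small sketch using the aa data above:

    aa.foldByKey(2)(_ + _).collect()               // Array((dog,6), (cat,10), (mouse,8))
    aa.aggregateByKey(2)(_ + _, _ + _).collect()   // same result: the zero is applied once per key per partition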

This is easy to understand once you know that foldByKey is implemented by calling combineByKeyWithClassTag, where the zeroValue plays the role of createCombiner; combineByKey in turn is implemented via org.apache.spark.Aggregator. The key source code is:

    val aggregator = new Aggregator[K, V, C](
      self.context.clean(createCombiner),
      self.context.clean(mergeValue),
      self.context.clean(mergeCombiners))
    if (self.partitioner == Some(partitioner)) {
      self.mapPartitions(iter => {
        val context = TaskContext.get()
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      new ShuffledRDD[K, V, C](self, partitioner)
        .setSerializer(serializer)
        .setAggregator(aggregator)
        .setMapSideCombine(mapSideCombine)
    }

Aggregation within a partition is done by calling aggregator.combineValuesByKey(iter, context), where iter is the iterator over a single partition:

      def combineValuesByKey(
          iter: Iterator[_ <: Product2[K, V]],
          context: TaskContext): Iterator[(K, C)] = {
        val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
        combiners.insertAll(iter)
        updateMetrics(context, combiners)
        combiners.iterator
      }

The insertAll method of org.apache.spark.util.collection.ExternalAppendOnlyMap is as follows:

    def insertAll(entries: Iterator[Product2[K, V]]): Unit = {
        if (currentMap == null) {
          throw new IllegalStateException(
            "Cannot insert new elements into a map after calling iterator")
        }
        // An update function for the map that we reuse across entries to avoid allocating
        // a new closure each time
        var curEntry: Product2[K, V] = null
        val update: (Boolean, C) => C = (hadVal, oldVal) => {
          if (hadVal) mergeValue(oldVal, curEntry._2) else createCombiner(curEntry._2)
        }

        while (entries.hasNext) {
          curEntry = entries.next()
          val estimatedSize = currentMap.estimateSize()
          if (estimatedSize > _peakMemoryUsedBytes) {
            _peakMemoryUsedBytes = estimatedSize
          }
          if (maybeSpill(currentMap, estimatedSize)) {
            currentMap = new SizeTrackingAppendOnlyMap[K, C]
          }
          currentMap.changeValue(curEntry._1, update)
          addElementsRead()
        }
      }

The update closure shows that, within a partition, createCombiner is applied the first time a key is seen, and mergeValue is applied for every subsequent value of that key.

sortByKey([ascending], [numTasks])

  • Meaning:

    Sorts the RDD by key, returning a new RDD whose data is re-partitioned and ordered by key.

  • Signature:
    def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
  • Example:
    scala>     val a = sc.parallelize(List( ("cat",2), ("mouse", 2),("bear", 3), ("dog", 4), ("ant", 2), ("horse", 1)),2 )
    a: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[49] at parallelize at <console>:25
    
    scala>     def myfunc3(index: Int, iter: Iterator[(String,Int)]) : Iterator[String] = {
         |       iter.map(x => index + "," +x.toString())
         |     }
    myfunc3: (index: Int, iter: Iterator[(String, Int)])Iterator[String]
    
    scala>     a.mapPartitionsWithIndex(myfunc3).collect().sorted.foreach(println)
    0,(bear,3)
    0,(cat,2)
    0,(mouse,2)
    1,(ant,2)
    1,(dog,4)
    1,(horse,1)
    
    scala>     a.sortByKey(true).mapPartitionsWithIndex(myfunc3).collect().sorted.foreach(println)
    0,(ant,2)
    0,(bear,3)
    0,(cat,2)
    1,(dog,4)
    1,(horse,1)
    1,(mouse,2) 

Commonly Used Actions

The commonly used action operators are listed below:

Action Meaning
reduce(func) Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
collect() Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
count() Return the number of elements in the dataset.
first() Return the first element of the dataset (similar to take(1)).
take(n) Return an array with the first n elements of the dataset.
takeSample(withReplacement, num, [seed]) Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
takeOrdered(n, [ordering]) Return the first n elements of the RDD using either their natural order or a custom comparator.
saveAsTextFile(path) Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
saveAsSequenceFile(path)
(Java and Scala)
Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
saveAsObjectFile(path)
(Java and Scala)
Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().
countByKey() Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
foreach(func) Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems.
Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.
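
A minimal sketch exercising a few of these actions on a small RDD (the numbers are purely illustrative):

    val nums = sc.parallelize(1 to 10, 2)

    nums.reduce(_ + _)                            // 55: commutative, associative aggregation
    nums.count()                                  // 10
    nums.first()                                  // 1
    nums.take(3)                                  // Array(1, 2, 3)
    nums.takeOrdered(3)(Ordering[Int].reverse)    // Array(10, 9, 8): the three largest values
    nums.foreach(println)                         // runs on the executors; output goes to the executor logs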

countByKey()

  • Meaning:

    countByKey counts the occurrences of each key in an RDD[(K, V)] and returns the result to the driver as a Map.

    If the result set is large, consider rdd.mapValues(_ => 1L).reduceByKey(_ + _) instead, which produces the same counts but returns them as a distributed RDD[(K, Long)].

  • Signature:
    def countByKey(): Map[K, Long]
  • Example:
    scala>     val aa = sc.parallelize(List( ("cat",2), ("mouse", 2),("cat", 3), ("dog", 4), ("mouse", 2), ("cat", 1)),2 )
    aa: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[29] at parallelize at <console>:25
    
    scala>     aa.countByKey()
    res17: scala.collection.Map[String,Long] = Map(dog -> 1, cat -> 3, mouse -> 2)
    
    scala>     aa.mapValues(_=>1).reduceByKey(_+_)
    res18: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[33] at reduceByKey at <console>:28
    
    scala>     aa.mapValues(_=>1).reduceByKey(_+_).collect()
    res19: Array[(String, Int)] = Array((dog,1), (cat,3), (mouse,2))
    
    scala>     aa.mapValues(_=>1).reduceByKey(_+_).collectAsMap()
    res20: scala.collection.Map[String,Int] = Map(cat -> 3, dog -> 1, mouse -> 2)   

Accumulators

Accumulators are variables that can only be "added" to through an associative and commutative operation, and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums.

Spark provides three main accumulator types: LongAccumulator, DoubleAccumulator, and CollectionAccumulator. LongAccumulator and DoubleAccumulator accumulate numeric values, while CollectionAccumulator accumulates list elements. An example follows:

    scala> import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.SparkSession

    scala>     val spark=SparkSession.builder().appName("AccumutorTest").getOrCreate()
    18/03/05 14:30:19 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
    spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@...

    scala>     val longAccumutor=spark.sparkContext.longAccumulator("longValue")
    longAccumutor: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 48, name: Some(longValue), value: 0)

    scala>     val doubleAccumutor=spark.sparkContext.doubleAccumulator("doubleValue")
    doubleAccumutor: org.apache.spark.util.DoubleAccumulator = DoubleAccumulator(id: 49, name: Some(doubleValue), value: 0.0)

    scala>     val collectAccumutor=spark.sparkContext.collectionAccumulator[String]("listValue")
    collectAccumutor: org.apache.spark.util.CollectionAccumulator[String] = CollectionAccumulator(id: 50, name: Some(listValue), value: [])

    scala>     val a = spark.sparkContext.parallelize(List( ("cat",2), ("mouse", 2),("bear", 3), ("dog", 4), ("ant", 2), ("horse", 1)),2 )
    a: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[2] at parallelize at <console>:29

    scala>     a.foreach(x=>longAccumutor.add(x._2))

    scala>     longAccumutor.value
    res7: Long = 14

    scala>     a.foreach(x=>collectAccumutor.add(x._1))

    scala>     collectAccumutor.value
    res9: java.util.List[String] = [cat, mouse, bear, dog, ant, horse]

Broadcasts

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.

Spark actions are executed through a set of stages, separated by distributed "shuffle" operations. Spark automatically broadcasts the common data needed by tasks within each stage; the data broadcast this way is cached in serialized form and deserialized before each task runs. This means that explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data, or when caching the data in deserialized form is important.

Broadcast variables are created from a variable v by calling SparkContext.broadcast(v). The broadcast variable is a wrapper around v, and its value can be accessed by calling the value method. The code below shows this:

    scala> import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.SparkSession

    scala>     val spark=SparkSession.builder().appName("Broadcast").getOrCreate()
    18/03/05 13:59:26 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
    spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@...

    scala>     val slices=2
    slices: Int = 2

    scala>     val num=1000000
    num: Int = 1000000

    scala>     val arr1=new Array[Int](num)
    arr1: Array[Int] = Array(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0...
    scala>     for(i<-0 until arr1.length){
         |       arr1(i)=i
         |     }

    scala>     val arr2=new Array[Int](num)
    arr2: Array[Int] = Array(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0...
    scala>     for (i<-0 until arr2.length){
         |       arr2(i)=i
         |     }

    scala>     val barr1=spark.sparkContext.broadcast(arr1)
    barr1: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

    scala>     val barr2=spark.sparkContext.broadcast(arr2)
    barr2: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(1)

    scala>     val observedSizes=spark.sparkContext.parallelize(0 to 10,slices).map(_=>(barr1.value.length,barr2.value.length))
    observedSizes: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[1] at map at <console>:40

    scala>     observedSizes.collect().foreach(i => println(i))
    (1000000,1000000)
    (1000000,1000000)
    (1000000,1000000)
    (1000000,1000000)
    (1000000,1000000)
    (1000000,1000000)
    (1000000,1000000)
    (1000000,1000000)
    (1000000,1000000)
    (1000000,1000000)
    (1000000,1000000)
    

After the broadcast variable is created, it should be used instead of the value v in any functions run on the cluster, so that v is shipped to each node at most once. In addition, the object v should not be modified after it is broadcast, in order to ensure that all nodes see the same value of the broadcast variable (for example, if the variable is later shipped to a new node).
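
A common use, sketched below with made-up data, is broadcasting a small lookup table so that every task reads the cached copy on its executor instead of shipping the table with each closure:

    val countryNames = Map("CN" -> "China", "US" -> "United States", "DE" -> "Germany")
    val bNames = spark.sparkContext.broadcast(countryNames)

    val orders = spark.sparkContext.parallelize(Seq(("CN", 3), ("US", 5), ("FR", 1)), 2)
    val labelled = orders.map { case (code, qty) =>
      (bNames.value.getOrElse(code, "Unknown"), qty)   // read the broadcast copy on the executor
    }
    labelled.collect()   // e.g. Array((China,3), (United States,5), (Unknown,1))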

Original article: https://www.cnblogs.com/molyeo/p/9246365.html
