Spark RDD的fold和aggregate为什么是两个API?为什么不是一个foldLeft?

欢迎关注我的新博客地址:http://cuipengfei.me/blog/2014/10/31/spark-fold-aggregate-why-not-foldleft/

大家都知道Scala标准库的List有一个用来做聚合操作的foldLeft方法。

比如我定义一个公司类:

1
case class Company(name:String, children:Seq[Company]=Nil)

它有名字和子公司。 然后定义几个公司:

1
val companies = List(Company("B"),Company("A"),Company("T"))

三家大公司,然后呢,我假设有一家超牛逼的公司把它们给合并了:

1
companies.foldLeft(Company("King"))((king,company)=>Company(name=king.name,king.children:+company))

这个执行的结果是这样的:

1
2
scala> companies.foldLeft(Company("King"))((king,company)=>Company(name=king.name,king.children:+company))
res6: Company = Company(King,List(Company(B,List()), Company(A,List()), Company(T,List())))

可见foldLeft的结果是一家包含了BAT三大家得新公司。

由List[Company]聚合出一个新的Company,这种属于foldLeft的同构聚合操作。

同时,foldLeft也可以做异构的聚合操作:

1
companies.foldLeft("")((acc,company)=>acc+company.name)

它的执行结果是这样的:

1
2
scala> companies.foldLeft("")((acc,company)=>acc+company.name)
res7: String = BAT

由List[Company]聚合出一个String。

这样的API感觉很方便,只要是聚合,无论同构异构,都可以用它来做。

最近接触了Spark,其中的RDD是做分布式计算时最常用的一个类。

RDD有一个叫做fold的API,它和foldLeft的签名很像,唯一区别是它只能做同构聚合操作。

也就是说如果你有一个RDD[X],通过fold,你只能构造出一个X。

如果我想通过一个RDD[X]构造一个Y出来呢?

那就得用aggregate这个API了,aggregate的签名是这样的:

1
aggregate[U](zeroValue: U)(seqOp: (U, T) ? U, combOp: (U, U) ? U)(implicit arg0: ClassTag[U]): U

它比fold和foldLeft多需要一个combOp做参数。

这让我很不解,同构和异构的API干嘛非得拆成两个呢?怎么不能学Scala的标准库,把它做成类似foldLeft的样子呢?

后来想明白了,这是由于Spark需要分布运算造成的。

先想一下Scala List的foldLeft是怎么工作的?

1
companies.foldLeft(Company("King"))((king,company)=>Company(name=king.name,king.children:+company))
  1. 拿到初始值,即名字为king的公司,把它和list中的第一个公司合并,成为一个包含一家子公司的新公司
  2. 把上一步中的新公司拿来和list中的第二个公司合并,成为一个包含两家子公司的新公司
  3. 把上一步中的新公司拿来和list中的第三个公司合并,成为一个包含三家子公司的新公司

这是同构的过程。

1
companies.foldLeft("")((acc,company)=>acc+company.name)
  1. 拿到初始值,即空字符串,把它和list中的第一个公司的名字拼在一起,成为B
  2. 把上一步中的B第二个公司名字拼一起,成为BA
  3. 把上一步中的BA拿来和list中的第三个公司的名字拼一起,成为BAT

这是异构的过程。

像多米诺骨牌一样,从左到右依次把list中的元素吸收入结果中。

现在假设RDD[X]中有一个类似foldLeft的API,其签名和foldLeft一致,我现在调用foldLeft,给它一个f:(Y,X)=>Y,接下来该发生什么呢?

  1. 因为要分布计算,所以我先要把手里的很多个X分成几份,分发到不同的节点上去
  2. 每个节点把拿到的很多个X计算出一个Y出来
  3. 把所有节点的结果拿来,这时我手里就有了很多个Y
  4. 啊。。。我不知道怎么把很多个Y变成一个Y啊。。。

由于Spark的RDD不像Scala的List一样只需要推倒一副多米诺骨牌,而是要推倒很多副,最后再对很多副多米诺骨牌的结果做聚合。

这时如果是同构还好,我只需要再用f:(X,X)=>X做一遍就ok了。

但是如果是异构的,那我就必须得再需要一个f:(Y,Y)=>Y了。

时间: 2024-07-29 05:55:32

Spark RDD的fold和aggregate为什么是两个API?为什么不是一个foldLeft?的相关文章

Spark RDD编程(二)

转载请注明出处:http://blog.csdn.net/gamer_gyt @高阳团 博主微博:http://weibo.com/234654758 Github:https://github.com/thinkgamer ============================================================ SparkRDD编程(一) Spark 的键值对(pair RDD)操作,Scala实现 RDD的分区函数 目前Spark中实现的分区函数包括两种 Ha

Spark RDD Action操作

reduce def reduce(f: (T, T) => T): T通过func函数聚集RDD中的所有元素,这个功能必须是可交换且可并联的 1 2 3 4 5 6 7 8 9 10 11 scala> val rdd1 = sc.makeRDD(1 to 10,2) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[85] at makeRDD at <console>:24 scala> rdd1.r

Spark RDD aggregateByKey

aggregateByKey 这个RDD有点繁琐,整理一下使用示例,供参考 直接上代码 import org.apache.spark.rdd.RDD import org.apache.spark.{SparkContext, SparkConf} /** * Created by Edward on 2016/10/27. */ object AggregateByKey { def main(args: Array[String]) { val sparkConf: SparkConf =

spark RDD transformation与action函数巩固 (未完)

1.创建RDD val lines = sc.parallelize(List("pandas","i like pandas")) 2.加载本地文件到RDD val linesRDD = sc.textFile("yangsy.txt") 3.过滤 filter 需要注意的是 filter并不会在原有RDD上过滤,而是根据filter的内容重新创建了一个RDD val spark = linesRDD.filter(line => lin

Spark RDD Transformation 简单用例(二)

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine

Spark Rdd coalesce()方法和repartition()方法

在Spark的Rdd中,Rdd是分区的. 有时候需要重新设置Rdd的分区数量,比如Rdd的分区中,Rdd分区比较多,但是每个Rdd的数据量比较小,需要设置一个比较合理的分区.或者需要把Rdd的分区数量调大.还有就是通过设置一个Rdd的分区来达到设置生成的文件的数量. 有两种方法是可以重设Rdd的分区:分别是 coalesce()方法和repartition(). 这两个方法有什么区别,看看源码就知道了: def coalesce(numPartitions: Int, shuffle: Bool

Spark RDD Action 简单用例(一)

collectAsMap(): Map[K, V] 返回key-value对,key是唯一的,如果rdd元素中同一个key对应多个value,则只会保留一个./** * Return the key-value pairs in this RDD to the master as a Map. * * Warning: this doesn't return a multimap (so if you have multiple values to the same key, only * on

Spark RDD、DataFrame和DataSet的区别

版权声明:本文为博主原创文章,未经博主允许不得转载. 目录(?)[+] 转载请标明出处:小帆的帆的专栏 RDD 优点: 编译时类型安全 编译时就能检查出类型错误 面向对象的编程风格 直接通过类名点的方式来操作数据 缺点: 序列化和反序列化的性能开销 无论是集群间的通信, 还是IO操作都需要对对象的结构和数据进行序列化和反序列化. GC的性能开销 频繁的创建和销毁对象, 势必会增加GC import org.apache.spark.sql.SQLContext import org.apache

Spark RDD解密

1.  基于数据集的处理: 从物理存储上加载数据,然后操作数据,然后写入数据到物理设备; 基于数据集的操作不适应的场景: 不适合于大量的迭代: 不适合交互式查询:每次查询都需要对磁盘进行交互. 基于数据流的方式不能够复用曾经的结果或者中间的结果; 2. RDD弹性数据集 特点: A)自动的进行内存和磁盘数据的存储切换: B) 基于lineage的高效容错: C) Task如果失败会自动进行重试 D) Stage如果失败会自动进行重试,而且只会计算失败的分片; E) Checkpoint和pers