RDD转换算子与操作算子

一、RDD算子分类

1. RDD算子分类及概述

  RDD的算子分为Transformation和Action两类,Transformation是延迟执行,Action是立即执行。Transformation和Action本质上的区别是,Transformation是从一个RDD到一个RDD,Action是从一个RDD到一个值。由下图可知,Spark的的转换算子与操作算子的执行流程。首先可以从HDFS中使用textFile方法将数据加载到内存,然后经过转换算子对RDD进行转换,最后再通过操作算子Actions,如saveAsTextFile,将结果存回HDFS中去。

2. 源码解读Transformation算子与Action算子的本质区别

  • Transformation本质
//例如flatMap,应用在RDD上,结果是new MapPartitionsRDD,这个方法就结束了。
/**
 *  Return a new RDD by first applying a function to all elements of this
 *  RDD, and then flattening the results.
 */
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
  •  Action本质
//例如reduce,应用在RDD上,结果是使用sc执行了runJob方法。
/**
 * Reduces the elements of this RDD using the specified commutative and
 * associative binary operator.
 */
def reduce(f: (T, T) => T): T = withScope {
  val cleanF = sc.clean(f)
  val reducePartition: Iterator[T] => Option[T] = iter => {
    if (iter.hasNext) {
      Some(iter.reduceLeft(cleanF))
    } else {
      None
    }
  }
  var jobResult: Option[T] = None
  val mergeResult = (index: Int, taskResult: Option[T]) => {
    if (taskResult.isDefined) {
      jobResult = jobResult match {
        case Some(value) => Some(f(value, taskResult.get))
        case None => taskResult
      }
    }
  }
  sc.runJob(this, reducePartition, mergeResult)
  // Get the final result out of our Option, or throw an exception if the RDD was empty
  jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}

二、常用Transformations算子及释义

1. textFile

  将文件加载到内存,抽象成一个RDD,是一个Transformation操作。

2. map

  map(f:T=>U) : RDD[T]=>RDD[U]

  map函数需要一个参数,此参数是一个匿名函数,在参数进来时使用f来接收。这个匿名函数需要的参数进来时是T类型,进过匿名函数转化之后变成U类型。这些算子都是操作在RDD上的,那么函数map(f:T=>U)的功能是把一个RDD[T],这RDD里面每个元素是类型的转成U类型。map函数实质上就是对RDD集合中的每个元素依次进行操作。

3. filter

  filter(f:T=>Bool) : RDD[T]=>RDD[T]

  filter函数,传一个匿名函数进来,参数为T,匿名函数转换后变成Boolean类型,如果参数是True就被保留下来,如果是False就被过滤掉。RDD[T]=>RDD[T],filter作用前后RDD中的元素都是T类型,只是把不符合的元素去除了。

4. flatmap

  flatmap(f:T=>Seq[U]) : RDD[T]=>RDD[U]

  一个flat+一个map,map是对集合的每个元素进行操作,flat英文直译是压扁,flatmap(f:T=>Seq[U])中的Seq是集合序列之意。例如,一个英文句子,进行分词可以按空格隔开,句子进来就是String类型,对每个元素进行分词就是一个个单词组成的String数组,这就是一个Seq集合;如果对于一个句子使用map的话出去的还是一个句子。因为map之后是一个数组,并没有打散成为一个个单词,所以flat的作用就在于此。

5. sample

  sample(fraction:Float) : RDD[T]=>RDD[T](Deterministic sampling)

  传入浮点数,采样。

6. groupByKey

  groupByKey() : RDD[(K,V)]=>RDD[(K,Seq[V])]

  按Key进行分组,必须运用于一个元素为键值对形式的RDD,可以按照Key进行分组,把相同Key的值放到一个集合中去。

7. reduceByKey

  reduceByKey(f:(V,V)=>V) : RDD[(K,V)]=>RDD[(K,V)]

  reduceByKey=一个groupByKey()+一个reduce。先groupByKey(),把相同的Key的value放到一个序列中;然后进行reduce操作,例如:_+_就是把每一个元素依次进行操作。RDD进来时的每一个元素是(K,V),这里的Key可能重复;出去的RDD的元素也是(K,V),但是这里的Key无重复,因为有groupByKey()操作。

8. union

  union() : (RDD[T], RDD[T])=>RDD[T]

  union意为联合,把两个相同元素类型的RDD,转化到一起去。

9. join

  join() : (RDD[(K,V)], RDD[(K,W)]) => RDD[(K,(V,W))]

  join是把两个以键值对形式的为元素的RDD,按K进行分组,转化成一个RDD,这个RDD的元素依旧是键值对形式,键为K,值为由原来相同键的值组成的元组。

10. cogroup

  cogroup() : (RDD[(K,V)], RDD[(K,W)])=> RDD[(K,(Seq[V],Seq[W]))]

  cogroup把两个RDD变成一个RDD,最后的RDD是键值对形式,按K分组,把值V和W变成,数组序列。

11. crossProduct

  crossProduct() : (RDD[T],RDD[U])=>RDD[(T,U)]

  把两个RDD中的元素放到一个RDD中,并且分别作为最终RDD元素的键和值。

12. mapValues

  mapValues(f:V=>W) : RDD[(K,V)]=>RDD[(K,W)](Preserves partitioning)

  mapValues是对RDD中的每个元素(K,V)中的V进行map操作,K保持不变。

13. sort

  sort(c:Comparator[K]) : RDD[(K,V)]=>RDD[(K,V)]

  sort进行排序,至于排序方法是和传进来的Comparator有关。RDD[(K,V)] => RDD[(K,V)],元素本身没有变,只是进行了排序。

14. sortBykey

  和sort类似,sortByKey是根据键进行排序。

15. sortBy

  sortBy与sortByKey不同,sortBy可以指定根据集合中的第几个元素进行排序。例如根据value排序,rdd.sortBy(_._2)。

16. partitionBy

  partitionBy(p:Partitioner[K]) : RDD[(K,V)]=>RDD[(K,V)]

  可以传入自定义的Partitioner,传入Key,然后根据Key重新进行Partitioner。这时再看RDD里的内容是完全没有变化的,只是内部有一个shuffle,重新分组,把数据多机的内存传到另外多机的内存中去。

三、常用Actions算子及释义

1. count

  count() : RDD[T]=>Long

  count是统计RDD里面元素的数量。因为count不知道RDD元素的个数,所以返回值类型为Long类型。

2. collect

  collect() : RDD[T]=>Seq[T]

  collect,把一个RDD返回到一个集合序列中去。RDD[T]=>Seq[T],RDD[T]是弹性分布式数据集,Seq[T]是单机的集合。collect是很恐怖的操作,相当于把分布式的数据,拿到当前的机器或当前客户端的程序中来,如果数据量大而且当前机器的内存小的话机器就容易挂掉。

3. reduce

  reduce(f:(T,T)=>T) : RDD[T]=>T

  reduce和reduceByKey不同,reduceByKey是一个Transformation,而reduce是一个action。reduce可以将以T为元素类型的RDD转成T类型。例如一个集合是(1,4,6,4,1),对它进行reduce就会返回相同的类型的值。

4. lookup

  lookup(k:K) : RDD[(K,V)=>Seq[V](On hash/range partitioned RDDs)

  根据Key,把Key的value都找出来,然后放到本地的集合序列中去。

5. save

  save(path:String) : Outputs RDD to a storage system, e.g., HDFS

  save需要传一个路径,把RDD的结果给输出到文件中去,可以是本地的也可以是hdfs的。

6. saveAsTextFile

  saveAsTextFlie(path:String) : Outputs RDD to a storage system, e.g., HDFS

  这个saveAsTextFlie不能使用本地的路径。

7. foreach

  对RDD中的每个元素进行遍历操作。例如,打印RDD的每个元素:

  rdd.foreach(println)

  更多资源可以参看Zhen He RDD API

原文地址:https://www.cnblogs.com/yangp/p/8687668.html

时间: 2024-07-29 18:37:25

RDD转换算子与操作算子的相关文章

Spark常用函数讲解--键值RDD转换

摘要: RDD:弹性分布式数据集,是一种特殊集合 ‚ 支持多种来源 ‚ 有容错机制 ‚ 可以被缓存 ‚ 支持并行操作,一个RDD代表一个分区里的数据集RDD有两种操作算子:         Transformation(转换):Transformation属于延迟计算,当一个RDD转换成另一个RDD时并没有立即进行转换,仅仅是记住       了数据集的逻辑操作         Ation(执行):触发Spark作业的运行,真正触发转换算子的计算 本系列主要讲解Spark中常用的函数操作:   

【opencv入门之九】Opencv边缘检测:Canny算子,Sobel算子,Laplace算子,Scharr滤波器

参考网站: http://blog.csdn.net/poem_qianmo/article/details/25560901 1.边缘检测步骤 1)滤波:边缘检测的算法主要是基于图像强度的一阶和二阶导数,但导数通常对噪声很敏感.( 通常用高斯滤波 ) 2)增强:增强边缘的基础是确定图像各点领域强度的变化值.增强算法可以将图像灰度点领域强度值有显著变化的点凸显出来.( 可以通过计算梯度幅值来确定 ) 3)检测:经过增强的图像,往往领域中有很多点的梯度值比较大,而特定的应用中,这些点并不是我们要找

Spark之键值RDD转换(转载)

1.mapValus(fun):对[K,V]型数据中的V值map操作(例1):对每个的的年龄加2 object MapValues { def main(args: Array[String]) { val conf = new SparkConf().setMaster("local").setAppName("map") val sc = new SparkContext(conf) val list = List(("mobin",22),

灰度图像--图像增强 Robert算子、Sobel算子

学习DIP第36天 转载请标明本文出处:http://blog.csdn.net/tonyshengtan,欢迎大家转载,发现博客被某些论坛转载后,图像无法正常显示,无法正常表达本人观点,对此表示很不满意.有些网站转载了我的博文,很开心的是自己写的东西被更多人看到了,但不开心的是这段话被去掉了,也没标明转载来源,虽然这并没有版权保护,但感觉还是不太好,出于尊重文章作者的劳动,转载请标明出处!!!! 文章代码已托管,欢迎共同开发:https://github.com/Tony-Tan/DIPpro

Spark中将RDD转换成DataFrame的两种方法

总结下Spark中将RDD转换成DataFrame的两种方法, 代码如下: 方法一: 使用createDataFrame方法 ```java //StructType and convert RDD to DataFrame val schema = StructType( Seq( StructField("name",StringType,true) ,StructField("age",IntegerType,true) ) ) val rowRDD = sp

RDD转换成DataFrames

官方提供了2种方法 1.利用反射来推断包含特定类型对象的RDD的schema.这种方法会简化代码并且在你已经知道schema的时候非常适用. 先创建一个bean类 case class Person(name: String, age: Int) 然后将Rdd转换成DataFrame val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p =

OpenCV2马拉松第15圈——边缘检測(Laplace算子,LOG算子)

收入囊中 拉普拉斯算子 LOG算子(高斯拉普拉斯算子) OpenCV Laplacian函数 构建自己的拉普拉斯算子 利用拉普拉斯算子进行图像的锐化 葵花宝典 在OpenCV2马拉松第14圈--边缘检測(Sobel,prewitt,roberts)  我们已经认识了3个一阶差分算子 拉普拉斯算子是二阶差分算子.为什么要增加二阶的算子呢?试想一下,假设图像中有噪声,噪声在一阶导数处也会取得极大值从而被当作边缘.然而求解这个极大值也不方便.採用二阶导数后,极大值点就为0了.因此值为0的地方就是边界.

OpenCV2马拉松第15圈——边缘检测(Laplace算子,LOG算子)

收入囊中 拉普拉斯算子 LOG算子(高斯拉普拉斯算子) OpenCV Laplacian函数 构建自己的拉普拉斯算子 利用拉普拉斯算子进行图像的锐化 葵花宝典 在OpenCV2马拉松第14圈--边缘检测(Sobel,prewitt,roberts)  我们已经认识了3个一阶差分算子 拉普拉斯算子是二阶差分算子,为什么要加入二阶的算子呢?试想一下,如果图像中有噪声,噪声在一阶导数处也会取得极大值从而被当作边缘.然而求解这个极大值也不方便,采用二阶导数后,极大值点就为0了,因此值为0的地方就是边界.

CAD转换器中批量转换文件的操作步骤是什么?

CAD转换器中批量转换文件的操作步骤是什么?现在在CAD行业当中,最基础的工作就是绘制CAD图纸,然后绘制完成的图纸都是dwg格式的,所以在进行查看的时候就需要将CAD图纸的格式进行转换,那在CAD转换器中批量转换文件的操作步骤是什么?具体要怎么来进行操作,想要了解的朋友就一起来看看吧,希望能够帮助到你们.以下就是具体操作步骤. 第一步:首先如果小伙伴们电脑中没有安装CAD转换器的,可以在电脑桌面中任意的打开一个浏览器,在浏览器的搜索框中搜索迅捷CAD转换器,点击进入到下载软件的界面当中,选择下