Spark算子:RDD基本转换操作(1)–map、flatMap、distinct

Spark算子:RDD基本转换操作(1)–map、flatMap、distinct

关键字:Spark算子、Spark RDD基本转换、map、flatMap、distinct

  • map

将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素。

输入分区与输出分区一对一,即:有多少个输入分区,就有多少个输出分区。

  1. hadoop fs -cat /tmp/lxw1234/1.txt
  2. hello world
  3. hello spark
  4. hello hive
  5. //读取HDFS文件到RDD
  6. scala> var data = sc.textFile("/tmp/lxw1234/1.txt")
  7. data: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at :21
  8. //使用map算子
  9. scala> var mapresult = data.map(line => line.split("\\s+"))
  10. mapresult: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at :23
  11. //运算map算子结果
  12. scala> mapresult.collect
  13. res0: Array[Array[String]] = Array(Array(hello, world), Array(hello, spark), Array(hello, hive))
  • flatMap

属于Transformation算子,第一步和map一样,最后将所有的输出分区合并成一个。

  1. /使用flatMap算子
  2. scala> var flatmapresult = data.flatMap(line => line.split("\\s+"))
  3. flatmapresult: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at flatMap at :23
  4. //运算flagMap算子结果
  5. scala> flatmapresult.collect
  6. res1: Array[String] = Array(hello, world, hello, spark, hello, hive)

使用flatMap时候需要注意:
flatMap会将字符串看成是一个字符数组。
看下面的例子:

  1. scala> data.map(_.toUpperCase).collect
  2. res32: Array[String] = Array(HELLO WORLD, HELLO SPARK, HELLO HIVE, HI SPARK)
  3. scala> data.flatMap(_.toUpperCase).collect
  4. res33: Array[Char] = Array(H, E, L, L, O, , W, O, R, L, D, H, E, L, L, O, , S, P, A, R, K, H, E, L, L, O, , H, I, V, E, H, I, , S, P, A, R, K)

再看:

  1. scala> data.map(x => x.split("\\s+")).collect
  2. res34: Array[Array[String]] = Array(Array(hello, world), Array(hello, spark), Array(hello, hive), Array(hi, spark))
  3. scala> data.flatMap(x => x.split("\\s+")).collect
  4. res35: Array[String] = Array(hello, world, hello, spark, hello, hive, hi, spark)

这次的结果好像是预期的,最终结果里面并没有把字符串当成字符数组。
这是因为这次map函数中返回的类型为Array[String],并不是String。
flatMap只会将String扁平化成字符数组,并不会把Array[String]也扁平化成字符数组。

参考:
http://alvinalexander.com/scala/collection-scala-flatmap-examples-map-flatten

  • distinct

对RDD中的元素进行去重操作。

    1. scala> data.flatMap(line => line.split("\\s+")).collect
    2. res61: Array[String] = Array(hello, world, hello, spark, hello, hive, hi, spark)
    3. scala> data.flatMap(line => line.split("\\s+")).distinct.collect
    4. res62: Array[String] = Array(hive, hello, world, spark, hi)
时间: 2025-01-05 21:36:03

Spark算子:RDD基本转换操作(1)–map、flatMap、distinct的相关文章

Spark中RDD的常用操作(Python)

弹性分布式数据集(RDD) Spark是以RDD概念为中心运行的.RDD是一个容错的.可以被并行操作的元素集合.创建一个RDD有两个方法:在你的驱动程序中并行化一个已经存在的集合:从外部存储系统中引用一个数据集.RDD的一大特性是分布式存储,分布式存储在最大的好处是可以让数据在不同工作节点并行存储,以便在需要数据时并行运算.弹性指其在节点存储时,既可以使用内存,也可已使用外存,为使用者进行大数据处理提供方便.除此之外,RDD的另一大特性是延迟计算,即一个完整的RDD运行任务被分为两部分:Tran

RDD的转换操作,分三种:单value,双value交互,(k,v)对

import org.apache.spark.rdd.RDDimport org.apache.spark.{Partitioner, SparkConf, SparkContext} object Transformation { def main(args: Array[String]): Unit = { val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Transfor

RDD的转换操作---RDD转换过程

1) union(otherRDD)RDD-->UnionRDD2) groupByKey(numPartitions)RDD-->ShuffledRDD-->MapPartitionsRDDgroupByKey() 只需要将 Key 相同的 records 聚合在一起,一个简单的 shuffle 过程就可以完成.3) reduceyByKey(func, numPartitions)reduceyByKey() 相当于传统的 MapReduceRDD-->MapPartition

(1)spark核心RDD的概念解析、创建、以及相关操作

spark核心之RDD 什么是RDD RDD指的是弹性分布式数据集(Resilient Distributed Dataset),它是spark计算的核心.尽管后面我们会使用DataFrame.Dataset进行编程,但是它们的底层依旧是依赖于RDD的.我们来解释一下RDD(Resilient Distributed Dataset)的这几个单词含义. 弹性:在计算上具有容错性,spark是一个计算框架,如果某一个节点挂了,可以自动进行计算之间血缘关系的跟踪 分布式:很好理解,hdfs上数据是跨

Apache Spark RDD之RDD的转换

RDD的转换 Spark会根据用户提交的计算逻辑中的RDD的转换和动作来生成RDD之间的依赖关系,同时这个计算链也就生成了逻辑上的DAG.接下来以“Word Count”为例,详细描述这个DAG生成的实现过程. Spark Scala版本的Word Count程序如下: 1: val file = spark.textFile("hdfs://...") 2: val counts = file.flatMap(line => line.split(" "))

spark 算子分析

别的不说先上官网: action 这些算子中需要注意: 1.reduce 和 reduceByKey 虽说都有reduce,但是一个是action级别,一个是transformation级别,速度上会有很大的差异 2.groupBy的使用如下 groupBy :将元素通过函数生成相应的 Key,数据就转化为 Key-Value 格式,之后将 Key 相同的元素分为一组. val a = sc.parallelize(1 to 9, 3) a.groupBy(x => { if (x % 2 ==

【Spark】RDD操作详解2——值型Transformation算子

处理数据类型为Value型的Transformation算子可以根据RDD变换算子的输入分区与输出分区关系分为以下几种类型: 1)输入分区与输出分区一对一型 2)输入分区与输出分区多对一型 3)输入分区与输出分区多对多型 4)输出分区为输入分区子集型 5)还有一种特殊的输入与输出分区一对一的算子类型:Cache型. Cache算子对RDD分区进行缓存 输入分区与输出分区一对一型 (1)map 将原来RDD的每个数据项通过map中的用户自定义函数f映射转变为一个新的元素.源码中的map算子相当于初

【Spark】RDD操作详解3——键值型Transformation算子

Transformation处理的数据为Key-Value形式的算子大致可以分为:输入分区与输出分区一对一.聚集.连接操作. 输入分区与输出分区一对一 mapValues mapValues:针对(Key,Value)型数据中的Value进行Map操作,而不对Key进行处理. 方框代表RDD分区.a=>a+2代表只对( V1, 1)数据中的1进行加2操作,返回结果为3. 源码: /** * Pass each value in the key-value pair RDD through a m

【Spark】RDD操作具体解释2——值型Transformation算子

处理数据类型为Value型的Transformation算子能够依据RDD变换算子的输入分区与输出分区关系分为下面几种类型: 1)输入分区与输出分区一对一型 2)输入分区与输出分区多对一型 3)输入分区与输出分区多对多型 4)输出分区为输入分区子集型 5)另一种特殊的输入与输出分区一对一的算子类型:Cache型. Cache算子对RDD分区进行缓存 输入分区与输出分区一对一型 (1)map 将原来RDD的每一个数据项通过map中的用户自己定义函数f映射转变为一个新的元素. 源代码中的map算子相