Spark-Core RDD转换算子-Value型

1、 map(func)

作用: 返回一个新的 RDD, 该 RDD 是由原 RDD 的每个元素经过函数转换后的值而组成. 就是对 RDD 中的数据做转换.

创建一个包含1-10的的 RDD,然后将每个元素*2形成新的 RDD

scala > val rdd1 = sc.parallelize(1 to 10)
// 得到一个新的 RDD, 但是这个 RDD 中的元素并不是立即计算出来的
scala> val rdd2 = rdd1.map(_ * 2)

2、mapPartitions(func)

作用: 类似于map(func), 但是是独立在每个分区上运行.所以:Iterator<T> => Iterator<U>

假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,一个函数一次处理所有分区。

scala > val rdd1 = sc.parallelize(1 to 10)
// 得到一个新的 RDD, 但是这个 RDD 中的元素并不是立即计算出来的
scala> val rdd2 = rdd1.mapPartitions(_ * 2)
scala> rdd2.collect
res9: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)

3、mapPartitionsWithIndex(func)

作用: 和mapPartitions(func)类似. 但是会给func多提供一个Int值来表示分区的索引. 所以func的类型是:

(Int, Iterator<T>)=> Iterator<U>

scala> val rdd1 = sc.parallelize(Array(10,20,30,40,50,60))
scala> val rdd2 = rdd1.mapPartitionsWithIndex((index, items) => items.map((index, _)))
scala> res2.collect
res9: Array[(Int, Int)] = Array((0,10), (0,20), (0,30), (1,40), (1,50), (1,60))
(1)确定分区数
override def defaultParallelism(): Int =
   scheduler.conf.getInt("spark.default.parallelism", totalCores)
(2)对元素分区
// length: RDD 中数据的长度  numSlices: 分区数
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
 (0 until numSlices).iterator.map { i =>
   val start = ((i * length) / numSlices).toInt
   val end = (((i + 1) * length) / numSlices).toInt
   (start, end)
 }
}
seq match {
 case r: Range =>

 case nr: NumericRange[_] =>

 case _ =>
   val array = seq.toArray // To prevent O(n^2) operations for List etc
   positions(array.length, numSlices).map { case (start, end) =>
       array.slice(start, end).toSeq
   }.toSeq
}

5、map和mapPartitions的区别

(1) map():每次处理一条数据

(2) mapPartition():每次处理一个分区的数据,这个分区的数据处理完后,原 RDD 中该分区的数据才能释放,可能导致 OOM

6、flatMap(func)

作用: 类似于map,但是每一个输入元素可以被映射为 0个输出元素(所以func应该返回一个序列,而不是单一元素 T => TraversableOnce[U])

创建一个元素为 1-5 的RDD,运用 flatMap创建一个新的 RDD,新的 RDD 为原 RDD 每个元素的 平方和三次方 来组成 1,1,4,8,9,27..

scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5))
scala> val rdd2 =rdd1.flatMap(x => Array(x * x, x * x * x))
scala> rdd2.collect
res14: Array[Int] = Array(1, 1, 4, 8, 9, 27, 16, 64, 25, 125)

7、glom()

作用: 将每一个分区的元素合并成一个数组,形成新的 RDD 类型是RDD[Array[T]]

scala> var rdd1 = sc.parallelize(Array(10,20,30,40,50,60), 4)
scala> rdd1.glom.collect
res2: Array[Array[Int]] = Array(Array(10), Array(20, 30), Array(40), Array(50, 60))

8、groupBy(func)

作用:按照func的返回值进行分组.

func返回值作为 key, 对应的值放入一个迭代器中. 返回的 RDD: RDD[(K,Iterable[T])

创建一个 RDD,按照元素的奇偶性进行分组

scala> val rdd1 = sc.makeRDD(Array(1, 3, 4, 20, 4, 5, 8))
scala> val rdd2 = rdd1.groupBy(x => if(x % 2 == 1) "odd" else "even")
scala> rdd2.collect
res5: Array[(String, Iterable[Int])] = Array((even,CompactBuffer(4, 20, 4, 8)), (odd,CompactBuffer(1, 3, 5)))

9、filter(func)

作用: 过滤. 返回一个新的 RDD 是由func的返回值为true的那些元素组成

创建一个 RDD(由字符串组成),过滤出一个新 RDD(包含“xiao”子串)

scala> val rdd1 = sc.parallelize(Array("xiaoli", "laoli", "laowang", "xiaocang", "xiaojing", "xiaokong"))
scala> val rdd2 = rdd1.filter(_.contains("xiao"))
scala> rdd2.collect
res4: Array[String] = Array(xiaoli, xiaocang, xiaojing, xiaokong) 

10、sample(withReplacement,fraction,seed)

作用

(1)以指定的随机种子随机抽样出比例为fraction的数据,(抽取到的数量是: size * fraction). 需要注意的是得到的结果并不能保证准确的比例

(2)withReplacement表示是抽出的数据是否放回

? true为有放回的抽样

? false为无放回的抽样

? true放回表示数据有可能会被重复抽取到,是 true, 则fraction大于等于0就可以了

? false 则不可能重复抽取到. 如果是false, 则fraction必须是:[0,1]

(3)seed用于指定随机数生成器种子。 一般用默认的, 或者传入当前的时间戳

(4)不放回抽样

scala> val rdd1 = sc.parallelize(1 to 10)

scala> rdd1.sample(false, 0.5).collect
res15: Array[Int] = Array(1, 3, 4, 7)

(5)放回抽样

scala> val rdd1 = sc.parallelize(1 to 10)
scala> rdd1.sample(true, 2).collect
res25: Array[Int] = Array(1, 1, 2, 3, 3, 4, 4, 5, 5, 5, 5, 5, 6, 6, 7, 7, 8, 8, 9)

11、distinct([numTasks])

作用:对 RDD 中元素执行去重操作. 参数表示任务的数量.默认值和分区数保持一致.

scala> val rdd1 = sc.parallelize(Array(10,10,2,5,3,5,3,6,9,1))

scala> rdd1.distinct().collect
res29: Array[Int] = Array(6, 10, 2, 1, 3, 9, 5)

本质上是reduceByKey

  /**
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
  }

12 、coalesce(numPartitions)

作用: 缩减分区数到指定的数量,用于大数据集过滤后,提高小数据集的执行效率。

scala> val rdd1 = sc.parallelize(0 to 100, 5)

scala> rdd1.partitions.length
res3: Int = 5

// 减少分区的数量至 2
scala>val rdd2= rdd1.coalesce(2)

scala> rdd2.partitions.length
res4: Int = 2
注意:

第二个参数表示是否shuffle, 如果不传或者传入的为false,** 则表示不进行shuffer, 此时分区数减少有效, 增加分区数无效.**

13、repartition(numPartitions)

作用: 根据新的分区数, 重新 shuffle 所有的数据, 这个操作总会通过网络(跨分区操作).

新的分区数相比以前可以多, 也可以少,一般用于增加分区,语义清晰

scala> val rdd1 = sc.parallelize(0 to 100, 5)

scala> val rdd2 = rdd1.repartition(3)

scala> res2.partitions.length
res4: Int = 3

scala> val rdd3 = rdd1.repartition(10)

scala> rdd3.partitions.length
res5: Int = 10
coalasce和repartition的区别

(1) coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。

(2)repartition实际上是调用的的coalesce,进行shuffle

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}

(3)如果是减少分区, 尽量避免shuffle

14、sortBy(func,[ascending],[numTasks])

作用: 使用func先对数据进行处理,按照处理后结果排序,默认为正序

scala> val rdd1 = sc.parallelize(Array(1,3,4,10,4,6,9,20,30,16))

scala> rdd1.sortBy(x => x).collect
res17: Array[Int] = Array(1, 3, 4, 4, 6, 9, 10, 16, 20, 30)

scala> rdd1.sortBy(x => x, true).collect
res18: Array[Int] = Array(1, 3, 4, 4, 6, 9, 10, 16, 20, 30)

// 不用正序
scala> rdd1.sortBy(x => x, false).collect
res19: Array[Int] = Array(30, 20, 16, 10, 9, 6, 4, 4, 3, 1)

15、pipe(command,[envVars])

作用: 管道,针对每个分区,把 RDD 中的每个数据通过管道传递给shell命令或脚本,返回输出的RDD。

一个分区执行一次这个命令. 如果只有一个分区, 则执行一次命令.

注意:

脚本要放在 worker 节点可以访问到的位置

(1)创建一个脚本文件pipe.sh

echo "hello"
while read line;do
    echo ">>>"$line
done

(2)创建只有 1 个分区的RDD

scala> val rdd1 = sc.parallelize(Array(10,20,30,40), 1)

scala> rdd1.pipe("./pipe.sh").collect
res1: Array[String] = Array(hello, >>>10, >>>20, >>>30, >>>40)

(3)创建有 2 个分区的 RDD

scala> val rdd1 = sc.parallelize(Array(10,20,30,40), 2)

scala> rdd1.pipe("./pipe.sh").collect
res2: Array[String] = Array(hello, >>>10, >>>20, hello, >>>30, >>>40)

每个分区执行一次脚本, 但是每个元素算是标准输入中的一行

原文地址:https://www.cnblogs.com/hyunbar/p/12045543.html

时间: 2024-10-16 17:08:09

Spark-Core RDD转换算子-Value型的相关文章

RDD转换算子与操作算子

一.RDD算子分类 1. RDD算子分类及概述 RDD的算子分为Transformation和Action两类,Transformation是延迟执行,Action是立即执行.Transformation和Action本质上的区别是,Transformation是从一个RDD到一个RDD,Action是从一个RDD到一个值.由下图可知,Spark的的转换算子与操作算子的执行流程.首先可以从HDFS中使用textFile方法将数据加载到内存,然后经过转换算子对RDD进行转换,最后再通过操作算子Ac

Spark中将RDD转换成DataFrame的两种方法

总结下Spark中将RDD转换成DataFrame的两种方法, 代码如下: 方法一: 使用createDataFrame方法 ```java //StructType and convert RDD to DataFrame val schema = StructType( Seq( StructField("name",StringType,true) ,StructField("age",IntegerType,true) ) ) val rowRDD = sp

Spark-Core RDD转换算子-双Value型交互

1.union(otherDataSet) 作用:求并集. 对源 RDD 和参数 RDD 求并集后返回一个新的 RDD scala> val rdd1 = sc.parallelize(1 to 6) scala> val rdd2 = sc.parallelize(4 to 10) scala> val rdd3 = rdd1.union(rdd2) scala> rdd3.collect res1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 4,

Spark中RDD转换成DataFrame的两种方式(分别用Java和Scala实现)

一:准备数据源     在项目下新建一个student.txt文件,里面的内容为: 1,zhangsan,20 2,lisi,21 3,wanger,19 4,fangliu,18 二:实现 Java版: 1.首先新建一个student的Bean对象,实现序列化和toString()方法,具体代码如下: import java.io.Serializable; @SuppressWarnings("serial") public class Student implements Ser

急中生智~利用Spark core完成&quot;ETL&quot;!

背景介绍:今天接到老板分配的一个小任务:开发一个程序,实现从数据库中抽取数据并生成报表的功能(这是我们数据库审计平台准备上线的一个功能).既然是要生成报表,那么首先得有数据,于是便想到从该业务系统的测试环境抽取业务表的数据,然后装载至自己云主机上的Mysql中.本来以为只要"select ...into outfile"和"load data infile..."两个命令就可以搞定的,可是还是出了意外.测试环境导出的txt文件在云主机load时,报了"Ro

大数据技术之_27_电商平台数据分析项目_02_预备知识 + Scala + Spark Core + Spark SQL + Spark Streaming + Java 对象池

第0章 预备知识0.1 Scala0.1.1 Scala 操作符0.1.2 拉链操作0.2 Spark Core0.2.1 Spark RDD 持久化0.2.2 Spark 共享变量0.3 Spark SQL0.3.1 RDD.DataFrame 与 DataSet0.3.2 DataSet 与 RDD 互操作0.3.3 RDD.DataFrame 与 DataSet 之间的转换0.3.4 用户自定义聚合函数(UDAF)0.3.5 开窗函数0.4 Spark Streaming0.4.1 Dst

【Spark】RDD操作详解2——值型Transformation算子

处理数据类型为Value型的Transformation算子可以根据RDD变换算子的输入分区与输出分区关系分为以下几种类型: 1)输入分区与输出分区一对一型 2)输入分区与输出分区多对一型 3)输入分区与输出分区多对多型 4)输出分区为输入分区子集型 5)还有一种特殊的输入与输出分区一对一的算子类型:Cache型. Cache算子对RDD分区进行缓存 输入分区与输出分区一对一型 (1)map 将原来RDD的每个数据项通过map中的用户自定义函数f映射转变为一个新的元素.源码中的map算子相当于初

【Spark】RDD操作具体解释2——值型Transformation算子

处理数据类型为Value型的Transformation算子能够依据RDD变换算子的输入分区与输出分区关系分为下面几种类型: 1)输入分区与输出分区一对一型 2)输入分区与输出分区多对一型 3)输入分区与输出分区多对多型 4)输出分区为输入分区子集型 5)另一种特殊的输入与输出分区一对一的算子类型:Cache型. Cache算子对RDD分区进行缓存 输入分区与输出分区一对一型 (1)map 将原来RDD的每一个数据项通过map中的用户自己定义函数f映射转变为一个新的元素. 源代码中的map算子相

大数据技术之_19_Spark学习_02_Spark Core 应用解析+ RDD 概念 + RDD 编程 + 键值对 RDD + 数据读取与保存主要方式 + RDD 编程进阶 + Spark Core 实例练习

第1章 RDD 概念1.1 RDD 为什么会产生1.2 RDD 概述1.2.1 什么是 RDD1.2.2 RDD 的属性1.3 RDD 弹性1.4 RDD 特点1.4.1 分区1.4.2 只读1.4.3 依赖1.4.4 缓存1.4.5 CheckPoint第2章 RDD 编程2.1 RDD 编程模型2.2 RDD 创建2.2.1 由一个已经存在的 Scala 集合创建,即集合并行化(测试用)2.2.2 由外部存储系统的数据集创建(开发用)2.3 RDD 编程2.3.1 Transformatio