Spark函数详解系列--RDD基本转换

摘要:

  RDD:弹性分布式数据集,是一种特殊集合 ‚ 支持多种来源 ‚ 有容错机制 ‚ 可以被缓存 ‚ 支持并行操作,一个RDD代表一个分区里的数据集

  RDD有两种操作算子:

         Transformation(转换):Transformation属于延迟计算,当一个RDD转换成另一个RDD时并没有立即进行转换,仅仅是记住了数据集的逻辑操作

         Ation(执行):触发Spark作业的运行,真正触发转换算子的计算

本系列主要讲解Spark中常用的函数操作:

         1.RDD基本转换

         2.键-值RDD转换

         3.Action操作篇

1.map(func):数据集中的每个元素经过用户自定义的函数转换形成一个新的RDD,新的RDD叫MappedRDD

(例1)

object Map {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("map")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(1 to 10)  //创建RDD
    val map = rdd.map(_*2)             //对RDD中的每个元素都乘于2
    map.foreach(x => print(x+" "))
    sc.stop()
  }
}

输出:

2 4 6 8 10 12 14 16 18 20

(RDD依赖图:红色块表示一个RDD区,黑色块表示该分区集合,下同)

2.flatMap(func):与map类似,但每个元素输入项都可以被映射到0个或多个的输出项,最终将结果”扁平化“后输出

(例2)

 //...省略sc
    val rdd = sc.parallelize(1 to 5)
    val fm = rdd.flatMap(x => (1 to x)).collect()
    fm.foreach( x => print(x + " "))

输出:

1 1 2 1 2 3 1 2 3 4 1 2 3 4 5

如果是map函数其输出如下:

Range(1) Range(1, 2) Range(1, 2, 3) Range(1, 2, 3, 4) Range(1, 2, 3, 4, 5)

(RDD依赖图)

3.mapPartitions(func):类似与map,map作用于每个分区的每个元素,但mapPartitions作用于每个分区工

func的类型:Iterator[T] => Iterator[U]

假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,当在映射的过程中不断的创建对象时就可以使用mapPartitions比map的效率要高很多,比如当向数据库写入数据时,如果使用map就需要为每个元素创建connection对象,但使用mapPartitions的话就需要为每个分区创建connetcion对象

(例3):输出有女性的名字:

object MapPartitions {
//定义函数
  def partitionsFun(/*index : Int,*/iter : Iterator[(String,String)]) : Iterator[String] = {
    var woman = List[String]()
    while (iter.hasNext){
      val next = iter.next()
      next match {
        case (_,"female") => woman = /*"["+index+"]"+*/next._1 :: woman
        case _ =>
      }
    }
    return  woman.iterator
  }

  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("mappartitions")
    val sc = new SparkContext(conf)
    val l = List(("kpop","female"),("zorro","male"),("mobin","male"),("lucy","female"))
    val rdd = sc.parallelize(l,2)
    val mp = rdd.mapPartitions(partitionsFun)
    /*val mp = rdd.mapPartitionsWithIndex(partitionsFun)*/
    mp.collect.foreach(x => (print(x +" ")))   //将分区中的元素转换成Aarray再输出
  }
}

输出:

kpop lucy

其实这个效果可以用一条语句完成

val mp = rdd.mapPartitions(x => x.filter(_._2 == "female")).map(x => x._1) 

之所以不那么做是为了演示函数的定义

  (RDD依赖图)

4.mapPartitionsWithIndex(func):与mapPartitions类似,不同的时函数多了个分区索引的参数

func类型:(Int, Iterator[T]) => Iterator[U]

(例4):将例3橙色的注释部分去掉即是

输出:(带了分区索引)

[0]kpop [1]lucy

5.union(ortherDataset):将两个RDD中的数据集进行合并,最终返回两个RDD的并集,若RDD中存在相同的元素也不会去重

//省略sc
   val rdd1 = sc.parallelize(1 to 3)
   val rdd2 = sc.parallelize(3 to 5)
   val unionRDD = rdd1.union(rdd2)
   unionRDD.collect.foreach(x => print(x + " "))
   sc.stop 

输出:

1 2 3 3 4 5

  

6.intersection(otherDataset):返回两个RDD的交集

//省略sc
val rdd1 = sc.parallelize(1 to 3)
val rdd2 = sc.parallelize(3 to 5)
val unionRDD = rdd1.intersection(rdd2)
unionRDD.collect.foreach(x => print(x + " "))
sc.stop 

输出:

3 4

7.distinct([numTasks]):对RDD中的元素进行去重

//省略sc
val list = List(1,1,2,5,2,9,6,1)
val distinctRDD = sc.parallelize(list)
val unionRDD = distinctRDD.distinct()
unionRDD.collect.foreach(x => print(x + " "))  

输出:

1 6 9 5 2

8.cartesian(otherDataset):对两个RDD中的所有元素进行笛卡尔积操作

 //省略
val rdd1 = sc.parallelize(1 to 3)
val rdd2 = sc.parallelize(2 to 5)
val cartesianRDD = rdd1.cartesian(rdd2)
cartesianRDD.foreach(x => println(x + " ")) 

输出:

(1,2)
(1,3)
(1,4)
(1,5)
(2,2)
(2,3)
(2,4)
(2,5)
(3,2)
(3,3)
(3,4)
(3,5)

(RDD依赖图)

 

9.coalesce(numPartitions,shuffle):对RDD的分区进行重新分区,shuffle默认值为false,当shuffle=false时,不能增加分区数

目,但不会报错,只是分区个数还是原来的

(例9:)shuffle=false

//省略
val rdd = sc.parallelize(1 to 16,4)
val coalesceRDD = rdd.coalesce(3) //当suffle的值为false时,不能增加分区数(即分区数不能从5->7)
println("重新分区后的分区个数:"+coalesceRDD.partitions.size) 

输出:

重新分区后的分区个数:3
//分区后的数据集
List(1, 2, 3, 4)
List(5, 6, 7, 8)
List(9, 10, 11, 12, 13, 14, 15, 16) 

 

(例9.1:)shuffle=true

 //...省略
val rdd = sc.parallelize(1 to 16,4)
val coalesceRDD = rdd.coalesce(7,true)
println("重新分区后的分区个数:"+coalesceRDD.partitions.size)
println("RDD依赖关系:"+coalesceRDD.toDebugString)  

输出:

重新分区后的分区个数:5
RDD依赖关系:(5) MapPartitionsRDD[4] at coalesce at Coalesce.scala:14 []
| CoalescedRDD[3] at coalesce at Coalesce.scala:14 []
| ShuffledRDD[2] at coalesce at Coalesce.scala:14 []
+-(4) MapPartitionsRDD[1] at coalesce at Coalesce.scala:14 []
| ParallelCollectionRDD[0] at parallelize at Coalesce.scala:13 []
//分区后的数据集
List(10, 13)
List(1, 5, 11, 14)
List(2, 6, 12, 15)
List(3, 7, 16)
List(4, 8, 9) 

(RDD依赖图:coalesce(3,flase))

 

(RDD依赖图:coalesce(3,true))

10.repartition(numPartition):是函数coalesce(numPartition,true)的实现,效果和例9.1的coalesce(numPartition,true)的一样

11.glom():将RDD的每个分区中的类型为T的元素转换换数组Array[T]

 

//省略
val rdd = sc.parallelize(1 to 16,4)
val glomRDD = rdd.glom() //RDD[Array[T]]
glomRDD.foreach(rdd => println(rdd.getClass.getSimpleName))
sc.stop 

输出:

int[] //说明RDD中的元素被转换成数组Array[Int]

12.randomSplit(weight:Array[Double],seed):根据weight权重值将一个RDD划分成多个RDD,权重越高划分得到的元素较多的几率就越大

//省略sc
val rdd = sc.parallelize(1 to 10)
val randomSplitRDD = rdd.randomSplit(Array(1.0,2.0,7.0))
randomSplitRDD(0).foreach(x => print(x +" "))
randomSplitRDD(1).foreach(x => print(x +" "))
randomSplitRDD(2).foreach(x => print(x +" "))
sc.stop 

输出:

2 4
3 8 9
1 5 6 7 10
时间: 2024-10-24 04:22:08

Spark函数详解系列--RDD基本转换的相关文章

Spark函数详解系列之RDD基本转换

摘要: RDD:弹性分布式数据集,是一种特殊集合 ? 支持多种来源 ? 有容错机制 ? 可以被缓存 ? 支持并行操作. RDD有两种操作算子: Transformation(转换):Transformation属于延迟计算,当一个RDD转换成另一个RDD时并没有立即进行转换,仅仅是记住了数据集的逻辑操作 Action(执行):触发Spark作业的运行,真正触发转换算子的计算 本节所讲函数 1.map(func) 2.flatMap(func) 3.mapPartitions(func) 4.ma

ThinkPHP函数详解系列

为了能方便大家学习和掌握,在这里汇总下ThinkPHP中的经典函数用法 A 函数:实例化控制器R 函数:直接调用控制器的操作方法C 函数:设置和获取配置参数L 函数:设置和获取语言变量D 函数:实例化模型M 函数:实例化模型(无需定义模型类)N 函数:计数器G 函数:调试统计U 函数:URL地址生成I 函数:安全获取系统输入变量S 函数:缓存设置和存取F 函数:快速缓存设置和存取session函数:Session操作cookie函数:Cookie操作import函数:类库导入

Android总结篇系列:Activity中几个主要函数详解

专注Android领域开发. 仰望星空,同时需要脚踏实地. ——好记性不如烂博客 Android总结篇系列:Activity中几个主要函数详解 Activity作为Android系统中四大基本组件之一,包含大量的与其他的各大组件.intent.widget以及系统各项服务等之间的交互的函数.在此,本文主要选取实际项目开发中常用的,但完全理解又需要有一定深入了解的几个函数进行讲解,后续本文会根据需要不断更新. 1. startActivityForResult / onActivityResult

C++ list容器系列功能函数详解

C++ list函数详解 首先说下eclipse工具下怎样debug:方法:你先要设置好断点,然后以Debug方式启动你的应用程序,不要用run的方式,当程序运行到你的断点位置时就会停住,也会提示你进入到Debug视图方式操作,F5是进入到函数或语句块的内部,F6是单步运行,一行一行的走,F7可以跳当前监听函数或语句块F8 会直接跳到下个断点. 下面进入主题: 一.构造.析构函数.= 运算符 1.功能:声明list容器.4种方式 list<int> first;                

Apache Spark源码走读之13 -- hiveql on spark实现详解

欢迎转载,转载请注明出处,徽沪一郎 概要 在新近发布的spark 1.0中新加了sql的模块,更为引人注意的是对hive中的hiveql也提供了良好的支持,作为一个源码分析控,了解一下spark是如何完成对hql的支持是一件非常有趣的事情. Hive简介 Hive的由来 以下部分摘自Hadoop definite guide中的Hive一章 "Hive由Facebook出品,其设计之初目的是让精通SQL技能的分析师能够对Facebook存放在HDFS上的大规模数据集进行分析和查询. Hive大大

可变参数函数详解

可变参数函数又称参数个数可变函数(本文也简称变参函数),即函数参数数目可变.原型声明格式为: type VarArgFunc(type FixedArg1, type FixedArg2, -); 其中,参数可分为两部分:数目确定的固定参数和数目可变的可选参数.函数至少需要一个固定参数,其声明与普通函数参数相同:可选参数由于数目不定(0个或以上),声明时用"-"表示("-"用作参数占位符).固定参数和可选参数共同构成可变参数函数的参数列表. 由于参数数目不定,使用可

Android高效率编码-第三方SDK详解系列(三)——JPush推送牵扯出来的江湖恩怨,XMPP实现推送,自定义客户端推送

Android高效率编码-第三方SDK详解系列(三)--JPush推送牵扯出来的江湖恩怨,XMPP实现推送,自定义客户端推送 很久没有更新第三方SDK这个系列了,所以更新一下这几天工作中使用到的推送,写这个系列真的很要命,你要去把他们的API文档大致的翻阅一遍,而且各种功能都实现一遍,解决各种bug各种坑,不得不说,极光推送真坑,大家使用还是要慎重,我们看一下极光推送的官网 https://www.jpush.cn/common/ 推送比较使用,很多软件有需要,所以在这个点拿出来多讲讲,我们本节

【转载】3D/2D中的D3DXMatrixPerspectiveFovLH和D3DXMatrixOrthoLH投影函数详解

原文:3D/2D中的D3DXMatrixPerspectiveFovLH和D3DXMatrixOrthoLH投影函数详解 3D中z值会影响屏幕坐标系到世界坐标系之间的转换,2D中Z值不会产生影响(而只是屏幕宽高比会产生影响,z值只对深度剔除产生影响).所以U3D中如果用2D摄像机那么屏幕坐标和世界坐标之间的转换需要用指定的2D摄像机才行,如果用主3D摄像机那么UI转换会产生计算结果异常. 一.D3DXMatrixPerspectiveFovLH函数 作用:Builds a left-handed

php学习之道:php中iconv函数 详解

iconv函数库能够完成各种字符集间的转换,是php编程中不可缺少的基础函数库. 用法如下: $string = "亲爱的朋友欢迎访问胡文芳的博客,希望给您带来一点点的帮助!"; iconv("utf8","gbk",$string)//将字符串string  编码由utf8转变成gbk: 扩展如下: echo $str= '你好,欢迎访问胡文芳的博客,该博客记录一个程序员的成长过程!'; echo ' '; echo iconv('GB2312