Spark常用函数讲解--Action操作

摘要:

RDD:弹性分布式数据集,是一种特殊集合 ‚ 支持多种来源 ‚ 有容错机制 ‚ 可以被缓存 ‚ 支持并行操作,一个RDD代表一个分区里的数据集
RDD有两种操作算子:

        Transformation(转换):Transformation属于延迟计算,当一个RDD转换成另一个RDD时并没有立即进行转换,仅仅是记住       了数据集的逻辑操作
         Ation(执行):触发Spark作业的运行,真正触发转换算子的计算
 
本系列主要讲解Spark中常用的函数操作:
         1.RDD基本转换
         2.键-值RDD转换
         3.Action操作篇

 

1.reduce(func):通过函数func先聚集各分区的数据集,再聚集分区之间的数据,func接收两个参数,返回一个新值,新值再做为参数继续传递给函数func,直到最后一个元素

 

2.collect():以数据的形式返回数据集中的所有元素给Driver程序,为防止Driver程序内存溢出,一般要控制返回的数据集大小

 

3.count():返回数据集元素个数

 

4.first():返回数据集的第一个元素

 

5.take(n):以数组的形式返回数据集上的前n个元素

 

6.top(n):按默认或者指定的排序规则返回前n个元素,默认按降序输出

 

7.takeOrdered(n,[ordering]): 按自然顺序或者指定的排序规则返回前n个元素

例1:

def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("reduce")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(1 to 10,2)
    val reduceRDD = rdd.reduce(_ + _)
    val reduceRDD1 = rdd.reduce(_ - _) //如果分区数据为1结果为 -53
    val countRDD = rdd.count()
    val firstRDD = rdd.first()
    val takeRDD = rdd.take(5)    //输出前个元素
    val topRDD = rdd.top(3)      //从高到底输出前三个元素
    val takeOrderedRDD = rdd.takeOrdered(3)    //按自然顺序从底到高输出前三个元素

    println("func +: "+reduceRDD)
    println("func -: "+reduceRDD1)
    println("count: "+countRDD)
    println("first: "+firstRDD)
    println("take:")
    takeRDD.foreach(x => print(x +" "))
    println("\ntop:")
    topRDD.foreach(x => print(x +" "))
    println("\ntakeOrdered:")
    takeOrderedRDD.foreach(x => print(x +" "))
    sc.stop
  }

输出:

func +: 55
func -: 15 //如果分区数据为1结果为 -53
count: 10
first: 1
take:
1 2 3 4 5
top:
10 9 8
takeOrdered:
1 2 3

(RDD依赖图:红色块表示一个RDD区,黑色块表示该分区集合,下同)

(RDD依赖图)

8.countByKey():作用于K-V类型的RDD上,统计每个key的个数,返回(K,K的个数)

9.collectAsMap():作用于K-V类型的RDD上,作用与collect不同的是collectAsMap函数不包含重复的key,对于重复的key。后面的元素覆盖前面的元素

10.lookup(k):作用于K-V类型的RDD上,返回指定K的所有V值

例2:

 def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("KVFunc")
    val sc = new SparkContext(conf)
    val arr = List(("A", 1), ("B", 2), ("A", 2), ("B", 3))
    val rdd = sc.parallelize(arr,2)
    val countByKeyRDD = rdd.countByKey()
    val collectAsMapRDD = rdd.collectAsMap()

    println("countByKey:")
    countByKeyRDD.foreach(print)

    println("\ncollectAsMap:")
    collectAsMapRDD.foreach(print)
    sc.stop
  }

输出:

countByKey:
(B,2)(A,2)
collectAsMap:
(A,2)(B,3)

(RDD依赖图)

11.aggregate(zeroValue:U)(seqOp:(U,T) => U,comOp(U,U) => U):

seqOp函数将每个分区的数据聚合成类型为U的值,comOp函数将各分区的U类型数据聚合起来得到类型为U的值

def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("Fold")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(List(1,2,3,4),2)
    val aggregateRDD = rdd.aggregate(2)(_+_,_ * _)
    println(aggregateRDD)
    sc.stop
  }

输出:

90

步骤1:分区1:zeroValue+1+2=5   分区2:zeroValue+3+4=9

步骤2:zeroValue*分区1的结果*分区2的结果=90

(RDD依赖图)

12.fold(zeroValue:T)(op:(T,T) => T):通过op函数聚合各分区中的元素及合并各分区的元素,op函数需要两个参数,在开始时第一个传入的参数为zeroValue,T为RDD数据集的数据类型,,其作用相当于SeqOp和comOp函数都相同的aggregate函数

例3

def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("Fold")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(Array(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3)), 2)
    val foldRDD = rdd.fold(("d", 0))((val1, val2) => { if (val1._2 >= val2._2) val1 else val2
    })
    println(foldRDD)
  }

输出:

c,5

其过程如下:

1.开始时将(“d”,0)作为op函数的第一个参数传入,将Array中和第一个元素("a",1)作为op函数的第二个参数传入,并比较value的值,返回value值较大的元素

2.将上一步返回的元素又作为op函数的第一个参数传入,Array的下一个元素作为op函数的第二个参数传入,比较大小

3.重复第2步骤

每个分区的数据集都会经过以上三步后汇聚后再重复以上三步得出最大值的那个元素,对于其他op函数也类似,只不过函数里的处理数据的方式不同而已

(RDD依赖图)

13.saveAsFile(path:String):将最终的结果数据保存到指定的HDFS目录中

14.saveAsSequenceFile(path:String):将最终的结果数据以sequence的格式保存到指定的HDFS目录中

例子源码地址:https://github.com/Mobin-F/SparkExample/tree/master/src/main/scala/com/mobin/SparkRDDFun/TransFormation/Action

时间: 2024-10-25 12:39:56

Spark常用函数讲解--Action操作的相关文章

Spark常用函数讲解--键值RDD转换

摘要: RDD:弹性分布式数据集,是一种特殊集合 ‚ 支持多种来源 ‚ 有容错机制 ‚ 可以被缓存 ‚ 支持并行操作,一个RDD代表一个分区里的数据集RDD有两种操作算子:         Transformation(转换):Transformation属于延迟计算,当一个RDD转换成另一个RDD时并没有立即进行转换,仅仅是记住       了数据集的逻辑操作         Ation(执行):触发Spark作业的运行,真正触发转换算子的计算 本系列主要讲解Spark中常用的函数操作:   

SQL Server 常用函数和日期操作

一.字符转换函数 1.ASCII() 返回字符表达式最左端字符的ASCII 码值. 在ASCII()函数中,纯数字的字符串可不用‘’括起来,但含其它字符的字符串必须用‘’括起来使用,否则会出错. print ASCII('123456')    =>    49 print ASCII(123456)     =>    49 print ASCII('abc')         =>    97 2.CHAR() 将ASCII 码转换为字符.如果没有输入0 ~ 255 之间的ASCII

PHP中的MYSQL常用函数(php下操作数据库必备)

1.mysql_connect()-建立数据库连接 格式: resource mysql_connect([string hostname [:port] [:/path/to/socket] [, string username] [, string password]]) 例: $conn = @mysql_connect("localhost", "username", "password") or die("不能连接到Mysql

常用函数-Linux文件操作

/************************************************************************ 函数功能:寻找文件夹下的某格式文件 std::vector<string> &filelist -- 文件名list const char *basePath -- 文件路径 string format -- 文件格式 如 .xml ******************************************************

Opencv常用函数讲解

1.approxPolyDP(Mat(ps), poly, 5, true);//根据点集,拟合出多边形 2.fillConvexPoly(mask, Mat(ps), Scalar(255));根据点集,绘制并填充出多边形 3.fillPoly(mask, Mat(ps), Scalar(255)); ;根据点集,绘制出多边形 原文地址:https://www.cnblogs.com/raorao1994/p/8542464.html

spark core源码分析9 从简单例子看action操作

上一节举例讲解了transformation操作,这一节以reduce为例讲解action操作 首先看submitJob方法,它将我们reduce中写的处理函数随JobSubmitted消息传递出去,因为每个分区都需要调用它进行计算: 而resultHandler是指最后合并的方法,在每个task完成后,需要调用resultHandler将最终结果合并.所以它不需要随JobSubmitted消息传递,而是保存在JobWaiter中 /** * Submit a job to the job sc

spark transformation与action操作函数

一.Transformation map(func) 返回一个新的分布式数据集,由每个原元素经过函数处理后的新元素组成 filter(func) 返回一个新的数据集,经过fun函数处理后返回值为true的原元素组成 flatMap(func) 类似于map,但每个输入元素会被映射为0个或多个输出元素 mapPartitions(func)  类似于map,对RDD的每个分区起作用 intersection(otherDataset) 求两个RDD的交集 distinct([numTasks])

spark RDD transformation与action函数巩固 (未完)

1.创建RDD val lines = sc.parallelize(List("pandas","i like pandas")) 2.加载本地文件到RDD val linesRDD = sc.textFile("yangsy.txt") 3.过滤 filter 需要注意的是 filter并不会在原有RDD上过滤,而是根据filter的内容重新创建了一个RDD val spark = linesRDD.filter(line => lin

Spark中的各种action算子操作(java版)

在我看来,Spark编程中的action算子的作用就像一个触发器,用来触发之前的transformation算子.transformation操作具有懒加载的特性,你定义完操作之后并不会立即加载,只有当某个action的算子执行之后,前面所有的transformation算子才会全部执行.常用的action算子如下代码所列:(java版) package cn.spark.study.core; import java.util.Arrays; import java.util.List; im