轻松理解 Spark 的 aggregate 方法

2019-04-20

关键字: Spark 的 agrregate 作用、Scala 的 aggregate 是什么

Spark 编程中的 aggregate 方法还是比较常用的。本篇文章站在初学者的角度以大白话的形式来讲解一下 aggregate 方法。



aggregate 方法是一个聚合函数,接受多个输入,并按照一定的规则运算以后输出一个结果值。

aggregate 在哪

aggregate 方法是 Spark 编程模型 RDD 类( org.apache.spark.RDD ) 中定义的一个公有方法。它的方法声明如下

1   def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
2
3     // ...
4
5   }

aggregate 的参数是什么意思

然后我们一块一块来学习这个方法的声明。其实这小节讲的,都是 Scala 的语法知识。

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {

首先看到的是 “泛型” 声明。懂 Java 的同学直接把这个 " [U: ClassTag] " 理解成是一个泛型声明就好了。如果您不是很熟悉 Java 语言,那我们只需要知道这个 U 表示我们的 aggregate 方法只能接受某一种类型的输入值,至于到底是哪种类型,要看您在具体调用的时候给了什么类型。

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {

然后我们来看看 aggregate 的参数列表。明显这个 aggregate 方法是一个柯里化函数。柯里化的知识不在本篇文章讨论的范围之内。如果您还不了解柯里化的概念,那在这里简单地理解为是通过多个圆括号来接受多个输入参数就可以了

然后我们来看看第 1 部分,即上面蓝色加粗的 " (zeroValue: U) " 。这个表示它接受一个任意类型的输入参数,变量名为 zeroValue 。这个值就是初值,至于这个初值的作用,姑且不用理会,等到下一小节通过实例来讲解会更明了,在这里只需要记住它是一个 “只使用一次” 的值就好了。

第 2 部分,我们还可以再把它拆分一下,因为它里面其实有两个参数。笔者认为 Scala 语法在定义多个参数时,辨识度比较弱,不睁大眼睛仔细看,很难确定它到底有几个参数。

首先是第 1 个参数 " seqOp: (U, T) => U " 它是一个函数类型,以一个输入为任意两个类型 U, T 而输出为 U 类型的函数作为参数。这个函数会先被执行。这个参数函数的作用是为每一个分片( slice )中的数据遍历应用一次函数。换句话说就是假设我们的输入数据集( RDD )有 1 个分片,则只有一个 seqOp 函数在运行,假设有 3 个分片,则有三个 seqOp 函数在运行。可能有点难以理解,不过没关系,到后面结合实例就很容易理解了。

另一个参数 " combOp: (U, U) => U " 接受的也是一个函数类型,以输入为任意类型的两个输入参数而输出为一个与输入同类型的值的函数作为参数。这个函数会在上面那个函数执行以后再执行。这个参数函数的输入数据来自于第一个参数函数的输出结果,这个函数仅会执行 1 次,它是用来最终聚合结果用的。同样这里搞不懂没关系,下一小节的实例部分保证让您明白。

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {

最后是上面这个红色加粗的 " : U " 它是 aggregate 方法的返回值类型,也是泛型表示。

对了,最后还有一个 " withScope ",这个就不介绍了,因为笔者也不知道它是干嘛的,哈哈哈哈。反正对我们理解这个方法也没什么影响。

aggregate 正确的使用姿势

我们直接在 spark-shell 中来演示实例了。这里以两个小例子来演示,一个是不带分片的 RDD ,另一个则是带 3 个分片的 RDD 。

首先我们来创建一个 RDD

scala> val rdd1 = sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24

scala> rdd1 collect
warning: there was one feature warning; re-run with -feature for details
res24: Array[Int] = Array(1, 2, 3, 4, 5)

这个 RDD 仅有 1 个分片,包含 5 个数据: 1, 2, 3, 4, 5 。

然后我们来应用一下 aggregate 方法。

哦,不对,在使用 aggregate 之前,我们还是先定义两个要给 aggregate 当作输入参数的函数吧。

scala> :paste
// Entering paste mode (ctrl-D to finish)

def pfun1(p1: Int, p2: Int): Int = {

    p1 * p2

}

// Exiting paste mode, now interpreting.

pfun1: (p1: Int, p2: Int)Int

scala> 

首先来定义第 1 个函数,即等下要被当成 seqOp 的形参使用的函数。在上一小节我们知道 seqOp 函数是一个输入类型为 U, T 类型而输出为 U 类型的函数。但是在这里,因为我们的 RDD 只包含一个 Int 类型数据,所以这里的 seqOp 的两个输入参数都是 Int 类型的,这是没毛病的哦!然后这个函数的返回类型也为 Int 。我们这个函数的作用就是将输入的参数 p1 , p2 求积以后返回。

scala> :paste
// Entering paste mode (ctrl-D to finish)

def pfun2(p3: Int, p4: Int): Int = {

    p3 + p4

}

// Exiting paste mode, now interpreting.

pfun2: (p3: Int, p4: Int)Int

scala>

接着是第 2 个函数。就不再解释什么了。

然后终于可以开始应用我们的 aggregate 方法了。

scala> rdd1.aggregate(3)(pfun1, pfun2)
res25: Int = 363

scala> 

输出结果是 363 !这个结果是怎么算出来的呢?

首先我们的 zeroValue 即初值是 3 。然后通过上面小节的介绍,我们知道首先会应用 pfun1 函数,因为我们这个 RDD 只有 1 个分片,所以整个运算过程只会有一次 pfun1 函数调用。它的计算过程如下:

首先用初值 3 作为 pfun1 的参数 p1 ,然后再用 RDD 中的第 1 个值,即 1 作为 pfun1 的参数 p2 。由此我们可以得到第一个计算值为 3 * 1 = 3 。接着这个结果 3 被当成 p1 参数传入,RDD 中的第 2 个值即 2 被当成 p2 传入,由此得到第二个计算结果为 3 * 2 = 6 。以此类推,整个 pfun1 函数执行完成以后,得到的结果是  3 * 1 * 2 * 3 * 4 * 5 = 360 。这个 pfun1 的应用过程有点像是 “在 RDD 中滑动计算” 。

在 aggregate 方法的第 1 个参数函数 pfun1 执行完毕以后,我们得到了结果值 360 。于是,这个时候就要开始执行第 2 个参数函数 pfun2 了。

pfun2 的执行过程与 pfun1 是差不多的,同样会将 zeroValue 作为第一次运算的参数传入,在这里即是将 zeroValue 即 3 当成 p3 参数传入,然后是将 pfun1 的结果 360 当成 p4 参数传入,由此得到计算结果为 363 。因为 pfun1 仅有一个结果值,所以整个 aggregate 过程就计算完毕了,最终的结果值就是 363

怎么样?相信您已经完全明白 aggregate 方法的的作用与用法了吧。下面再贴一个有多个分片的 RDD 的示例。

scala> val rdd2 = sc.makeRDD(1 to 10, 3)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at <console>:24

scala> rdd2.getNumPartitions
res26: Int = 3

scala> rdd2.foreachPartition(myprint)
1 , 2 , 3 ,
4 , 5 , 6 ,
7 , 8 , 9 , 10 , 

这里定义了一个拥有 3 个分片的 RDD 。然后 aggregate 的两个函数参数仍然是使用上面定义的 pfun1 与 pfun2 。

scala> rdd2.aggregate(2)(pfun1, pfun2)
res29: Int = 10334

结果是 10334 。怎么来的呢?

因为前面小节有提到 seqOp 函数,即这里的 pfun1 函数会分别在 RDD 的每个分片中应用一次,所以这里 pfun1 的计算过程为

2 * 1 * 2 * 3       = 12
2 * 4 * 5 * 6       = 240
2 * 7 * 8 * 9 * 10  = 10080

标橙的为 zeroValue 。

在这里 pfun1 的输出结果有 3 个值。然后就来应用 combOp 即这里的 pfun2

2 + 12 + 240 + 10080  = 10334

所以,结果就是 10334 咯!



轻松理解 Spark 的 aggregate 方法

原文地址:https://www.cnblogs.com/chorm590/p/spark_201904201159.html

时间: 2024-10-03 23:29:15

轻松理解 Spark 的 aggregate 方法的相关文章

《深入理解SPARK:核心思想与源码分析》——SparkContext的初始化(中)

<深入理解Spark:核心思想与源码分析>一书前言的内容请看链接<深入理解SPARK:核心思想与源码分析>一书正式出版上市 <深入理解Spark:核心思想与源码分析>一书第一章的内容请看链接<第1章 环境准备> <深入理解Spark:核心思想与源码分析>一书第二章的内容请看链接<第2章 SPARK设计理念与基本架构> 由于本书的第3章内容较多,所以打算分别开辟三篇随笔分别展现. <深入理解Spark:核心思想与源码分析>一

(版本定制)第3课:从作业和容错的角度来理解Spark Streaming

本节课内容: 1.Spark Streaming Job架构和运行机制 2.Spark Streaming Job容错架构和运行机制 理解Spark Streaming Job整个架构和运行机制对于精通Spark Streaming来说是至关重要的. 一.首先我们运行以下程序,然后通过这个程序的运行过程进一步加深对Spark Streaming流处理Job的执行过程的理解,代码如下: object OnlineForeachRDD2DB { def main(args: Array[String

轻松理解UML用例图时序图类图的教程

摘自https://zhuanlan.zhihu.com/p/29874146 写在前面 当你老大扔给你这样的图,或者你需要完成某些功能而去看文档的时候发现以下类似这样的图会不会不(一)知(脸)所(懵)措(逼): (图片来至wikipedia) (图片来至微信内网页支付时序图) (图片来至wikipedia) 如果你看了都不会一脸懵逼,那么可以出门左转啦,这篇文章就是来说明这些图的意思,让你在工作交流中,或者在看一些文档,或者看我的一些关于设计模式的文章,甚至架构建模中,都能轻松理解,毫无压力!

Spark运行调试方法与学习资源汇总

最近,在学习和使用Spark的过程中,遇到了一些莫名其妙的错误和问题,在逐个解决的过程中,体会到有必要对解决上述问题的方法进行总结,以便能够在短时间内尽快发现问题来源并解决问题,现与各位看官探讨学习如下: 解决spark运行调试问题的四把“尖刀”: 1.Log 包括控制台日志.主从节点日志.HDFS日志等.许多错误可以通过日志,直接对错误类型.错误来源进行准确定位,因此,学会读取和分析Log是解决问题的第一步. 2.Google 确定错误类型和原因后,就可以使用Google在Spark User

轻松理解js的函数和构造函数的区别

如何轻松理解js的函数和构造函数的区别,这是个一直头大的问题,很多例子都没有清晰的描述清楚.. 在这里,我就用平常的道理来阐述一下,希望能理解清楚. 从这里开始入手吧 这是普通函数的定义和调用方式.看起来没什么特别的,但是往下看就有奇怪的东西了. 再做一个: 你没发先一个很奇怪的现象吗? 你的这个函数里面并没有返回什么,也就是没有return ,但是你调用的时候却可以接受啊. 如:var p=new Person('niexiaoqian'); alert(p.name); //niexiaoq

理解toString()和hashCode()方法的重写

一般toString()方法会应用在自己bean类中.根据需要 在重写的toString 方法中 组织自己想要显示的当前对象的信息. 比如按一定规则格式返回 所以属性的名称及值. 当你要读取关于对象的一些有用细节时,可以在对象上调用toString(). 如,当把一个对象引用传递给System.out.println();时,该对象的toString()方法被调用. Java中所有的类都继承自Object父类,所以,我们在类中只要重写一下toString()方法就可以显示出我们想要的信息了. 看

理解jquery的.on()方法

jquery在的.on()方法用来给元素绑定事件处理函数的,我经常用在两个地方: 给未来的元素绑定事件:我总是这样用:$(document).on('click','#div1',function(){}); 给拥有同一个父元素的多个子元素绑定事件. 可以查看以前写的博客:jQuery中对未来的元素绑定事件 先来看一个实例: 在页面A的一个div里动态加载页面B,页面B里的一个div绑定了一个单击事件. 页面A如下: <body> <input type="button&quo

C++动态内存管理好难怎么办?零基础图文讲解,小白轻松理解原理

首先我们先了解一下内存: C语言使用malloc/free动态管理内存空间,C++引入了new/delete,new[]/delete[]来动态管理内存.如果大家在自学C++中遇到困难,想找一个学习C++的环境,可以加入我的C++学习交流扣群先是513801371,能够共同学习交流和分享!![](https://s1.51cto.com/images/blog/201905/11/f3795621980960d47c291497e516b846.jpg?x-oss-process=image/w

Java反射理解(五)-- 方法反射的基本操作

Java反射理解(五)-- 方法反射的基本操作 方法的反射 1. 如何获取某个方法 方法的名称和方法的参数列表才能唯一决定某个方法 2. 方法反射的操作 method.invoke(对象,参数列表) 举例 具体操作请看下面举例: import java.lang.reflect.Method; class A{ public void print(){ System.out.println("helloworld"); } public void print(int a,int b){