spark教程(四)-action 操作 group 系列

groupBy(f, numPartitions=None, partitionFunc=<function portable_hash>):根据 条件 分组,这个条件是一个函数;输出 (key,迭代器)

## 条件是分组依据,条件不影响最后的输出格式,输出格式仍和原数据相同
## 如 原来是 [1, 2],经过分组后分到了 第 1 组,输出是 [1, [1, 2]], [1, 2] 完全保留

# 这个例子相当于求 奇偶数
rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
result = rdd.groupBy(lambda x: x % 2).collect()     # [(0, <pyspark.resultiterable.ResultIterable object at 0x7f2f76096890>),
                                                    # (1, <pyspark.resultiterable.ResultIterable object at 0x7f2f760965d0>)]
# 解析迭代器并排序
sorted([(x, sorted(y)) for (x, y) in result])       # [(0, [2, 8]), (1, [1, 1, 3, 5])]

groupByKey(numPartitions=None, partitionFunc=<function portable_hash>):按 key 进行分组;输出 (key,迭代器)

rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 3)])
rdd.groupByKey().collect()      # [(‘a‘, <pyspark.resultiterable.ResultIterable object at 0x7f2f760a5a90>),
                                # (‘b‘, <pyspark.resultiterable.ResultIterable object at 0x7f2f760a5b10>)]

rdd.groupByKey().mapValues(len).collect()       # [(‘a‘, 2), (‘b‘, 1)]

sorted(rdd.groupByKey().mapValues(list).collect())  # [(‘a‘, [1, 3]), (‘b‘, [1])] 把迭代器转成 list

groupWith(other, *others):把多个 RDD 的 key 进行分组;输出 (key,迭代器)

分组后的数据是有顺序的,每个 key 对应的 value 是按 原本 RDD 的顺序的,如果原本 RDD 没有这个 key,留空

w = sc.parallelize([("a", 5), ("b", 6)])
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
z = sc.parallelize([("b", 42)])
w.groupWith(x, y, z).collect()

[(x, tuple(map(list, y))) for x, y in list(w.groupWith(x, y, z).collect())]     # [(‘a‘, ([5], [1], [2], [])), (‘b‘, ([6], [4], [], [42]))]

未完待续...

原文地址:https://www.cnblogs.com/yanshw/p/11655598.html

时间: 2024-10-12 22:53:34

spark教程(四)-action 操作 group 系列的相关文章

spark transformation与action操作函数

一.Transformation map(func) 返回一个新的分布式数据集,由每个原元素经过函数处理后的新元素组成 filter(func) 返回一个新的数据集,经过fun函数处理后返回值为true的原元素组成 flatMap(func) 类似于map,但每个输入元素会被映射为0个或多个输出元素 mapPartitions(func)  类似于map,对RDD的每个分区起作用 intersection(otherDataset) 求两个RDD的交集 distinct([numTasks])

spark教程(四)-SparkContext 和 RDD 算子

SparkContext SparkContext 是在 spark 库中定义的一个类,作为 spark 库的入口点: 它表示连接到 spark,在进行 spark 操作之前必须先创建一个 SparkContext 的实例,并且只能创建一个: 利用 SparkContext 实例创建的对象都是 RDD,这是相对于 SparkSession 说的,因为 它创建的对象都是 DataFrame: 创建 sc class SparkContext(__builtin__.object): def __i

Spark常用函数讲解--Action操作

摘要: RDD:弹性分布式数据集,是一种特殊集合 ‚ 支持多种来源 ‚ 有容错机制 ‚ 可以被缓存 ‚ 支持并行操作,一个RDD代表一个分区里的数据集RDD有两种操作算子:         Transformation(转换):Transformation属于延迟计算,当一个RDD转换成另一个RDD时并没有立即进行转换,仅仅是记住       了数据集的逻辑操作         Ation(执行):触发Spark作业的运行,真正触发转换算子的计算 本系列主要讲解Spark中常用的函数操作:   

史上最详细的Android Studio系列教程四--Gradle基础

史上最详细的Android Studio系列教程四--Gradle基础

CRL快速开发框架系列教程四(删除数据)

本系列目录 CRL快速开发框架系列教程一(Code First数据表不需再关心) CRL快速开发框架系列教程二(基于Lambda表达式查询) CRL快速开发框架系列教程三(更新数据) CRL快速开发框架系列教程四(删除数据) CRL快速开发框架系列教程五(使用缓存) CRL快速开发框架系列教程六(分布式缓存解决方案) CRL快速开发框架系列教程七(使用事务) CRL快速开发框架系列教程八(使用CRL.Package) CRL快速开发框架系列教程九(导入/导出数据) CRL快速开发框架系列教程十(

spark core源码分析9 从简单例子看action操作

上一节举例讲解了transformation操作,这一节以reduce为例讲解action操作 首先看submitJob方法,它将我们reduce中写的处理函数随JobSubmitted消息传递出去,因为每个分区都需要调用它进行计算: 而resultHandler是指最后合并的方法,在每个task完成后,需要调用resultHandler将最终结果合并.所以它不需要随JobSubmitted消息传递,而是保存在JobWaiter中 /** * Submit a job to the job sc

Android Studio系列教程四--Gradle基础

Android Studio系列教程四--Gradle基础 本文为个人原创,欢迎转载,但请务必在明显位置注明出处! 其实很早之前也写了一篇Gradle的基础博客,但是时间很久了,现在Gradle已经更新了很多,所以暂且结合Stduio 1.0正式版与最新的Gradle语法来详细讲解下,小伙伴们直接跟我一步步来学习吧. 什么是Gradle? Gradle是一种依赖管理工具,基于Groovy语言,面向Java应用为主,它抛弃了基于XML的各种繁琐配置,取而代之的是一种基于Groovy的内部领域特定(

webpack4 系列教程(四): 单页面解决方案--代码分割和懒加载

本节课讲解webpack4打包单页应用过程中的代码分割和代码懒加载.不同于多页面应用的提取公共代码,单页面的代码分割和懒加载不是通过webpack配置来实现的,而是通过webpack的写法和内置函数实现的. 目前webpack针对此项功能提供 2 种函数: import(): 引入并且自动执行相关 js 代码 require.ensure(): 引入但需要手动执行相关 js 代码 本文将会进行逐一讲解. >>> 本节课源码 >>> 所有课程源码 1. 准备工作 此次代码

Spark RDD Action操作

reduce def reduce(f: (T, T) => T): T通过func函数聚集RDD中的所有元素,这个功能必须是可交换且可并联的 1 2 3 4 5 6 7 8 9 10 11 scala> val rdd1 = sc.makeRDD(1 to 10,2) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[85] at makeRDD at <console>:24 scala> rdd1.r