spark TopN问题:dataframe和RDD比较

spark版本:spark 2.0.2

scala版本:2.11.8

服务器版本:CentOS 6.7

spark TopN问题,其实就是分组、排序、组内取值问题。

在shell下输入

spark-shell

 进入spark后输入以下命令:

//使用dataframe解决spark TopN问题:分组、排序、取TopN
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val df = sc.parallelize(Seq(
  (0,"cat26",30.9), (0,"cat13",22.1), (0,"cat95",19.6), (0,"cat105",1.3),
  (1,"cat67",28.5), (1,"cat4",26.8), (1,"cat13",12.6), (1,"cat23",5.3),
  (2,"cat56",39.6), (2,"cat40",29.7), (2,"cat187",27.9), (2,"cat68",9.8),
  (3,"cat8",35.6))).toDF("Hour", "Category", "TotalValue")

df.show
/*
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
|   0|   cat26|      30.9|
|   0|   cat13|      22.1|
|   0|   cat95|      19.6|
|   0|  cat105|       1.3|
|   1|   cat67|      28.5|
|   1|    cat4|      26.8|
|   1|   cat13|      12.6|
|   1|   cat23|       5.3|
|   2|   cat56|      39.6|
|   2|   cat40|      29.7|
|   2|  cat187|      27.9|
|   2|   cat68|       9.8|
|   3|    cat8|      35.6|
+----+--------+----------+
*/

val w = Window.partitionBy($"hour").orderBy($"TotalValue".desc)
//取Top1
val dfTop1 = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn")
//注意:row_number()在spark1.x版本中为rowNumber()
//取Top3
val dfTop3 = df.withColumn("rn", row_number.over(w)).where($"rn" <= 3).drop("rn")

dfTop1.show
/*
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
|   1|   cat67|      28.5|
|   3|    cat8|      35.6|
|   2|   cat56|      39.6|
|   0|   cat26|      30.9|
+----+--------+----------+
*/
dfTop3.show
/*
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
|   1|   cat67|      28.5|
|   1|    cat4|      26.8|
|   1|   cat13|      12.6|
|   3|    cat8|      35.6|
|   2|   cat56|      39.6|
|   2|   cat40|      29.7|
|   2|  cat187|      27.9|
|   0|   cat26|      30.9|
|   0|   cat13|      22.1|
|   0|   cat95|      19.6|
+----+--------+----------+
*/

//使用RDD解决spark TopN问题:分组、排序、取TopN

val rdd1 = sc.parallelize(Seq(
  (0,"cat26",30.9), (0,"cat13",22.1), (0,"cat95",19.6), (0,"cat105",1.3),
  (1,"cat67",28.5), (1,"cat4",26.8), (1,"cat13",12.6), (1,"cat23",5.3),
  (2,"cat56",39.6), (2,"cat40",29.7), (2,"cat187",27.9), (2,"cat68",9.8),
  (3,"cat8",35.6)))

val rdd2 = rdd1.map(x => (x._1,(x._2, x._3))).groupByKey()
/*
rdd2.collect
res9: Array[(Int, Iterable[(String, Double)])] = Array((0,CompactBuffer((cat26,30.9), (cat13,22.1), (cat95,19.6), (cat105,1.3))),
                                                       (1,CompactBuffer((cat67,28.5), (cat4,26.8), (cat13,12.6), (cat23,5.3))),
													   (2,CompactBuffer((cat56,39.6), (cat40,29.7), (cat187,27.9), (cat68,9.8))),
													   (3,CompactBuffer((cat8,35.6))))

*/
val N_value = 3

val rdd3 = rdd2.map( x => {
    val i2 = x._2.toBuffer
    val i2_2 = i2.sortBy(_._2)
    if (i2_2.length > N_value) i2_2.remove(0, (i2_2.length - N_value))
    (x._1, i2_2.toIterable)
})

/*
 rdd3.collect
res8: Array[(Int, Iterable[(String, Double)])] = Array((0,ArrayBuffer((cat95,19.6), (cat13,22.1), (cat26,30.9))),
                                                       (1,ArrayBuffer((cat13,12.6), (cat4,26.8), (cat67,28.5))),
													   (2,ArrayBuffer((cat187,27.9), (cat40,29.7), (cat56,39.6))),
													   (3,ArrayBuffer((cat8,35.6))))
*/

val rdd4 = rdd3.flatMap(x => {
    val y = x._2
    for (w <- y) yield (x._1, w._1, w._2)
})

rdd4.collect
/*
res3: Array[(Int, String, Double)] = Array((0,cat95,19.6), (0,cat13,22.1), (0,cat26,30.9),
                                           (1,cat13,12.6), (1,cat4,26.8), (1,cat67,28.5),
										   (2,cat187,27.9), (2,cat40,29.7), (2,cat56,39.6),
										   (3,cat8,35.6))
*/

rdd4.toDF("Hour", "Category", "TotalValue").show
/*
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
|   0|   cat95|      19.6|
|   0|   cat13|      22.1|
|   0|   cat26|      30.9|
|   1|   cat13|      12.6|
|   1|    cat4|      26.8|
|   1|   cat67|      28.5|
|   2|  cat187|      27.9|
|   2|   cat40|      29.7|
|   2|   cat56|      39.6|
|   3|    cat8|      35.6|
+----+--------+----------+
*/

  

  参考资料:

http://stackoverflow.com/questions/33878370/spark-dataframe-select-the-first-row-of-each-group

《Spark MLlib机器学习》

 

时间: 2024-12-21 03:46:52

spark TopN问题:dataframe和RDD比较的相关文章

使用dataframe解决spark TopN问题:分组、排序、取TopN

package com.profile.mainimport org.apache.spark.sql.expressions.Windowimport org.apache.spark.sql.functions._ import com.profile.tools.{DateTools, JdbcTools, LogTools, SparkTools}import com.dhd.comment.Constantimport com.profile.comment.Comments /**

转】Spark SQL 之 DataFrame

原博文出自于: http://www.cnblogs.com/BYRans/p/5003029.html 感谢! Spark SQL 之 DataFrame 转载请注明出处:http://www.cnblogs.com/BYRans/ 概述(Overview) Spark SQL是Spark的一个组件,用于结构化数据的计算.Spark SQL提供了一个称为DataFrames的编程抽象,DataFrames可以充当分布式SQL查询引擎. DataFrames DataFrame是一个分布式的数据

spark结构化数据处理:Spark SQL、DataFrame和Dataset

本文讲解Spark的结构化数据处理,主要包括:Spark SQL.DataFrame.Dataset以及Spark SQL服务等相关内容.本文主要讲解Spark 1.6.x的结构化数据处理相关东东,但因Spark发展迅速(本文的写作时值Spark 1.6.2发布之际,并且Spark 2.0的预览版本也已发布许久),因此请随时关注Spark SQL官方文档以了解最新信息. 文中使用Scala对Spark SQL进行讲解,并且代码大多都能在spark-shell中运行,关于这点请知晓. 概述 相比于

第56课:Spark SQL和DataFrame的本质

一.Spark SQL与Dataframe Spark SQL之所以是除Spark core以外最大和最受关注的组件的原因: a) 能处理一切存储介质和各种格式的数据(你同时可以方便的扩展Spark SQL的功能来支持更多的数据类型,例如KUDO) b)Spark SQL 把数据仓库的计算能力推向了一个新的高度.不仅是无敌的计算速度(Spark SQL比Shark快了一个数量级,Shark比Hive快了一个数量级),尤其是在tungsten成熟以后会更加无可匹敌.更为重要的是把数据仓库的计算复杂

Spark修炼之道(进阶篇)——Spark入门到精通:第十三节 Spark Streaming—— Spark SQL、DataFrame与Spark Streaming

主要内容 Spark SQL.DataFrame与Spark Streaming 1. Spark SQL.DataFrame与Spark Streaming 源码直接参照:https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala import org.apache.spark.SparkConf

Spark SQL and DataFrame Guide(1.4.1)——之Data Sources

数据源(Data Sources) Spark SQL通过DataFrame接口支持多种数据源操作.一个DataFrame可以作为正常的RDD操作,也可以被注册为临时表. 1. 通用的Load/Save函数 默认的数据源适用所有操作(可以用spark.sql.sources.default设置默认值) 之后,我们就可以使用hadoop fs -ls /user/hadoopuser/在此目录下找到namesAndFavColors.parquet文件. 手动指定数据源选项 我们可以手动指定数据源

Spark SQL和DataFrame的学习总结

1.DataFrame 一个以命名列组织的分布式数据集.概念上相当于关系数据库中一张表或在R / Python中的data frame数据结构,但DataFrame有丰富的优化.在spark 1.3之前,核心的新类型为RDD-schemaRDD,现改为DataFrame.spark 通过DataFrame操作大量的数据源,包括外部文件(如 json.avro.parquet.sequencefile 等等).hive.关系数据库.cassandra等. DataFrame与RDD区别: RDD以

Spark 学习(四)RDD自定义分区和缓存

一,简介 二,自定义分区规则 2.1 普通的分组TopN实现 2.2 自定义分区规则TopN实现 三,RDD的缓存 3.1 RDD缓存简介 3.2 RDD缓存方式 正文 一,简介 在之前的文章中,我们知道RDD的有一个特征:就是一组分片(Partition),即数据集的基本组成单位.对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度.用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值.默认值就是程序所分配到的CPU Core的数目.这个分配的规则我们是

Learning Spark中文版--第三章--RDD编程(1)

? ?本章介绍了Spark用于数据处理的核心抽象概念,具有弹性的分布式数据集(RDD).一个RDD仅仅是一个分布式的元素集合.在Spark中,所有工作都表示为创建新的RDDs.转换现有的RDDs,或者调用RDDs上的操作来计算结果.在底层,Spark自动将数据中包含的数据分发到你的集群中,并将你对它们执行的操作进行并行化.数据科学家和工程师都应该阅读这一章,因为RDDs是Spark的核心概念.我们强烈建议你在这些例子中尝试一些 交互式shell(参见"Spark的Python和Scala she