使用dataframe解决spark TopN问题:分组、排序、取TopN

package com.profile.main

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

import com.profile.tools.{DateTools, JdbcTools, LogTools, SparkTools}
import com.dhd.comment.Constant
import com.profile.comment.Comments

/**
 * Demo: solving the Spark "TopN per group" problem (group, sort, take top N)
 * two ways — (1) DataFrame + window function, (2) plain RDD transformations.
 *
 * @author (unknown)
 * date 2017-09-27 14:55
 */
object Test {

  def main(args: Array[String]): Unit = {
    val sc = SparkTools.getSparkContext
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    // Sample data: (Hour, Category, TotalValue)
    val df = sc.parallelize(Seq(
      (0, "cat26", 30.9), (0, "cat13", 22.1), (0, "cat95", 19.6), (0, "cat105", 1.3),
      (1, "cat67", 28.5), (1, "cat4", 26.8), (1, "cat13", 12.6), (1, "cat23", 5.3),
      (2, "cat56", 39.6), (2, "cat40", 29.7), (2, "cat187", 27.9), (2, "cat68", 9.8),
      (3, "cat8", 35.6))).toDF("Hour", "Category", "TotalValue")

    df.show()
    /*
    +----+--------+----------+
    |Hour|Category|TotalValue|
    +----+--------+----------+
    |   0|   cat26|      30.9|
    |   0|   cat13|      22.1|
    |   0|   cat95|      19.6|
    |   0|  cat105|       1.3|
    |   1|   cat67|      28.5|
    |   1|    cat4|      26.8|
    |   1|   cat13|      12.6|
    |   1|   cat23|       5.3|
    |   2|   cat56|      39.6|
    |   2|   cat40|      29.7|
    |   2|  cat187|      27.9|
    |   2|   cat68|       9.8|
    |   3|    cat8|      35.6|
    +----+--------+----------+
    */

    // ---- Approach 1: DataFrame window function (left commented out, as in
    // the original article; shown for reference) ----
    /*
    val w = Window.partitionBy($"Hour").orderBy($"TotalValue".desc)

    // Top 1 per group.
    // NOTE: the ranking function is rowNumber() in Spark 1.x and
    // row_number() in Spark 2.x.
    val dfTop1 = df.withColumn("rn", rowNumber.over(w)).where($"rn" === 1).drop("rn")

    // Top 3 per group.
    val dfTop3 = df.withColumn("rn", rowNumber.over(w)).where($"rn" <= 3).drop("rn")

    dfTop1.show()
    // +----+--------+----------+
    // |Hour|Category|TotalValue|
    // +----+--------+----------+
    // |   1|   cat67|      28.5|
    // |   3|    cat8|      35.6|
    // |   2|   cat56|      39.6|
    // |   0|   cat26|      30.9|
    // +----+--------+----------+

    dfTop3.show()
    // +----+--------+----------+
    // |Hour|Category|TotalValue|
    // +----+--------+----------+
    // |   1|   cat67|      28.5|
    // |   1|    cat4|      26.8|
    // |   1|   cat13|      12.6|
    // |   3|    cat8|      35.6|
    // |   2|   cat56|      39.6|
    // |   2|   cat40|      29.7|
    // |   2|  cat187|      27.9|
    // |   0|   cat26|      30.9|
    // |   0|   cat13|      22.1|
    // |   0|   cat95|      19.6|
    // +----+--------+----------+
    */

    // ---- Approach 2: RDD — group, sort each group, keep the top N ----

    val rdd1 = sc.parallelize(Seq(
      (0, "cat26", 30.9), (0, "cat13", 22.1), (0, "cat95", 19.6), (0, "cat105", 1.3),
      (1, "cat67", 28.5), (1, "cat4", 26.8), (1, "cat13", 12.6), (1, "cat23", 5.3),
      (2, "cat56", 39.6), (2, "cat40", 29.7), (2, "cat187", 27.9), (2, "cat68", 9.8),
      (3, "cat8", 35.6)))

    // Key by hour and collect each group's (category, value) pairs.
    // NOTE(review): groupByKey materializes every group in memory; for large
    // data prefer aggregateByKey with a bounded top-N accumulator.
    val rdd2 = rdd1.map(x => (x._1, (x._2, x._3))).groupByKey()
    /*
    rdd2.collect
    res9: Array[(Int, Iterable[(String, Double)])] =
      Array((0, CompactBuffer((cat26,30.9), (cat13,22.1), (cat95,19.6), (cat105,1.3))),
            (1, CompactBuffer((cat67,28.5), (cat4,26.8), (cat13,12.6), (cat23,5.3))),
            (2, CompactBuffer((cat56,39.6), (cat40,29.7), (cat187,27.9), (cat68,9.8))),
            (3, CompactBuffer((cat8,35.6))))
    */

    // Number of records to keep per group. (The original article set this to 1
    // while its comment claimed "top 3" — the value wins; change as needed.)
    val N_value = 1

    // Within each group, sort ascending by value and keep the last N_value
    // entries, i.e. the N largest, still in ascending order.
    val rdd3 = rdd2.map { case (hour, pairs) =>
      val topAscending = pairs.toBuffer.sortBy((p: (String, Double)) => p._2).takeRight(N_value)
      (hour, topAscending.toIterable)
    }
    /*
    rdd3.collect (with N_value = 3)
    res8: Array[(Int, Iterable[(String, Double)])] =
      Array((0, ArrayBuffer((cat95,19.6), (cat13,22.1), (cat26,30.9))),
            (1, ArrayBuffer((cat13,12.6), (cat4,26.8), (cat67,28.5))),
            (2, ArrayBuffer((cat187,27.9), (cat40,29.7), (cat56,39.6))),
            (3, ArrayBuffer((cat8,35.6))))
    */

    // Flatten back to (Hour, Category, TotalValue) triples.
    val rdd4 = rdd3.flatMap { case (hour, pairs) =>
      pairs.map { case (category, value) => (hour, category, value) }
    }

    rdd4.collect()
    /*
    res3: Array[(Int, String, Double)] =
      Array((0,cat95,19.6), (0,cat13,22.1), (0,cat26,30.9),
            (1,cat13,12.6), (1,cat4,26.8), (1,cat67,28.5),
            (2,cat187,27.9), (2,cat40,29.7), (2,cat56,39.6),
            (3,cat8,35.6))
    */

    rdd4.toDF("Hour", "Category", "TotalValue").show()
    /*
    +----+--------+----------+
    |Hour|Category|TotalValue|
    +----+--------+----------+
    |   0|   cat95|      19.6|
    |   0|   cat13|      22.1|
    |   0|   cat26|      30.9|
    |   2|  cat187|      27.9|
    |   2|   cat40|      29.7|
    |   2|   cat56|      39.6|
    |   1|   cat13|      12.6|
    |   1|    cat4|      26.8|
    |   1|   cat67|      28.5|
    |   3|    cat8|      35.6|
    +----+--------+----------+
    */
  }

}
时间: 2024-11-13 04:59:22

使用dataframe解决spark TopN问题:分组、排序、取TopN的相关文章

分组排序取次数

原文:分组排序取次数 函数 row_number() row_number() over (partition by patient_id /*需要分组的列*/ order by zyid /*排序的列*/) sqlSELECT ZYID, patient_id ,row_number() over (partition by patient_id order by zyid) visit_id from jk_patient_in_hospital oracle create or repla

hive 分组排序,topN

语法格式:row_number() OVER (partition by COL1 order by COL2 desc) rank。其中 partition by:类似hive的建表,分区的意思;order by:排序,默认是升序,加desc降序;rank:表示别名。表示根据COL1分组,在分组内部根据 COL2排序,而此函数计算的值就表示每组内部排序后的顺序编号(组内连续的唯一的) -- 分组排序-- 求某用户日期最大的3天select a.* from( select p_day,muuid,r

mysql分组排序取最大值所在行的实现方法

如下图, 计划实现 :按照 parent_code 分组, 取组中code最大值所在的整条记录,如红色部分.(类似hive中: row_number() over(partition by)) select c.* from ( select a.*, (@i := case when @key_i=parent_code then @i+1 else 1 end) as sort_num,(@key_i:=parent_code) as tmp from my_test a, (SELECT

[MySQL] 分组排序取前N条记录以及生成自动数字序列,类似group by后 limit

前言:         同事的业务场景是,按照cid.author分组,再按照id倒叙,取出前2条记录出来.        oracle里面可以通过row_number() OVER (PARTITION BY cid,author ORDER BY id DESC) 表示根据cid,author分组,在分组内部根据id排序,而此函数计算的值就表示每组内部排序后的顺序编号(组内连续的唯一的),而mysql数据库就没有这样的统计函数,需要自己写复杂的sql来实现. 1,录入测试数据 USE csd

关于oracle分组排序取值的问题

按照 某字段分组 某字段排序 然后取出该分组中排第1条数据(每组只取一条) SELECT* FROM( SELECT a.*,row_number() over(partition by ORI_FEE_ID order by MODIFY_TIME DESC) cn FROM AGENT_RESERVE_FEE_RATE a ) WHERE cn = 1; - MODIFY_TIME 排序字段- ORI_FEE_ID 分组字段- cn 为取值区间

MySQL分组排序取前N条记录以及生成自动数字序列--group by 后 limit 外加 rownumber

同事提了一个需求,要求按照某列分组,然后将各组的前几条抽取出来. 表结构 CREATE TABLE `total_freq_ctrl` ( `time` int(10) unsigned NOT NULL, `machine` char(64) NOT NULL, `module` char(32) NOT NULL, `total_flow` int(10) unsigned NOT NULL, `deny_flow` int(10) unsigned NOT NULL, PRIMARY KE

oracle中分组排序取TOP n

数据库中数据处理时,经常需要对某一类的数据取Top n,这就涉及到分组group by 并排序 order by ,之后根据rownum获取前几名数据,oracle数据库中ROW_NUMBER() OVER函数可以实现这个功能,如下: /*获取tab_a表中每个省份counts最大的前10个kpi*/SELECT * FROM (SELECT ROW_NUMBER() OVER(PARTITION BY prov ORDER BY counts DESC) rn, prov, kpi, coun

mysql或者oracle分组排序取前几条数据

mysql: select a.* from(select t1.*,(select count(*)+1 from 表 where 分组字段=t1.分组字段 and 排序字段<t1.排序字段) as group_idfrom 表 t1) awhere a.group_id<=3 Oracle: SELECT t.*            FROM (SELECT ROW_NUMBER() OVER(PARTITION BY 分组字段 ORDER BY 排序字段 DESC) rn,      

Oracle 中分组排序取值的问题

整理一下排序: 建表语句:create table EXAM(  name    VARCHAR2(32),  subject VARCHAR2(32),  score   INTEGER)数据:INSERT INTO EXAM VALUES ('赵柳', '数学', '71');INSERT INTO EXAM VALUES ('张三', '数学', '81');INSERT INTO EXAM VALUES ('李四', '数学', '75');INSERT INTO EXAM VALUES