/** * @author DT大数据梦工厂 * 新浪微博 http://weibo.com/ilovepains/ * 微信公众账号:DT_Spark * 直播地址 YY频道:68917580 */object SparkSQLWindowFunctionOps { def main(args: Array[String]) { /** * 创建SparkConf对象,设置Spark程序运行时的配置信息 * 例如说通过setMaster来设置程序要链接的Spark集群的Master的URL,如果设置为local,则代表 * Spark程序运行在本地,适合机器配置一般的初学者 */ val conf = new SparkConf().setAppName("SparkSQLWindowFunctionOps").setMaster("spark://hadoop2001:7077") /** * 创建SparkContext对象 * SparkContext对象时Spark程序所有功能的唯一入口,无论是scala、java、python等都必须有一个SparkContext。 * SparkContext的核心作用:初始化Spark应用程序运行所需要的核心组件,包括DAGScheduler,TaskScheduler,SchedulerBackend * 同事还会负责Spark程序往Master注册程序等 * SparkContext是整个Spark应用程序中最为至关重要的一个对象 */ val sc = new SparkContext(conf) val hiveContext = new HiveContext(sc) hiveContext.sql("use hive") hiveContext.sql("DROP TABLE IF EXISTS scores") hiveContext.sql("CREATE TABLE IF NOT EXISTS scores(name STRING,score INT)" + " ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘ ‘ LINES TERMINATED BY ‘\\n‘") hiveContext.sql("LOAD DATA LOCAL INPATH ‘/root/test/testdate/topNGroup.txt‘ INTO TABLE scores") /** * 使用子查询的方式完成目标数据的提取,在目标数据内部使用窗口函数row_number来进行分组排序: * PARTITION BY:指定窗口函数分组的Key * ORDER BY : 分组进行排序 */ val result = hiveContext.sql("SELECT name,score " + "FROM (" + "SELECT name,score," + "row_number() OVER (PARTITION BY name ORDER BY score DESC) rank" + " FROM scores " + ") sub_scores " + "WHERE rank <= 4") result.show() //在Driver的控制台上打印出结果内容 hiveContext.sql("DROP TABLE IF EXISTS sortedResultScores") result.saveAsTable("sortedResultScores") } } DT大数据梦工厂由王家林老师及其团队打造,旨在为社会培养100万优秀大数据人才,Spark已是目前大数据行业主流数据处理框架和未来趋势。关注DT大数据梦工厂公众号: DT_Spark 查看免费公开课,内容绝对详细。
YY永久免费直播频道:68917580 王家林老师联系方式:
时间: 2024-10-29 19:06:26