sparksql系列(三) sparksql列操作、窗口函数、join

一:Sparksql列操作

初始化SparkContext及数据:

import java.util.Arrays

import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.{DataFrame, Row, SparkSession, functions}
import org.apache.spark.sql.functions.{col, desc, length, row_number, trim, when}
import org.apache.spark.sql.functions.{countDistinct,sum,count,avg}
import org.apache.spark.sql.functions.concat
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.SaveMode
import java.util.ArrayList

object WordCount {

  def initSparkAndData() : DataFrame = {

    val sparkSession= SparkSession.builder().master("local").appName("AppName").getOrCreate()
    val javasc = new JavaSparkContext(sparkSession.sparkContext)
    val nameRDD = javasc.parallelize(Arrays.asList("{‘name‘:‘wangwu‘,‘age‘:‘18‘,‘vip‘:‘t‘}",
      "{‘name‘:‘sunliu‘,‘age‘:‘19‘,‘vip‘:‘t‘}","{‘name‘:‘zhangsan‘,‘age‘:‘20‘,‘vip‘:‘f‘}"));
    val namedf = sparkSession.read.json(nameRDD)

    namedf
  }
}

增加列

    val data = initSparkAndData()

    //方法一:可以添加常量值
    data.select(when(col("name").isNotNull,1).otherwise(0) as "usergroup").show(100)   
    //方法二:  只能已经存在的列操作
    data.withColumn("time", concat(col("age"),col("name")) ).show(100)

删除列

    val data = initSparkAndData()
    data.drop("vip").show(100)

二:SparkSql 窗口函数

    传统数据库中就有这个函数,就是partation by () order by ()。那下面让我们看看sparksql中怎么写:

    val data = initSparkAndData()
    data.withColumn("isVsip", row_number().over(Window.partitionBy(col("vip")).orderBy(desc("name")))).show(100)

    上面的意思是按照VIP分组,后按照name排序,作为新的列isVsip。项目中用来作为提取最新记录的函数,举例如下:

    统计出用户最近登录记录:

    val sparkSession= SparkSession.builder().master("local").appName("AppName").getOrCreate()
    val javasc = new JavaSparkContext(sparkSession.sparkContext)

    val nameRDD1 = javasc.parallelize(Arrays.asList("{‘name‘:‘wangwu‘,‘time‘:‘2019-08-12‘}",
      "{‘name‘:‘sunliu‘,‘time‘:‘2019-08-13‘}","{‘name‘:‘zhangsan‘,‘time‘:‘2019-08-14‘}"));
    val namedf1 = sparkSession.read.json(nameRDD1)

    val nameRDD2 = javasc.parallelize(Arrays.asList("{‘name‘:‘wangwu‘,‘time‘:‘2019-09-12‘}",
      "{‘name‘:‘sunliu‘,‘time‘:‘2019-08-13‘}","{‘name‘:‘zhangsan‘,‘time‘:‘2019-07-14‘}"));
    val namedf2 = sparkSession.read.json(nameRDD2)
    //上面全是构造数据。
    namedf1.union(namedf2).withColumn("max_time", row_number().over(Window.partitionBy(col("name")).orderBy(desc("time"))))
      .filter(col("max_time") ===1).show(100)

三:Sparksql join操作

初始化SparkContext及数据:

import java.util.Arrays

import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.{DataFrame, Row, SparkSession, functions}
import org.apache.spark.sql.functions.{col, desc, length, row_number, trim, when}
import org.apache.spark.sql.functions.{countDistinct,sum,count,avg}
import org.apache.spark.sql.functions.concat
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.SaveMode
import java.util.ArrayList

object WordCount {
  def joinTestData() = {
    val sparkSession= SparkSession.builder().master("local").appName("AppName").getOrCreate()
    val javasc = new JavaSparkContext(sparkSession.sparkContext)

    val nameRDD = javasc.parallelize(Arrays.asList("{‘name‘:‘zhangsan‘,‘age‘:‘18‘,‘sex‘:‘N‘}", "{‘name‘:‘lisi‘,‘age‘:‘19‘,‘sex‘:‘F‘}","{‘‘:‘‘,‘‘:‘‘,‘‘:‘‘}"));
    val nameRDD1 = javasc.parallelize(Arrays.asList("{‘name‘:‘wangwu‘,‘age‘:‘18‘,‘vip‘:‘t‘}", "{‘name‘:‘sunliu‘,‘age‘:‘19‘,‘vip‘:‘t‘}","{‘name‘:‘zhangsan‘,‘age‘:‘18‘,‘vip‘:‘f‘}"));
    val data1 = sparkSession.read.json(nameRDD)
    val data2 = sparkSession.read.json(nameRDD1)

    (data1,data2)
  }
}

left、leftouter、left_outer三者相同

    val dataTuple = joinTestData()
    val data1 = dataTuple._1
    val data2 = dataTuple._2

    val left = data1.join(data2,data1("name") === data2("name") ,"left").show(100)

      结果:

age name sex age name vip
null null null null null null
18 zhangsan N 18 zhangsan f
19 lisi f null null null

right、rightouter、right_outer三者相同

    val dataTuple = joinTestData()
    val data1 = dataTuple._1
    val data2 = dataTuple._2

    val right = data1.join(data2,data1("name") === data2("name") ,"right").show(100)

      结果:

age name sex age name vip
null null null 18 wangwu t
18 zhangsan N 18 zhangsan f
null null null   sunliu t

cross、inner两者相同

    val dataTuple = joinTestData()
    val data1 = dataTuple._1
    val data2 = dataTuple._2

    val right = data1.join(data2,data1("name") === data2("name") ,"right").show(100)

      结果:

age name sex age name vip
18 zhangsan N 18 zhangsan f

full、fullouter、full_outer、outer四者相同

    val dataTuple = joinTestData()
    val data1 = dataTuple._1
    val data2 = dataTuple._2

    val full = data1.join(data2,data1("name") === data2("name") ,"full").show(100)

      结果:

age name sex age name vip
null null null 18 wangwu t
null null null null null null
18 zhangsan N 18 zhangsan f
null null null 19 sunliu t
19 lisi F null null null

leftsemi(innerjoin之后只保留左边的)

    val dataTuple = joinTestData()
    val data1 = dataTuple._1
    val data2 = dataTuple._2

    val leftsemi = data1.join(data2,data1("name") === data2("name") ,"leftsemi").show(100)

    真正在项目中的使用:项目中有一张大表,主键是用户ID,里面有用户所有基本信息。项目使用过程中一般要求关联大表取得所有基本信息,leftsemi一般用于缩减大表。

      结果:

age name sex
18 zhangsan N

leftanti(innerjoin之后去除能关联上之外的)

    val dataTuple = joinTestData()
    val data1 = dataTuple._1
    val data2 = dataTuple._2

    val leftouter = data1.join(data2,data1("name") === data2("name") ,"leftanti").show(100)

      结果:

age name sex
null null null
19 lisi F

原文地址:https://www.cnblogs.com/wuxiaolong4/p/11706811.html

时间: 2024-10-06 18:51:52

sparksql系列(三) sparksql列操作、窗口函数、join的相关文章

SQL Server中的三种物理连接操作:嵌套循环连接、合并连接、哈希匹配

浅谈SQL Server中的三种物理连接操作 Merge join.Hash join.Nested loop join对比分析 版权声明:本文为博主原创文章,未经博主允许不得转载.

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送–Spark入门实战系列>获取 1 运行环境说明 1.1 硬软件环境 主机操作系统:Windows 64位,双核4线程,主频2.2G,10G内存 虚拟软件:VMware? Workstation 9.0.0 build-812388 虚拟机操作系统:CentOS 64位,单核 虚拟机运行环境: JDK:1.7.0_55 64位 Hadoop:2.2.0(需要编译为64位) Scala:2.10.4 Spark:1.1.0(需要编译)

Spark入门实战系列--6.SparkSQL(上)--SparkSQL简介

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.SparkSQL的发展历程 1.1 Hive and Shark SparkSQL的前身是Shark,给熟悉RDBMS但又不理解MapReduce的技术人员提供快速上手的工具,Hive应运而生,它是当时唯一运行在Hadoop上的SQL-on-Hadoop工具.但是MapReduce计算过程中大量的中间磁盘落地过程消耗了大量的I/O,降低的运行效率,为了提高SQL-on-Hadoop的效率,大量的

Spark入门实战系列--6.SparkSQL(中)--深入了解运行计划及调优

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.1  运行环境说明 1.1.1 硬软件环境 l  主机操作系统:Windows 64位,双核4线程,主频2.2G,10G内存 l  虚拟软件:VMware® Workstation 9.0.0 build-812388 l  虚拟机操作系统:CentOS6.5 64位,单核 l  虚拟机运行环境: Ø  JDK:1.7.0_55 64位 Ø  Hadoop:2.2.0(需要编译为64位) Ø 

SQL Server 2008空间数据应用系列三:SQL Server 2008空间数据类型

原文:SQL Server 2008空间数据应用系列三:SQL Server 2008空间数据类型 友情提示,您阅读本篇博文的先决条件如下: 1.本文示例基于Microsoft SQL Server 2008 R2调测. 2.具备 Transact-SQL 编程经验和使用 SQL Server Management Studio 的经验. 3.熟悉或了解Microsoft SQL Server 2008中的空间数据类型. 4.具备相应的GIS专业理论知识. 5.其他相关知识. SQL Serve

Cocos2d-x 系列三之C++语言

1.面向对象<1>直接定义类 class People { public: void sayHello() { printf("hello c oop \n"); } }; People * p = new People(); p->sayHello(); <2>使用头文件定义 Ml.h #ifndef ML_H_ #define ML_H_ namespace cjyn { // 命名空间 class Ml { public: void sayHello

Android高效率编码-第三方SDK详解系列(三)——JPush推送牵扯出来的江湖恩怨,XMPP实现推送,自定义客户端推送

Android高效率编码-第三方SDK详解系列(三)--JPush推送牵扯出来的江湖恩怨,XMPP实现推送,自定义客户端推送 很久没有更新第三方SDK这个系列了,所以更新一下这几天工作中使用到的推送,写这个系列真的很要命,你要去把他们的API文档大致的翻阅一遍,而且各种功能都实现一遍,解决各种bug各种坑,不得不说,极光推送真坑,大家使用还是要慎重,我们看一下极光推送的官网 https://www.jpush.cn/common/ 推送比较使用,很多软件有需要,所以在这个点拿出来多讲讲,我们本节

Exchange 2013SP1和O365混合部署系列三

继续,基本上大多数都是截图,在某些地方,会有一些提示. 下面就是和本地域进行同步,主要用到的是Dirsync工具. 全都是下一步. 继续下一步. 继续下一步. 这里输入的就是O365的管理员账号.也就是当时注册的那个. 本地管理员账号. 继续下一步. 开始自动配置. 开始同步本地的账户.到WAAD. 图上我们可以看到同步状态. 继续下面的操作,激活已同步的用户. 根据需要选择. 有印象没?安装同步工具的时候选的那个密码同步的选项. 先到这吧,下篇开始在本地Exchange 2013 SP1上配置

Apache Kafka系列(三) Java API使用

Apache Kafka系列(一) 起步 Apache Kafka系列(二) 命令行工具(CLI) Apache Kafka系列(三) Java API使用 摘要: Apache Kafka Java Client API 一.基本概念 Kafka集成了Producer/Consumer连接Broker的客户端工具,但是在消息处理方面,这两者主要用于服务端(Broker)的简单操作,如: 1.创建Topic 2.罗列出已存在的Topic 3.对已有Topic的Produce/Consume测试