Update:sparksql:第3节 Dataset (DataFrame) 的基础操作 & 第4节 SparkSQL_聚合操作_连接操作

  • 8. Dataset (DataFrame) 的基础操作

    • 8.1. 有类型操作
    • 8.2. 无类型转换
    • 8.5. Column 对象
  • 9. 缺失值处理
  • 10. 聚合
  • 11. 连接

8. Dataset (DataFrame) 的基础操作

导读

这一章节主要目的是介绍 Dataset 的基础操作, 当然, DataFrame 就是 Dataset, 所以这些操作大部分也适用于 DataFrame

  1. 有类型的转换操作
  2. 无类型的转换操作
  3. 基础 Action
  4. 空值如何处理
  5. 统计操作

8.1. 有类型操作

分类 算子 解释

转换


flatMap

通过 flatMap 可以将一条数据转为一个数组, 后再展开这个数组放入 Dataset

import spark.implicits._
val ds = Seq("hello world", "hello pc").toDS()
ds.flatMap( _.split(" ") ).show()

map

map 可以将数据集中每条数据转为另一种形式

import spark.implicits._
val ds = Seq(Person("zhangsan", 15), Person("lisi", 15)).toDS()
ds.map( person => Person(person.name, person.age * 2) ).show()

mapPartitions

mapPartitions 和 map 一样, 但是 map 的处理单位是每条数据, mapPartitions的处理单位是每个分区

import spark.implicits._
val ds = Seq(Person("zhangsan", 15), Person("lisi", 15)).toDS()
ds.mapPartitions( iter => {
    val returnValue = iter.map(
      item => Person(item.name, item.age * 2)
    )
    returnValue
  } )
  .show()

transform

map 和 mapPartitions 以及 transform 都是转换, map 和 mapPartitions 是针对数据, 而 transform 是针对整个数据集, 这种方式最大的区别就是 transform 可以直接拿到 Dataset 进行操作

import spark.implicits._
val ds = spark.range(5)
ds.transform( dataset => dataset.withColumn("doubled", ‘id * 2) )

as

as[Type] 算子的主要作用是将弱类型的 Dataset 转为强类型的 Dataset, 它有很多适用场景, 但是最常见的还是在读取数据的时候, 因为 DataFrameReader 体系大部分情况下是将读出来的数据转换为 DataFrame 的形式, 如果后续需要使用 Dataset的强类型 API, 则需要将 DataFrame 转为 Dataset. 可以使用 as[Type] 算子完成这种操作

import spark.implicits._

val structType = StructType(
  Seq(
    StructField("name", StringType),
    StructField("age", IntegerType),
    StructField("gpa", FloatType)
  )
)

val sourceDF = spark.read
  .schema(structType)
  .option("delimiter", "\t")
  .csv("dataset/studenttab10k")

val dataset = sourceDF.as[Student]
dataset.show()

过滤


filter

filter 用来按照条件过滤数据集

import spark.implicits._
val ds = Seq(Person("zhangsan", 15), Person("lisi", 15)).toDS()
ds.filter( person => person.name == "lisi" ).show()

聚合


groupByKey

grouByKey 算子的返回结果是 KeyValueGroupedDataset, 而不是一个 Dataset, 所以必须要先经过 KeyValueGroupedDataset 中的方法进行聚合, 再转回 Dataset, 才能使用 Action 得出结果

其实这也印证了分组后必须聚合的道理

import spark.implicits._
val ds = Seq(Person("zhangsan", 15), Person("zhangsan", 15), Person("lisi", 15)).toDS()
ds.groupByKey( person => person.name ).count().show()

切分


randomSplit

randomSplit 会按照传入的权重随机将一个 Dataset 分为多个 Dataset, 传入 randomSplit 的数组有多少个权重, 最终就会生成多少个 Dataset, 这些权重的加倍和应该为 1, 否则将被标准化

val ds = spark.range(15)
val datasets: Array[Dataset[lang.Long]] = ds.randomSplit(Array[Double](2, 3))
datasets.foreach(dataset => dataset.show())

sample

sample 会随机在 Dataset 中抽样

val ds = spark.range(15)
ds.sample(withReplacement = false, fraction = 0.4).show()

排序


orderBy

orderBy 配合 Column 的 API, 可以实现正反序排列

import spark.implicits._
val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()
ds.orderBy("age").show()
ds.orderBy(‘age.desc).show()

sort

其实 orderBy 是 sort 的别名, 所以它们所实现的功能是一样的

import spark.implicits._
val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()
ds.sort(‘age.desc).show()

分区


coalesce

减少分区, 此算子和 RDD 中的 coalesce 不同, Dataset 中的 coalesce 只能减少分区数, coalesce 会直接创建一个逻辑操作, 并且设置 Shuffle 为 false

val ds = spark.range(15)
ds.coalesce(1).explain(true)

repartitions

repartitions 有两个作用, 一个是重分区到特定的分区数, 另一个是按照某一列来分区, 类似于 SQL 中的 DISTRIBUTE BY

val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()
ds.repartition(4)
ds.repartition(‘name)

去重


dropDuplicates

使用 dropDuplicates 可以去掉某一些列中重复的行

import spark.implicits._
val ds = spark.createDataset(Seq(Person("zhangsan", 15), Person("zhangsan", 15), Person("lisi", 15)))
ds.dropDuplicates("age").show()

distinct

当 dropDuplicates 中没有传入列名的时候, 其含义是根据所有列去重, dropDuplicates() 方法还有一个别名, 叫做 distinct

所以, 使用 distinct 也可以去重, 并且只能根据所有的列来去重

import spark.implicits._
val ds = spark.createDataset(Seq(Person("zhangsan", 15), Person("zhangsan", 15), Person("lisi", 15)))
ds.distinct().show()

集合操作


except

except 和 SQL 语句中的 except 一个意思, 是求得 ds1 中不存在于 ds2 中的数据, 其实就是差集

val ds1 = spark.range(1, 10)
val ds2 = spark.range(5, 15)

ds1.except(ds2).show()

intersect

求得两个集合的交集

val ds1 = spark.range(1, 10)
val ds2 = spark.range(5, 15)

ds1.intersect(ds2).show()

union

求得两个集合的并集

val ds1 = spark.range(1, 10)
val ds2 = spark.range(5, 15)

ds1.union(ds2).show()

limit

限制结果集数量

val ds = spark.range(1, 10)
ds.limit(3).show()

8.2. 无类型转换

分类 算子 解释

选择


select

select 用来选择某些列出现在结果集中

import spark.implicits._
val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()
ds.select($"name").show()

selectExpr

在 SQL 语句中, 经常可以在 select 子句中使用 count(age)rand() 等函数, 在 selectExpr 中就可以使用这样的 SQL 表达式, 同时使用 select 配合 expr函数也可以做到类似的效果

import spark.implicits._
import org.apache.spark.sql.functions._
val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()
ds.selectExpr("count(age) as count").show()
ds.selectExpr("rand() as random").show()
ds.select(expr("count(age) as count")).show()

withColumn

通过 Column 对象在 Dataset 中创建一个新的列或者修改原来的列

import spark.implicits._
import org.apache.spark.sql.functions._
val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()
ds.withColumn("random", expr("rand()")).show()

withColumnRenamed

修改列名

import spark.implicits._
val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()
ds.withColumnRenamed("name", "new_name").show()

剪除


drop

剪掉某个列

import spark.implicits._
val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()
ds.drop(‘age).show()

聚合


groupBy

按照给定的行进行分组

import spark.implicits._
val ds = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()
ds.groupBy(‘name).count().show()

8.5. Column 对象

导读

Column 表示了 Dataset 中的一个列, 并且可以持有一个表达式, 这个表达式作用于每一条数据, 对每条数据都生成一个值, 之所以有单独这样的一个章节是因为列的操作属于细节, 但是又比较常见, 会在很多算子中配合出现

分类 操作 解释

创建


单引号  在 Scala 中是一个特殊的符号, 通过  会生成一个 Symbol 对象, Symbol 对象可以理解为是一个字符串的变种, 但是比字符串的效率高很多, 在 Spark 中, 对 Scala 中的 Symbol 对象做了隐式转换, 转换为一个 ColumnName 对象, ColumnName 是 Column 的子类, 所以在 Spark中可以如下去选中一个列

val spark = SparkSession.builder().appName("column").master("local[6]").getOrCreate()
import spark.implicits._
val personDF = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()

val c1: Symbol = ‘name

$

同理, $ 符号也是一个隐式转换, 同样通过 spark.implicits 导入, 通过 $ 可以生成一个 Column 对象

val spark = SparkSession.builder().appName("column").master("local[6]").getOrCreate()
import spark.implicits._
val personDF = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()

val c2: ColumnName = $"name"

col

SparkSQL 提供了一系列的函数, 可以通过函数实现很多功能, 在后面课程中会进行详细介绍, 这些函数中有两个可以帮助我们创建 Column 对象, 一个是 col, 另外一个是 column

val spark = SparkSession.builder().appName("column").master("local[6]").getOrCreate()
import org.apache.spark.sql.functions._
val personDF = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()

val c3: sql.Column = col("name")

column

val spark = SparkSession.builder().appName("column").master("local[6]").getOrCreate()
import org.apache.spark.sql.functions._
val personDF = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()

val c4: sql.Column = column("name")

Dataset.col

前面的 Column 对象创建方式所创建的 Column 对象都是 Free 的, 也就是没有绑定任何 Dataset, 所以可以作用于任何 Dataset, 同时, 也可以通过 Dataset 的 col 方法选择一个列, 但是这个 Column 是绑定了这个 Dataset 的, 所以只能用于创建其的 Dataset 上

val spark = SparkSession.builder().appName("column").master("local[6]").getOrCreate()
val personDF = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()

val c5: sql.Column = personDF.col("name")

Dataset.apply

可以通过 Dataset 对象的 apply 方法来获取一个关联此 Dataset 的 Column 对象

val spark = SparkSession.builder().appName("column").master("local[6]").getOrCreate()
val personDF = Seq(Person("zhangsan", 12), Person("zhangsan", 8), Person("lisi", 15)).toDS()

val c6: sql.Column = personDF.apply("name")

apply 的调用有一个简写形式

val c7: sql.Column = personDF("name")

别名和转换


as[Type]

as 方法有两个用法, 通过 as[Type] 的形式可以将一个列中数据的类型转为 Type 类型

personDF.select(col("age").as[Long]).show()

as(name)

通过 as(name) 的形式使用 as 方法可以为列创建别名

personDF.select(col("age").as("age_new")).show()

添加列


withColumn

通过 Column 在添加一个新的列时候修改 Column 所代表的列的数据

personDF.withColumn("double_age", ‘age * 2).show()

操作


like

通过 Column 的 API, 可以轻松实现 SQL 语句中 LIKE 的功能

personDF.filter(‘name like "%zhang%").show()

isin

通过 Column 的 API, 可以轻松实现 SQL 语句中 ISIN 的功能

personDF.filter(‘name isin ("hello", "zhangsan")).show()

sort

在排序的时候, 可以通过 Column 的 API 实现正反序

personDF.sort(‘age.asc).show()
personDF.sort(‘age.desc).show()

9. 缺失值处理

导读

  1. DataFrame 中什么时候会有无效值
  2. DataFrame 如何处理无效的值
  3. DataFrame 如何处理 null
缺失值的处理思路

如果想探究如何处理无效值, 首先要知道无效值从哪来, 从而分析可能产生的无效值有哪些类型, 在分别去看如何处理无效值

什么是缺失值

一个值本身的含义是这个值不存在则称之为缺失值, 也就是说这个值本身代表着缺失, 或者这个值本身无意义, 比如说 null, 比如说空字符串

关于数据的分析其实就是统计分析的概念, 如果这样的话, 当数据集中存在缺失值, 则无法进行统计和分析, 对很多操作都有影响

缺失值如何产生的

Spark 大多时候处理的数据来自于业务系统中, 业务系统中可能会因为各种原因, 产生一些异常的数据

例如说因为前后端的判断失误, 提交了一些非法参数. 再例如说因为业务系统修改 MySQL 表结构产生的一些空值数据等. 总之在业务系统中出现缺失值其实是非常常见的一件事, 所以大数据系统就一定要考虑这件事.

缺失值的类型

常见的缺失值有两种

  • nullNaN 等特殊类型的值, 某些语言中 null 可以理解是一个对象, 但是代表没有对象, NaN 是一个数字, 可以代表不是数字

    针对这一类的缺失值, Spark 提供了一个名为 DataFrameNaFunctions 特殊类型来操作和处理

  • "Null""NA"" " 等解析为字符串的类型, 但是其实并不是常规字符串数据

    针对这类字符串, 需要对数据集进行采样, 观察异常数据, 总结经验, 各个击破

DataFrameNaFunctions

DataFrameNaFunctions 使用 Dataset 的 na 函数来获取

val df = ...
val naFunc: DataFrameNaFunctions = df.na

当数据集中出现缺失值的时候, 大致有两种处理方式, 一个是丢弃, 一个是替换为某值, DataFrameNaFunctions 中包含一系列针对空值数据的方案

  • DataFrameNaFunctions.drop 可以在当某行中包含 null 或 NaN 的时候丢弃此行
  • DataFrameNaFunctions.fill 可以在将 null 和 NaN 充为其它值
  • DataFrameNaFunctions.replace 可以把 null 或 NaN 替换为其它值, 但是和 fill 略有一些不同, 这个方法针对值来进行替换
如何使用 SparkSQL 处理 null 和 NaN ?

首先要将数据读取出来, 此次使用的数据集直接存在 NaN, 在指定 Schema 后, 可直接被转为 Double.NaN

val schema = StructType(
  List(
    StructField("id", IntegerType),
    StructField("year", IntegerType),
    StructField("month", IntegerType),
    StructField("day", IntegerType),
    StructField("hour", IntegerType),
    StructField("season", IntegerType),
    StructField("pm", DoubleType)
  )
)

val df = spark.read
  .option("header", value = true)
  .schema(schema)
  .csv("dataset/beijingpm_with_nan.csv")

对于缺失值的处理一般就是丢弃和填充

丢弃包含 null 和 NaN 的行

当某行数据所有值都是 null 或者 NaN 的时候丢弃此行

df.na.drop("all").show()

当某行中特定列所有值都是 null 或者 NaN 的时候丢弃此行

df.na.drop("all", List("pm", "id")).show()

当某行数据任意一个字段为 null 或者 NaN 的时候丢弃此行

df.na.drop().show()
df.na.drop("any").show()

当某行中特定列任意一个字段为 null 或者 NaN 的时候丢弃此行

df.na.drop(List("pm", "id")).show()
df.na.drop("any", List("pm", "id")).show()
填充包含 null 和 NaN 的列

填充所有包含 null 和 NaN 的列

df.na.fill(0).show()

填充特定包含 null 和 NaN 的列

df.na.fill(0, List("pm")).show()

根据包含 null 和 NaN 的列的不同来填充

import scala.collection.JavaConverters._

df.na.fill(Map[String, Any]("pm" -> 0).asJava).show
如何使用 SparkSQL 处理异常字符串 ?

读取数据集, 这次读取的是最原始的那个 PM 数据集

val df = spark.read
  .option("header", value = true)
  .csv("dataset/BeijingPM20100101_20151231.csv")

使用函数直接转换非法的字符串

df.select(‘No as "id", ‘year, ‘month, ‘day, ‘hour, ‘season,
    when(‘PM_Dongsi === "NA", 0)
    .otherwise(‘PM_Dongsi cast DoubleType)
    .as("pm"))
  .show()

使用 where 直接过滤

df.select(‘No as "id", ‘year, ‘month, ‘day, ‘hour, ‘season, ‘PM_Dongsi)
  .where(‘PM_Dongsi =!= "NA")
  .show()

使用 DataFrameNaFunctions 替换, 但是这种方式被替换的值和新值必须是同类型

df.select(‘No as "id", ‘year, ‘month, ‘day, ‘hour, ‘season, ‘PM_Dongsi)
  .na.replace("PM_Dongsi", Map("NA" -> "NaN"))
  .show()

10. 聚合

导读

  1. groupBy
  2. rollup
  3. cube
  4. pivot
  5. RelationalGroupedDataset 上的聚合操作
groupBy

groupBy 算子会按照列将 Dataset 分组, 并返回一个 RelationalGroupedDataset 对象, 通过 RelationalGroupedDataset 可以对分组进行聚合

Step 1: 加载实验数据
private val spark = SparkSession.builder()
    .master("local[6]")
    .appName("aggregation")
    .getOrCreate()

  import spark.implicits._

  private val schema = StructType(
    List(
      StructField("id", IntegerType),
      StructField("year", IntegerType),
      StructField("month", IntegerType),
      StructField("day", IntegerType),
      StructField("hour", IntegerType),
      StructField("season", IntegerType),
      StructField("pm", DoubleType)
    )
  )

  private val pmDF = spark.read
    .schema(schema)
    .option("header", value = true)
    .csv("dataset/pm_without_null.csv")
Step 2: 使用 functions 函数进行聚合
import org.apache.spark.sql.functions._

val groupedDF: RelationalGroupedDataset = pmDF.groupBy(‘year)

groupedDF.agg(avg(‘pm) as "pm_avg")
  .orderBy(‘pm_avg)
  .show()
Step 3: 除了使用 functions 进行聚合, 还可以直接使用 RelationalGroupedDataset 的 API 进行聚合
groupedDF.avg("pm")
  .orderBy(‘pm_avg)
  .show()

groupedDF.max("pm")
  .orderBy(‘pm_avg)
  .show()
多维聚合

我们可能经常需要针对数据进行多维的聚合, 也就是一次性统计小计, 总计等, 一般的思路如下

Step 1: 准备数据
private val spark = SparkSession.builder()
  .master("local[6]")
  .appName("aggregation")
  .getOrCreate()

import spark.implicits._

private val schemaFinal = StructType(
  List(
    StructField("source", StringType),
    StructField("year", IntegerType),
    StructField("month", IntegerType),
    StructField("day", IntegerType),
    StructField("hour", IntegerType),
    StructField("season", IntegerType),
    StructField("pm", DoubleType)
  )
)

private val pmFinal = spark.read
  .schema(schemaFinal)
  .option("header", value = true)
  .csv("dataset/pm_final.csv")
Step 2: 进行多维度聚合
import org.apache.spark.sql.functions._

val groupPostAndYear = pmFinal.groupBy(‘source, ‘year)
  .agg(sum("pm") as "pm")

val groupPost = pmFinal.groupBy(‘source)
  .agg(sum("pm") as "pm")
  .select(‘source, lit(null) as "year", ‘pm)

groupPostAndYear.union(groupPost)
  .sort(‘source, ‘year asc_nulls_last, ‘pm)
  .show()

大家其实也能看出来, 在一个数据集中又小计又总计, 可能需要多个操作符, 如何简化呢? 请看下面

rollup 操作符

rollup 操作符其实就是 groupBy 的一个扩展, rollup 会对传入的列进行滚动 groupBygroupBy 的次数为列数量 + 1, 最后一次是对整个数据集进行聚合

Step 1: 创建数据集
import org.apache.spark.sql.functions._

val sales = Seq(
  ("Beijing", 2016, 100),
  ("Beijing", 2017, 200),
  ("Shanghai", 2015, 50),
  ("Shanghai", 2016, 150),
  ("Guangzhou", 2017, 50)
).toDF("city", "year", "amount")
Step 1: rollup 的操作
sales.rollup("city", "year")
  .agg(sum("amount") as "amount")
  .sort($"city".desc_nulls_last, $"year".asc_nulls_last)
  .show()

/**
  * 结果集:
  * +---------+----+------+
  * |     city|year|amount|
  * +---------+----+------+
  * | Shanghai|2015|    50| <-- 上海 2015 的小计
  * | Shanghai|2016|   150|
  * | Shanghai|null|   200| <-- 上海的总计
  * |Guangzhou|2017|    50|
  * |Guangzhou|null|    50|
  * |  Beijing|2016|   100|
  * |  Beijing|2017|   200|
  * |  Beijing|null|   300|
  * |     null|null|   550| <-- 整个数据集的总计
  * +---------+----+------+
  */
Step 2: 如果使用基础的 groupBy 如何实现效果?
val cityAndYear = sales
  .groupBy("city", "year") // 按照 city 和 year 聚合
  .agg(sum("amount") as "amount")

val city = sales
  .groupBy("city") // 按照 city 进行聚合
  .agg(sum("amount") as "amount")
  .select($"city", lit(null) as "year", $"amount")

val all = sales
  .groupBy() // 全局聚合
  .agg(sum("amount") as "amount")
  .select(lit(null) as "city", lit(null) as "year", $"amount")

cityAndYear
  .union(city)
  .union(all)
  .sort($"city".desc_nulls_last, $"year".asc_nulls_last)
  .show()

/**
  * 统计结果:
  * +---------+----+------+
  * |     city|year|amount|
  * +---------+----+------+
  * | Shanghai|2015|    50|
  * | Shanghai|2016|   150|
  * | Shanghai|null|   200|
  * |Guangzhou|2017|    50|
  * |Guangzhou|null|    50|
  * |  Beijing|2016|   100|
  * |  Beijing|2017|   200|
  * |  Beijing|null|   300|
  * |     null|null|   550|
  * +---------+----+------+
  */

很明显可以看到, 在上述案例中, rollup 就相当于先按照 cityyear 进行聚合, 后按照 city 进行聚合, 最后对整个数据集进行聚合, 在按照 city 聚合时, year 列值为 null, 聚合整个数据集的时候, 除了聚合列, 其它列值都为 null

使用 rollup 完成 pm 值的统计

上面的案例使用 rollup 来实现会非常的简单

import org.apache.spark.sql.functions._

pmFinal.rollup(‘source, ‘year)
  .agg(sum("pm") as "pm_total")
  .sort(‘source.asc_nulls_last, ‘year.asc_nulls_last)
  .show()
cube

cube 的功能和 rollup 是一样的, 但也有区别, 区别如下

  • rollup(A, B).sum©

    其结果集中会有三种数据形式: A B CA null Cnull null C

    不知道大家发现没, 结果集中没有对 B 列的聚合结果

  • cube(A, B).sum©

    其结果集中会有四种数据形式: A B CA null Cnull null Cnull B C

    不知道大家发现没, 比 rollup 的结果集中多了一个 null B C, 也就是说, rollup 只会按照第一个列来进行组合聚合, 但是 cube 会将全部列组合聚合

import org.apache.spark.sql.functions._

pmFinal.cube(‘source, ‘year)
  .agg(sum("pm") as "pm_total")
  .sort(‘source.asc_nulls_last, ‘year.asc_nulls_last)
  .show()

/**
  * 结果集为
  *
  * +-------+----+---------+
  * | source|year| pm_total|
  * +-------+----+---------+
  * | dongsi|2013| 735606.0|
  * | dongsi|2014| 745808.0|
  * | dongsi|2015| 752083.0|
  * | dongsi|null|2233497.0|
  * |us_post|2010| 841834.0|
  * |us_post|2011| 796016.0|
  * |us_post|2012| 750838.0|
  * |us_post|2013| 882649.0|
  * |us_post|2014| 846475.0|
  * |us_post|2015| 714515.0|
  * |us_post|null|4832327.0|
  * |   null|2010| 841834.0| <-- 新增
  * |   null|2011| 796016.0| <-- 新增
  * |   null|2012| 750838.0| <-- 新增
  * |   null|2013|1618255.0| <-- 新增
  * |   null|2014|1592283.0| <-- 新增
  * |   null|2015|1466598.0| <-- 新增
  * |   null|null|7065824.0|
  * +-------+----+---------+
  */
SparkSQL 中支持的 SQL 语句实现 cube 功能

SparkSQL 支持 GROUPING SETS 语句, 可以随意排列组合空值分组聚合的顺序和组成, 既可以实现 cube 也可以实现 rollup的功能

pmFinal.createOrReplaceTempView("pm_final")
spark.sql(
  """
    |select source, year, sum(pm)
    |from pm_final
    |group by source, year
    |grouping sets((source, year), (source), (year), ())
    |order by source asc nulls last, year asc nulls last
  """.stripMargin)
  .show()
RelationalGroupedDataset

常见的 RelationalGroupedDataset 获取方式有三种

  • groupBy
  • rollup
  • cube

无论通过任何一种方式获取了 RelationalGroupedDataset 对象, 其所表示的都是是一个被分组的 DataFrame, 通过这个对象, 可以对数据集的分组结果进行聚合

val groupedDF: RelationalGroupedDataset = pmDF.groupBy(‘year)

需要注意的是, RelationalGroupedDataset 并不是 DataFrame, 所以其中并没有 DataFrame 的方法, 只有如下一些聚合相关的方法, 如下这些方法在调用过后会生成 DataFrame 对象, 然后就可以再次使用 DataFrame 的算子进行操作了

操作符 解释

avg


求平均数


count


求总数


max


求极大值


min


求极小值


mean


求均数


sum


求和


agg

聚合, 可以使用 sql.functions 中的函数来配合进行操作

pmDF.groupBy(‘year)
    .agg(avg(‘pm) as "pm_avg")

11. 连接

导读

  1. 无类型连接 join
  2. 连接类型 Join Types
无类型连接算子 join 的 API
Step 1: 什么是连接

按照 PostgreSQL 的文档中所说, 只要能在一个查询中, 同一时间并发的访问多条数据, 就叫做连接.

做到这件事有两种方式

  1. 一种是把两张表在逻辑上连接起来, 一条语句中同时访问两张表

    select * from user join address on user.address_id = address.id
  2. 还有一种方式就是表连接自己, 一条语句也能访问自己中的多条数据
    select * from user u1 join (select * from user) u2 on u1.id = u2.id
Step 2: join 算子的使用非常简单, 大致的调用方式如下
join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
Step 3: 简单连接案例

表结构如下

+---+------+------+            +---+---------+
| id|  name|cityId|            | id|     name|
+---+------+------+            +---+---------+
|  0|  Lucy|     0|            |  0|  Beijing|
|  1|  Lily|     0|            |  1| Shanghai|
|  2|   Tim|     2|            |  2|Guangzhou|
|  3|Danial|     0|            +---+---------+
+---+------+------+

如果希望对这两张表进行连接, 首先应该注意的是可以连接的字段, 比如说此处的左侧表 cityId 和右侧表 id 就是可以连接的字段, 使用 join 算子就可以将两个表连接起来, 进行统一的查询

val person = Seq((0, "Lucy", 0), (1, "Lily", 0), (2, "Tim", 2), (3, "Danial", 0))
  .toDF("id", "name", "cityId")

val cities = Seq((0, "Beijing"), (1, "Shanghai"), (2, "Guangzhou"))
  .toDF("id", "name")

person.join(cities, person.col("cityId") === cities.col("id"))
  .select(person.col("id"),
    person.col("name"),
    cities.col("name") as "city")
  .show()

/**
  * 执行结果:
  *
  * +---+------+---------+
  * | id|  name|     city|
  * +---+------+---------+
  * |  0|  Lucy|  Beijing|
  * |  1|  Lily|  Beijing|
  * |  2|   Tim|Guangzhou|
  * |  3|Danial|  Beijing|
  * +---+------+---------+
  */
Step 4: 什么是连接?

现在两个表连接得到了如下的表

+---+------+---------+
| id|  name|     city|
+---+------+---------+
|  0|  Lucy|  Beijing|
|  1|  Lily|  Beijing|
|  2|   Tim|Guangzhou|
|  3|Danial|  Beijing|
+---+------+---------+

通过对这张表的查询, 这个查询是作用于两张表的, 所以是同一时间访问了多条数据

spark.sql("select name from user_city where city = ‘Beijing‘").show()

/**
  * 执行结果
  *
  * +------+
  * |  name|
  * +------+
  * |  Lucy|
  * |  Lily|
  * |Danial|
  * +------+
  */

连接类型

如果要运行如下代码, 需要先进行数据准备

private val spark = SparkSession.builder()
  .master("local[6]")
  .appName("aggregation")
  .getOrCreate()

import spark.implicits._

val person = Seq((0, "Lucy", 0), (1, "Lily", 0), (2, "Tim", 2), (3, "Danial", 3))
  .toDF("id", "name", "cityId")
person.createOrReplaceTempView("person")

val cities = Seq((0, "Beijing"), (1, "Shanghai"), (2, "Guangzhou"))
  .toDF("id", "name")
cities.createOrReplaceTempView("cities")
连接类型 类型字段 解释

交叉连接


cross

解释

交叉连接就是笛卡尔积, 就是两个表中所有的数据两两结对

交叉连接是一个非常重的操作, 在生产中, 尽量不要将两个大数据集交叉连接, 如果一定要交叉连接, 也需要在交叉连接后进行过滤, 优化器会进行优化

SQL 语句
select * from person cross join cities
Dataset 操作
person.crossJoin(cities)
  .where(person.col("cityId") === cities.col("id"))
  .show()

内连接


inner

解释

内连接就是按照条件找到两个数据集关联的数据, 并且在生成的结果集中只存在能关联到的数据

SQL 语句
select * from person inner join cities on person.cityId = cities.id
Dataset 操作
person.join(right = cities,
  joinExprs = person("cityId") === cities("id"),
  joinType = "inner")
  .show()

全外连接


outerfullfullouter

解释

内连接和外连接的最大区别, 就是内连接的结果集中只有可以连接上的数据, 而外连接可以包含没有连接上的数据, 根据情况的不同, 外连接又可以分为很多种, 比如所有的没连接上的数据都放入结果集, 就叫做全外连接

SQL 语句
select * from person full outer join cities on person.cityId = cities.id
Dataset 操作
person.join(right = cities,
  joinExprs = person("cityId") === cities("id"),
  joinType = "full") // "outer", "full", "full_outer"
  .show()

左外连接


leftouterleft

解释

左外连接是全外连接的一个子集, 全外连接中包含左右两边数据集没有连接上的数据, 而左外连接只包含左边数据集中没有连接上的数据

SQL 语句
select * from person left join cities on person.cityId = cities.id
Dataset 操作
person.join(right = cities,
  joinExprs = person("cityId") === cities("id"),
  joinType = "left") // leftouter, left
  .show()

LeftAnti


leftanti

解释

LeftAnti 是一种特殊的连接形式, 和左外连接类似, 但是其结果集中没有右侧的数据, 只包含左边集合中没连接上的数据

SQL 语句
select * from person left anti join cities on person.cityId = cities.id
Dataset 操作
person.join(right = cities,
  joinExprs = person("cityId") === cities("id"),
  joinType = "left_anti")
  .show()

LeftSemi


leftsemi

解释

和 LeftAnti 恰好相反, LeftSemi 的结果集也没有右侧集合的数据, 但是只包含左侧集合中连接上的数据

SQL 语句
select * from person left semi join cities on person.cityId = cities.id
Dataset 操作
person.join(right = cities,
  joinExprs = person("cityId") === cities("id"),
  joinType = "left_semi")
  .show()

右外连接


rightouterright

解释

右外连接和左外连接刚好相反, 左外是包含左侧未连接的数据, 和两个数据集中连接上的数据, 而右外是包含右侧未连接的数据, 和两个数据集中连接上的数据

SQL 语句
select * from person right join cities on person.cityId = cities.id
Dataset 操作
person.join(right = cities,
  joinExprs = person("cityId") === cities("id"),
  joinType = "right") // rightouter, right
  .show()
[扩展] 广播连接
Step 1: 正常情况下的 Join 过程

Join 会在集群中分发两个数据集, 两个数据集都要复制到 Reducer 端, 是一个非常复杂和标准的 ShuffleDependency, 有什么可以优化效率吗?

Step 2: Map 端 Join

前面图中看的过程, 之所以说它效率很低, 原因是需要在集群中进行数据拷贝, 如果能减少数据拷贝, 就能减少开销

如果能够只分发一个较小的数据集呢?

可以将小数据集收集起来, 分发给每一个 Executor, 然后在需要 Join 的时候, 让较大的数据集在 Map 端直接获取小数据集, 从而进行 Join, 这种方式是不需要进行 Shuffle 的, 所以称之为 Map 端 Join

Step 3: Map 端 Join 的常规实现

如果使用 RDD 的话, 该如何实现 Map 端 Join 呢?

val personRDD = spark.sparkContext.parallelize(Seq((0, "Lucy", 0),
  (1, "Lily", 0), (2, "Tim", 2), (3, "Danial", 3)))

val citiesRDD = spark.sparkContext.parallelize(Seq((0, "Beijing"),
  (1, "Shanghai"), (2, "Guangzhou")))

val citiesBroadcast = spark.sparkContext.broadcast(citiesRDD.collectAsMap())

val result = personRDD.mapPartitions(
  iter => {
    val citiesMap = citiesBroadcast.value
    // 使用列表生成式 yield 生成列表
    val result = for (person <- iter if citiesMap.contains(person._3))
      yield (person._1, person._2, citiesMap(person._3))
    result
  }
).collect()

result.foreach(println(_))
Step 4: 使用 Dataset 实现 Join 的时候会自动进行 Map 端 Join

自动进行 Map 端 Join 需要依赖一个系统参数 spark.sql.autoBroadcastJoinThreshold, 当数据集小于这个参数的大小时, 会自动进行 Map 端 Join

如下, 开启自动 Join

println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold").toInt / 1024 / 1024)

println(person.crossJoin(cities).queryExecution.sparkPlan.numberedTreeString)

当关闭这个参数的时候, 则不会自动 Map 端 Join 了

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
println(person.crossJoin(cities).queryExecution.sparkPlan.numberedTreeString)
Step 5: 也可以使用函数强制开启 Map 端 Join

在使用 Dataset 的 join 时, 可以使用 broadcast 函数来实现 Map 端 Join

import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
println(person.crossJoin(broadcast(cities)).queryExecution.sparkPlan.numberedTreeString)

即使是使用 SQL 也可以使用特殊的语法开启

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
val resultDF = spark.sql(
  """
    |select /*+ MAPJOIN (rt) */ * from person cross join cities rt
  """.stripMargin)
println(resultDF.queryExecution.sparkPlan.numberedTreeString)

原文地址:https://www.cnblogs.com/mediocreWorld/p/11626168.html

时间: 2024-10-15 06:41:36

Update:sparksql:第3节 Dataset (DataFrame) 的基础操作 & 第4节 SparkSQL_聚合操作_连接操作的相关文章

Android零基础入门第12节:熟悉Android Studio界面,开始装逼卖萌

通过前两期的学习,我们可以正确搭建好Android Studio的开发环境,也创建了HelloWorld工程并成功运行,那么本期就带你一起来了解这款强大的IDE开发工具吧. 一.Android Studio主页面板 重新打开Android Studio会进入如下主页面,与第一次打开有略微差别. 从上图可以看到,一共有四大区域,接下来分别简单介绍: 区域1为最近新建或打开的Android Studio工程列表,便于快速进入对应项目.首次使用时该区域为空,列表内容也可以进行删除操作. 区域2为And

Android零基础入门第44节:ListView数据动态更新

经过前面几期的学习,关于ListView的一些基本用法大概学的差不多了,但是你可能发现了,所有ListView里面要填充的数据都是静态的,但在实际开发中,这些数据往往都是动态变化的,比如数据内容发生改变.增加几行.或者删除几行,这就涉及到ListView数据的更新问题. 接下来通过一个简单的示例程序来学习ListView的数据更新. 继续使用WidgetSample工程,在app/main/res/layout/目录下创建updatedata_layout.xml文件,在其中填充如下代码片段:

Android零基础入门第64节:揭开RecyclerView庐山真面目

大家还记得之前在第38期~第50期都在学习列表控件吗,其中用了8期讲ListView的使用,相信都已经掌握好了吧.那么本期一起来学习Android 5.X新增的一个列表组件,那就是RecyclerView的使用. 一.RecyclerView概述 从前面的学习我们知道,ListView的功能非常强大,几乎绝大部分应用程序都会使用到,虽然也学会一些方法技巧来提升ListView的效率,但其性能还是不是很完美. 另外ListView的可扩展性相对来说比较弱,以前要实现每个列表项的高度不同的界面,或者

Android零基础入门第61节:滚动视图ScrollView

原文:Android零基础入门第61节:滚动视图ScrollView 前面几期学习了ProgressBar系列组件.ViewAnimator系列组件.Picker系列组件和时间日期系列组件,接下来几期继续来学习常见的其他组件. 一.ScrollView概述 从前面的学习有的同学可能已经发现,当拥有很多内容时屏幕显示不完,显示不全的部分完全看不见.但是在实际项目里面,很多内容都不止一个屏幕宽度或高度,那怎么办呢?那就需要本节学习的ScrollView来完成. 在默认情况下,ScrollView只是

Android零基础入门第60节:日历视图CalendarView和定时器Chronometer

原文:Android零基础入门第60节:日历视图CalendarView和定时器Chronometer 上一期学习了AnalogClock.DigitalClock和TextClock时钟组件,本期继续来学习日历视图CalendarView和定时器Chronometer. 一.CalendarView 日历视图(CalendarView)可用于显示和选择日期,用户既可选择一个日期,也可通过触 摸来滚动日历.如果希望监控该组件的日期改变,则可调用CalendarView的 setOnDateCha

Android零基础入门第62节:搜索框组件SearchView

原文:Android零基础入门第62节:搜索框组件SearchView 一.SearchView概述 SearchView是搜索框组件,它可以让用户在文本框内输入文字,并允许通过监听器监控用户输入,当用户输入完成后提交搜索时,也可通过监听器执行实际的搜索. SearchView默认是展示一个search的icon,点击icon展开搜索框,也可以自己设定图标.用SearchView时可指定如下表所示的常见XML属性及相关方法. 如果为SearchView增加一个配套的ListView,则可以为Se

Android零基础入门第16节:Android用户界面开发概述

相信通过前面15期的学习,Android的开发环境已经基本掌握了,如果仍有问题,欢迎到Android零基础入门技术讨论微信群交流,从本期开始正式来一步一步踏入Android开发之路. Android应用开发的一项内容就是用户界面的开发.Android提供了非常丰富的用户界面组件,借助于这些用户界面组件,开发者可以非常方便地进行用户界面开发,而且可以开发出非常优秀的用户界面. 一.界面UI元素介绍 Android应用是运行于手机系统上的程序,这种程序给用户的第一印象就是用户界面.接下来从以下几个方

Android零基础入门第58节:数值选择器NumberPicker

原文:Android零基础入门第58节:数值选择器NumberPicker 上一期学习了日期选择器DatePicker和时间选择器TimePicker,是不是感觉非常简单,本期继续来学习数值选择器NumberPicker . 一.NumberPicker概述 NumberPicker 是用于选择一组预定义好数字的组件,用户既可以通过键盘输入数值,也可以通过滚动来选择数值. NumberPicker的常用方法如下: setMinValue(int minVal):设置该组件支持的最小值. setM

Android零基础入门第40节:自定义ArrayAdapter

ListView用起来还是比较简单的,也是Android应用程序中最重要的一个组件,但其他ListView可以随你所愿,能够完成很多想要的精美列表,而这正是我们接下来要学习的内容. 一.自定义ArrayAdapter 从上期自定义列表项示例知道,每个列表项的图标都一样,如果需要每个列表项的图标根据内容动态表示,Android系统的ArrayAdapter就无能为力了,就只能使用自定义ArrayAdapter来实现啦. 做法就是创建一个ArrayAdapter的子类,重写其getView()方法,