第0章 预备知识0.1 Scala0.1.1 Scala 操作符0.1.2 拉链操作0.2 Spark Core0.2.1 Spark RDD 持久化0.2.2 Spark 共享变量0.3 Spark SQL0.3.1 RDD、DataFrame 与 DataSet0.3.2 DataSet 与 RDD 互操作0.3.3 RDD、DataFrame 与 DataSet 之间的转换0.3.4 用户自定义聚合函数(UDAF)0.3.5 开窗函数0.4 Spark Streaming0.4.1 Dstream transformation 算子概览0.4.2 Dstream updataStateByKey 算子概览0.4.3 窗口操作0.4.4 Receiver 与 Direct0.5 Java0.5.1 对象池
第0章 预备知识
0.1 Scala
0.1.1 Scala 操作符
List 元素的追加
方式1-在列表的最后增加数据
方式2-在列表的最前面增加数据
方式3-在列表的最后增加数据
示例代码如下:
object ListDemo01 { def main(args: Array[String]): Unit = { // 说明 // 1. 在默认情况下 List 是 scala.collection.immutable.List 即不可变 // 2. 在 scala 中,List 就是不可变的,如需要使用可变的 List,则需要使用 ListBuffer // 3. List 在 package object scala 中做了声明 val List = scala.collection.immutable.List // 4. val Nil = scala.collection.immutable.Nil // List()
val list01 = List(1, 2, 3, "Hello") // 创建时,直接分配元素 println(list01) // List(1, 2, 3, Hello)
val list02 = Nil // 空集合 println(list02) // List()
// 访问 List 的元素 val value1 = list01(1) // 1是索引,表示取出第2个元素 println("value1=" + value1) // 2
println("====================list追加元素后的效果====================") // 通过 :+ 和 +: 给 list 追加元素(本身的集合并没有变化) val list1 = List(1, 2, 3, "abc") // :+ 运算符表示在列表的最后增加数据 val list2 = list1 :+ 4 // (1,2,3,"abc", 4) println(list1) // list1 没有变化 (1, 2, 3, "abc"),说明 list1 还是不可变 println(list2) // 新的列表结果是 [1, 2, 3, "abc", 4]
val list3 = 10 +: list1 // (10, 1, 2, 3, "abc") println("list3=" + list3)
// :: 符号的使用 val list4 = List(1, 2, 3, "abc") // 说明 val list5 = 4 :: 5 :: 6 :: list4 :: Nil 步骤: // 1. List() // 2. List(List(1, 2, 3, "abc")) // 3. List(6, List(1, 2, 3, "abc")) // 4. List(5, 6, List(1, 2, 3, "abc")) // 5. List(4, 5, 6, List(1, 2, 3, "abc")) val list5 = 4 :: 5 :: 6 :: list4 :: Nil println("list5=" + list5)
// ::: 符号的使用 // 说明 val list6 = 4 :: 5 :: 6 :: list4 ::: Nil 步骤: // 1. List() // 2. List(1, 2, 3, "abc") // 3. List(6, 1, 2, 3, "abc") // 4. List(5, 6, 1, 2, 3, "abc") // 5. List(4, 5, 6, 1, 2, 3, "abc") // 下面等价 4 :: 5 :: 6 :: list4 val list6 = 4 :: 5 :: 6 :: list4 ::: Nil println("list6=" + list6) }}
输出结果如下:
List(1, 2, 3, Hello)List()value1=2====================list追加元素后的效果====================List(1, 2, 3, abc)List(1, 2, 3, abc, 4)list3=List(10, 1, 2, 3, abc)list5=List(4, 5, 6, List(1, 2, 3, abc))list6=List(4, 5, 6, 1, 2, 3, abc)
0.1.2 拉链操作
把一对集合 A 和 B 的包含的元素合成到一个集合中:
object zipTest01 { def main(args: Array[String]): Unit = { val prices1 = List(5.0, 20.0, 9.95) val quantities1 = List(10, 2, 1) println(prices1.zip(quantities1))
println("----------------------------------")
val prices2 = List(5.0, 20.0, 9.95) val quantities2 = List(10, 2) println(prices2.zip(quantities2))
println("----------------------------------")
val prices3 = List(5.0, 20.0, 9.95) val quantities3 = List(10, 2) println(prices3.zipAll(quantities3, 9.95, 1))
println("----------------------------------")
val prices4 = List(5.0, 20.0, 9.95) val quantities4 = List(10, 2, 1) println(prices4.zipWithIndex) println(quantities4.zipWithIndex) }}
运行结果:
List((5.0,10), (20.0,2), (9.95,1))----------------------------------List((5.0,10), (20.0,2))----------------------------------List((5.0,10), (20.0,2), (9.95,1))----------------------------------List((5.0,0), (20.0,1), (9.95,2))List((10,0), (2,1), (1,2))
这个方法之所以叫“拉链(zip)”,是因为它就像拉链的齿状结构一样将两个集合结合在一起。注意
:如果一个集合比另一个集合短, 那么结果中的对偶数量和较短的那个集合的元素数量相同。
zipAll 方法可以让你指定较短列表的缺省值。
zipWithIndex 方法返回对偶的列表,其中每个对偶中第二个组成部分是每个元素的下标。
0.2 Spark Core
0.2.1 Spark RDD 持久化
Spark 非常重要的一个功能特性就是可以将 RDD 持久化在内存中,当对 RDD 执行持久化操作时,每个节点都会将自己操作的 RDD 的 partition 持久化到内存中,并且在之后对该 RDD 的反复使用中,直接使用内存的 partition。这样的话,对于针对一个 RDD 反复执行多个操作的场景, 就只要对 RDD 计算一次即可,后面直接使用该 RDD,而不需要反复计算多次该 RDD
。
巧妙使用 RDD 持久化,甚至在某些场景下,可以将 Spark 应用程序的性能提高 10 倍。对于迭代式算法和快速交互式应用来说,RDD 持久化是非常重要的。
例如,读取一个有着数十万行数据的 HDFS 文件,形成 linesRDD,这一读取过程会消耗大量时间,在 count 操作结束后,linesRDD 会被丢弃,会被后续的数据覆盖,当第二次再次使用 count 时,又需要重新读取 HDFS 文件数据,再次形成新的 linesRDD,这会导致反复消耗大量时间,会严重降低系统性能。
如果在读取完成后将 linesRDD 缓存起来,那么下一次执行 count 操作时将会直接使用缓存起来的 linesRDD,这会节省大量的时间。
要持久化一个 RDD,只要调用其 cache() 或者 persist() 方法即可。在该 RDD 第一次被计算出来时,就会直接缓存在每个节点中,而且 Spark 的持久化机制还是自动容错
的,如果持久化的 RDD 的任何 partition 丢失了,那么 Spark 会自动通过其源 RDD,使用 transformation 操作重新计算该 partition。
cache() 和 persist() 的区别在于,cache() 是 persist() 的一种简化方式,cache() 的底层就是调用的 persist() 的无参版本,同时就是调用 persist(MEMORY_ONLY),将输入持久化到内存中
。如果需要从内存中清除缓存,那么可以使用 unpersist() 方法。
Spark 自己也会在 shuffle 操作时,进行数据的持久化,比如写入磁盘,主要是为了在节点失败时,避免需要重新计算整个过程。
以下为对一个 156 万行大小为 168MB 的文本文件进行处理, textFile 后只进行 count 操作,持久化与不持久化的结果如下:
0.2.2 Spark 共享变量
Spark 一个非常重要的特性就是共享变量
。
默认情况下,如果在一个算子的函数中使用到了某个外部的变量,那么这个变量的值会被拷贝到每个 task 中
,此时每个 task 只能操作自己的那份变量副本。如果多个 task 想要共享某个变量,那么这种方式是做不到的。
Spark 为此提供了两种共享变量,一种是 Broadcast Variable(广播变量)
,另一种是 Accumulator(累加变量)
。Broadcast Variable 会将用到的变量,仅仅为每个节点拷贝一份
,更大的用途是优化性能,减少网络传输以及内存损耗。Accumulator 则可以让多个 task 共同操作一份变量
,主要可以进行累加操作。Broadcast Variable 是共享读变量,task 不能去修改它,而 Accumulator 可以让多个 task 操作一个变量。
1.广播变量
广播变量允许程序员在每个机器上保留缓存的只读变量,而不是给每个任务发送一个副本
。例如,可以使用它们以有效的方式为每个节点提供一个大型输入数据集的副本。Spark 还尝试使用高效的广播算法分发广播变量,以降低通信成本。
Spark action 被划分为多个 Stages,被多个 “shuffle” 操作(宽依赖)所分割。Spark 自动广播每个阶段任务所需的公共数据(一个 Stage 中多个 task 使用的数据),以这种方式广播的数据以序列化形式缓存,并在运行每个任务之前反序列化
。这意味着,显式创建广播变量仅在跨多个阶段的任务需要相同数据或者以反序列化格式缓存数据很重要时才有用。
Spark 提供的 Broadcast Variable 是只读的
,并且在每个节点上只会有一个副本,而不会为每个 task 都拷贝一份副本,因此,它的最大作用,就是减少变量到各个节点的网络传输消耗,以及在各个节点上的内存消耗
。此外,Spark 内部也使用了高效的广播算法来减少网络消耗。
可以通过调用 SparkContext 的 broadcast() 方法来针对每个变量创建广播变量。然后在算子的函数内,使用到广播变量时,每个节点只会拷贝一份副本了,每个节点可以使用广播变量的 value() 方法获取值。
2.累加器
累加器(accumulator):Accumulator 是仅仅被相关操作累加的变量
,因此可以在并行中被有效地支持。它们可用于实现计数器
(如 MapReduce)或总和计数
。
Accumulator 是存在于 Driver 端的,从节点不断把值发到 Driver 端,在 Driver端计数
(Spark UI 在 SparkContext 创建时被创建, 即在 Driver 端被创建,因此它可以读取 Accumulator 的数值),存在于 Driver 端的一个值,从节点是读取不到的
。
Spark 提供的 Accumulator 主要用于多个节点对一个变量进行共享性的操作
。
Accumulator 只提供了累加的功能,但是却给我们提供了多个 task 对于同一个变量并行操作的功能,但是 task 只能对 Accumulator 进行累加操作,不能读取它的值
,只有 Driver 程序可以读取 Accumulator 的值。
自定义累加器类型的功能在 1.X 版本中就已经提供了,但是使用起来比较麻烦,在 2.0 版本后, 累加器的易用性有了较大的改进,而且官方还提供了一个新的抽象类:AccumulatorV2
来提供更加友好的自定义类型累加器的实现方式。
官方同时给出了一个实现的示例: CollectionAccumulator 类, 这个类允许以集合的形式收集 spark 应用执行过程中的一些信息。例如,我们可以用这个类收集 Spark 处理数据时的一些细节,当然,由于累加器的值最终要汇聚到 driver 端,为了避免 driver 端的 outofmemory 问题,需要对收集的信息的规模要加以控制,不宜过大。
package com.atguigu.session
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable
/** * 自定义累加器 */class SessionStatisticAccumulator extends AccumulatorV2[String, mutable.HashMap[String, Int]]() {
// 自定义累加器:要求要在类的里面维护一个 mutable.HashMap 结构 val countMap = new mutable.HashMap[String, Int]()
// 判断累加器是否为空 override def isZero: Boolean = { this.countMap.isEmpty }
// 复制一个一模一样的累加器 override def copy(): AccumulatorV2[String, mutable.HashMap[String, Int]] = { val acc = new SessionStatisticAccumulator acc.countMap ++= this.countMap // 将两个 Map 拼接在一起 acc }
// 重置累加器 override def reset(): Unit = { this.countMap.clear() }
// 向累加器中添加 KV 对(K 存在,V 累加1,K 不存在,重新创建) override def add(k: String): Unit = { if (!this.countMap.contains(k)) { this.countMap += (k -> 0) }
this.countMap.update(k, this.countMap(k) + 1) }
// 两个累加器进行合并(先判断两个累加器是否是同一类型的,再将两个 Map 进行合并(是个小难点)) override def merge(other: AccumulatorV2[String, mutable.HashMap[String, Int]]): Unit = { other match { // (1 : 100).foldLeft(0) 等价于 (0 : (1 to 100))(_+_) 又等价于 { case (int1, int2) => int1 + int2 } // acc.countMap.foldLeft(this.countMap) 等价于 this.countMap : acc.countMap 又等价于 this.countMap 和 acc.countMap 的每一个 KV 做操作 case acc: SessionStatisticAccumulator => acc.countMap.foldLeft(this.countMap) { case (map, (k, v)) => map += (k -> (map.getOrElse(k, 0) + v)) } } }
override def value: mutable.HashMap[String, Int] = { this.countMap }}
0.3 Spark SQL
0.3.1 RDD、DataFrame 与 DataSet
1、RDD
RDD,全称为 Resilient Distributed Datasets,即分布式数据集,是 Spark 中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可以并行计算的集合。在 Spark 中,对数据的所有操作不外乎创建 RDD、转化已有 RDD 以及调用 RDD 操作进行求值
。每个 RDD 都被分为多个分区, 这些分区运行在集群中的不同的节点上。RDD 可以包含 Python、Java、Scala 中任意类型的对象,甚至可以包含用户自定义的对象。RDD 具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD 允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升查询速度
。
RDD 支持两种操作:transformation
操作和 action
操作。RDD 的 transformation 操作是返回一个新的 RDD 的操作,比如 map 和 filter(),而 action 操作则是向驱动器程序返回结果或者把结果写入外部系统的操作,比如 count()和 first()。
2、DataFrame
DataFrame 是一个分布式数据容器
。相比于 RDD,DataFrame 更像传统数据库中的二维表格,除了数据之外,还记录数据的结构信息,即 schema。同时,与 Hive 类似,DataFrame 也支持嵌套数据类型(struct,array 和 map)。从 API 易用性的角度上看,DataFrame API 提供的是一套高层的关系操作,比函数式的 RDD API 要更加友好,门槛更低。由于与 R 和 Pandas 中的 DataFrame 类似, Spark DataFrame 很好地继承了传统单机数据分析的开放和体验。
如上图所示,左侧的 RDD[Person] 虽然以 Person 为类型参数,但是 Spark 框架本身不了解 Person 类的内部结构。而右侧的 DataFrame 却提供了详细的结构信息,使得 SparkSQL 可以清楚地知道该数据集中包含那些列,每列的名称是什么。DataFrame 多了数据的结构信息,即 schema。RDD 是分布式的 Java 对象的集合。
DataFrame 是分布式的 Row 对象的集合。DataFrame 处理提供了比 RDD 更为丰富的算子以外,更重要的是提升了执行效率、减少数据读取以及执行计划的优化,比如
filter 下推、裁剪等。
DataFrame 是 DataSet 的特例,DataFrame = DataSet[Row],所以可以通过 as 方法将 DataFrame 转换为 DataSet。Row 是一个类型, 跟 Car、Person 这些类型一样,所有的表结构信息都用 Row 来表示。
3、DataSet
DataSet 是 DataFrame API 的一个拓展,是 Spark 最新的数据抽象。DataSet 具有用户友好的 API 风格,既具有类型安全检查也具有 DataFrame 的查询优化特性。
DataSet 支持编解码器,当需要访问非堆上的数据时可以避免反序列化整个对象,提高了效率。
样例类被用来在 DataSet 中定义数据的结构信息,样例类中每个属性的名称直接映射到 DataSet 中的字段名称。
DataSet 是强类型的
。比如可以有 DataSet[Car],DataSet[Person]。
DataFrame 只知道字段,但是不知道字段的类型
,所以在执行这些操作的时候是没有办法在编译的时候检查是否类型失败的,比如你可以对一个 String 类型进行加减法操作,在执行的时候才会报错,而 DataSet 不仅仅知道字段,而且知道字段类型,所以有更为严格的错误检查
。就像 JSON 对象和类对象之间的类比。
0.3.2 DataSet 与 RDD 互操作
介绍一下 Spark 将 RDD 转换成 DataFrame 的两种方式:
1.通过反射获取 Schema:使用 case class 的方式,不过在 scala 2.10 中最大支持 22 个字段的 case class,这点需要注意;
2.通过编程获取 Schema:通过 spark 内部的 StructType 方式,将普通的 RDD 转换成 DataFrame。
DataSet与RDD互操作代码:
import org.apache.spark.SparkConfimport org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}import org.apache.spark.sql.{DataFrame, Row, SparkSession}
case class Person(name: String, age: Int)
object SparkRDDtoDF {
def main(agrs: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]") conf.set("spark.sql.warehouse.dir", "file:D:\\learn\\JetBrains\\workspace_idea3\\commerce_basic\\analyse\\product\\src\\test\\") conf.set("spark.sql.shuffle.partitions", "20")
val sparkSession = SparkSession.builder().appName("RDD to DataFrame").config(conf).getOrCreate()
// 通过代码的方式,设置 Spark log4j 的级别 sparkSession.sparkContext.setLogLevel("WARN")
import sparkSession.implicits._ // use case class convert RDD to DataFrame // 通过反射的方式完成 RDD 向 DataFrame 的转换 // val peopleDF = rddToDFCase(sparkSession)
// 通过编程的方式完成 RDD 向 DataFrame 的转换 val peopleDF = rddToDF(sparkSession)
peopleDF.show() peopleDF.select($"name", $"age").filter($"age" > 20).show() }
// 1. 通过编程获取 Schema def rddToDF(sparkSession: SparkSession): DataFrame = { // 设置 schema 结构 val schema = StructType(Seq(StructField("name", StringType, true), StructField("age", IntegerType, true))) val rowRDD = sparkSession.sparkContext .textFile("file:D:\\learn\\JetBrains\\workspace_idea3\\commerce_basic\\analyse\\product\\src\\test\\people.txt", 2) .map(x => x.split(",")).map(x => Row(x(0), x(1).trim().toInt))
sparkSession.createDataFrame(rowRDD, schema) }
// 2. 通过反射获取 Schema def rddToDFCase(sparkSession: SparkSession): DataFrame = { // 导入隐饰操作,否则 RDD 无法调用 toDF 方法 import sparkSession.implicits._ val peopleRDD = sparkSession.sparkContext .textFile("file:D:\\learn\\JetBrains\\workspace_idea3\\commerce_basic\\analyse\\product\\src\\test\\people.txt", 2) .map(x => x.split(",")).map(x => Person(x(0), x(1).trim().toInt)).toDF()
peopleRDD }}
0.3.3 RDD、DataFrame 与 DataSet 之间的转换
1、DataFrame/DataSet 转 RDD
val rdd1=testDF.rddval rdd2=testDS.rdd
2、RDD 转 DataFrame
import spark.implicits._ val testDF = rdd.map { line => (line._1, line._2) }.toDF("col1", "col2")
3、RDD 转 DataSet
参见 DataSet 与 RDD 互操作代码。
4、DataFrame 转 DataSet
import spark.implicits._val testDF = testDS.toDF
5、DataSet 转 DataFrame
import spark.implicits._// 定义字段名和类型case class Coltest(col1: String, col2: Int) extends Serializable val testDS = testDF.as[Coltest]
0.3.4 用户自定义聚合函数(UDAF)
1、弱类型 UDAF 函数
通过继承 UserDefinedAggregateFunction 来实现用户自定义聚合函数。
package com.atguigu.product
import org.apache.spark.sql.Rowimport org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}
/** * 自定义弱类型的聚合函数(UDAF) */class GroupConcatDistinct extends UserDefinedAggregateFunction { // 设置 UDAF 函数的输入类型为 String override def inputSchema: StructType = StructType(StructField("cityInfoInput", StringType) :: Nil)
// 设置 UDAF 函数的缓冲区类型为 String override def bufferSchema: StructType = StructType(StructField("cityInfoBuffer", StringType) :: Nil)
// 设置 UDAF 函数的输出类型为 String override def dataType: DataType = StringType
// 设置一致性检验,如果为 true,那么输入不变的情况下计算的结果也是不变的 override def deterministic: Boolean = true
// 初始化自定义的 UDAF 函数 // 设置聚合中间 buffer 的初始值 // 需要保证这个语义:两个初始 buffer 调用下面实现的 merge 方法后也应该为初始 buffer, // 即如果你初始值是1,然后你 merge 是执行一个相加的动作,两个初始 buffer 合并之后等于 2,不会等于初始 buffer 了, // 这样的初始值就是有问题的,所以初始值也叫"zero value" override def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = "" }
// 设置 UDAF 函数的缓冲区更新:实现一个字符串带去重的拼接 // 用输入数据 input 更新 buffer 值,类似于 combineByKey override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { // 缓冲中的已经拼接过的城市信息串 var cityInfoBuffer = buffer.getString(0) // 刚刚传递进来的某个城市信息 val cityInfoInput = input.getString(0)
// 在这里要实现去重的逻辑 // 判断:之前没有拼接过某个城市信息,那么这里才可以接下去拼接新的城市信息 if (!cityInfoBuffer.contains(cityInfoInput)) { if ("".equals(cityInfoBuffer)) { cityInfoBuffer += cityInfoInput } else { // 比如 1:北京 // 1:北京,2:上海 cityInfoBuffer += "," + cityInfoInput } }
buffer.update(0, cityInfoBuffer) }
// 把两个自定义的 UDAF 函数的值合并在一起 // 合并两个 buffer, 将 buffer2 合并到 buffer1. 在合并两个分区聚合结果的时候会被用到, 类似于 reduceByKey // 这里要注意该方法没有返回值,在实现的时候是把 buffer2 合并到 buffer1 中去,你需要实现这个合并细节 override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { // cityInfoBuffer1: cityId1:cityName1, cityId2:cityName2, cityId3:cityName3, ... var cityInfoBuffer1 = buffer1.getString(0) // cityInfoBuffer2: cityId1:cityName1, cityId2:cityName2, cityId3:cityName3, ... val cityInfoBuffer2 = buffer2.getString(0)
// 将 cityInfoBuffer2 中的数据带去重的加入到 cityInfoBuffer1 中 for (cityInfo <- cityInfoBuffer2.split(",")) { if (!cityInfoBuffer1.contains(cityInfo)) { if ("".equals(cityInfoBuffer1)) { cityInfoBuffer1 += cityInfo } else { cityInfoBuffer1 += "," + cityInfo } } }
buffer1.update(0, cityInfoBuffer1) }
// 计算并返回最终的聚合结果 override def evaluate(buffer: Row): Any = { buffer.getString(0) }}
2、强类型 UDAF 函数
通过继承 Aggregator 来实现强类型自定义聚合函数。
import org.apache.spark.sql.{Encoder, Encoders}import org.apache.spark.sql.expressions.Aggregator
// 定义 case 类case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)
object MyAverage extends Aggregator[Employee, Average, Double] {
/** * 计算并返回最终的聚合结果 */ def zero: Average = Average(0L, 0L)
/** * 根据传入的参数值更新 buffer 值 */ def reduce(buffer: Average, employee: Employee): Average = { buffer.sum += employee.salary buffer.count += 1 buffer }
/** * 合并两个 buffer 值,将 buffer2 的值合并到 buffer1 */ def merge(b1: Average, b2: Average): Average = { b1.sum += b2.sum b1.count += b2.count b1 }
/** * 计算输出 */ def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
/** * 设定中间值类型的编码器,要转换成 case 类 * Encoders.product 是进行 scala 元组和 case 类转换的编码器 */ def bufferEncoder: Encoder[Average] = Encoders.product
/** * 设定最终输出值的编码器 */ def outputEncoder: Encoder[Double] = Encoders.scalaDouble}
0.3.5 开窗函数
开窗函数与聚合函数一样,都是对行的集合组进行聚合计算
。
开窗用于为行定义一个窗口(这里的窗口是指运算将要操作的行的集合),它对一组值进行操作,不需要使用 GROUP BY 子句对数据进行分组,能够在同一行中同时返回基础行的列和聚合列
。
开窗函数的调用格式为:函数名(列) OVER(选项)
第一大类:聚合开窗函数
-> 聚合函数(列) OVER(选项),这里的选项可以是 PARTITION BY 子句,但不可以是 ORDER BY 子句。
第二大类:排序开窗函数
-> 排序函数(列) OVER(选项), 这里的选项可以是 ORDER BY 子句, 也可以是 OVER(PARTITION BY 子句 ORDER BY 子句),但不可以只是 PARTITION BY 子句。
def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("score").setMaster("local[*]")
val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
import sparkSession.implicits._ val scoreDF = sparkSession.sparkContext .makeRDD(Array( Score("a1", 1, 80), Score("a2", 1, 78), Score("a3", 1, 95), Score("a4", 2, 74), Score("a5", 2, 92), Score("a6", 3, 99), Score("a7", 3, 99), Score("a8", 3, 45), Score("a9", 3, 55), Score("a10", 3, 78) )) .toDF("name", "grade", "score")
scoreDF.createOrReplaceTempView("score") scoreDF.show() }
执行结果如下所示:
+----+-----+-----+|name|grade|score|+----+-----+-----+| a1| 1| 80|| a2| 1| 78|| a3| 1| 95|| a4| 2| 74|| a5| 2| 92|| a6| 3| 99|| a7| 3| 99|| a8| 3| 45|| a9| 3| 55|| a10| 3| 78|+----+-----+-----+
1、聚合开窗函数
OVER 关键字表示把聚合函数当成聚合开窗函数而不是聚合函数。SQL 标准允许将所有聚合函数用做聚合开窗函数。
sparkSession.sql("select name, grade, score, count(name) over() name_count from score").show()
查询结果如下所示:
+----+-----+-----+----------+|name|grade|score|name_count|+----+-----+-----+----------+| a1| 1| 80| 10|| a2| 1| 78| 10|| a3| 1| 95| 10|| a4| 2| 74| 10|| a5| 2| 92| 10|| a6| 3| 99| 10|| a7| 3| 99| 10|| a8| 3| 45| 10|| a9| 3| 55| 10|| a10| 3| 78| 10|+----+-----+-----+----------+
在上边的例子中,开窗函数 COUNT(*) OVER() 对于查询结果的每一行都返回所有符合条件的行的条数。OVER 关键字后的括号中还经常添加选项用以改变进行聚合运算的窗口范围
。如果 OVER 关键字后的括号中的选项为空,则开窗函数会对结果集中的所有行进行聚合运算。
开窗函数的 OVER 关键字后括号中的可以使用 PARTITION BY 子句来定义行的分区来供进行聚合计算。与 GROUP BY 子句不同,PARTITION BY 子句创建的分区是独立于结果集的,创建的分区只是供进行聚合计算的,而且不同的开窗函数所创建的分区也不互相影响
。下面的 SQL 语句用于显示按照班级分组后每组的人数:
sparkSession.sql("select name, grade, score, count(name) over(partition by grade) name_count from score").show()
执行结果如下所示:
+----+-----+-----+----------+|name|grade|score|name_count|+----+-----+-----+----------+| a1| 1| 80| 3|| a2| 1| 78| 3|| a3| 1| 95| 3|| a6| 3| 99| 5|| a7| 3| 99| 5|| a8| 3| 45| 5|| a9| 3| 55| 5|| a10| 3| 78| 5|| a4| 2| 74| 2|| a5| 2| 92| 2|+----+-----+-----+----------+
OVER (PARTITION BY grade) 表示对结果集按照 grade 进行分区,并且计算当前行所属的组的聚合计算结果。在同一个 SELECT 语句中可以同时使用多个开窗函数,而且这些开窗函数并不会相互干扰
。
2、排序开窗函数
对于排序开窗函数来讲,它支持的开窗函数分别为:ROW_NUMBER(行号)
、RANK(排名:会跳跃)
、DENSE_RANK(密集排名)
和 NTILE(分组排名)
。
sparkSession.sql("select name, grade, score, row_number() over(order by score) as row_number from score").show()sparkSession.sql("select name, grade, score, rank() over(order by score) as rank from score").show()sparkSession.sql("select name, grade, score, dense_rank() over(order by score) as dense_rank from score").show()sparkSession.sql("select name, grade, score, ntile(6) over(order by score) as ntile from score").show()
执行的结果如下:
+----+-----+-----+----------+|name|grade|score|row_number|+----+-----+-----+----------+| a8| 3| 45| 1|| a9| 3| 55| 2|| a4| 2| 74| 3|| a2| 1| 78| 4|| a10| 3| 78| 5|| a1| 1| 80| 6|| a5| 2| 92| 7|| a3| 1| 95| 8|| a6| 3| 99| 9|| a7| 3| 99| 10|+----+-----+-----+----------+
+----+-----+-----+----+|name|grade|score|rank|+----+-----+-----+----+| a8| 3| 45| 1|| a9| 3| 55| 2|| a4| 2| 74| 3|| a2| 1| 78| 4|| a10| 3| 78| 4|| a1| 1| 80| 6|| a5| 2| 92| 7|| a3| 1| 95| 8|| a6| 3| 99| 9|| a7| 3| 99| 9|+----+-----+-----+----+
+----+-----+-----+----------+|name|grade|score|dense_rank|+----+-----+-----+----------+| a8| 3| 45| 1|| a9| 3| 55| 2|| a4| 2| 74| 3|| a2| 1| 78| 4|| a10| 3| 78| 4|| a1| 1| 80| 5|| a5| 2| 92| 6|| a3| 1| 95| 7|| a6| 3| 99| 8|| a7| 3| 99| 8|+----+-----+-----+----------+
+----+-----+-----+-----+|name|grade|score|ntile|+----+-----+-----+-----+| a8| 3| 45| 1|| a9| 3| 55| 1|| a4| 2| 74| 2|| a2| 1| 78| 2|| a10| 3| 78| 3|| a1| 1| 80| 3|| a5| 2| 92| 4|| a3| 1| 95| 4|| a6| 3| 99| 5|| a7| 3| 99| 6|+----+-----+-----+-----+
看到上面的结果了吧,下面来介绍下相关的内容。我们得到的最终结果是按照 score 进行升序显示的。
对于 row_number() over(order by score) as row_number
来说,这个排序开窗函数是按 score 升序的方式来排序,并得出排序结果的序号。
对于 rank() over(order by score) as rank
来说,这个排序形容函数是按 score 升序的方式来排序,并得出排序结果的排名号。这个函数求出来的排名结果可以并列, 并列排名之后的排名将是并列的排名加上并列数(简单说每个人只有一种排名,然后出现两个并列第一名的情况,这时候排在两个第一名后面的人将是第三名,也就是没有了第二名,但是有两个第一名)。
对于 dense_rank() over(order by score) as dense_rank
来说,这个排序函数是按 score 升序的方式来排序,并得出排序结果的排名号。这个函数与 rank() 函数不同在于,并列排名之后的排名只是并列排名加 1(简单说每个人只有一种排名,然后出现两个并列第一名的情况,这时候排在两个第一名后面的人将是第二名,也就是两个第一名,一个第二名)。
对于 ntile(6) over(order by score) as ntile
来说,这个排序函数是按 score 升序的方式来排序,然后 6 等分成 6 个组, 并显示所在组的序号。
排序函数和聚合开窗函数类似,也支持在 OVER 子句中使用 PARTITION BY 语句。例如:
sparkSession.sql("select name, grade, score, row_number() over(partition by grade order by score) as row_number from score").show()sparkSession.sql("select name, grade, score, rank() over(partition by grade order by score) as rank from score").show()sparkSession.sql("select name, grade, score, dense_rank() over(partition by grade order by score) as dense_rank from score").show()sparkSession.sql("select name, grade, score, ntile(6) over(partition by grade order by score) as ntile from score").show()
需要注意一点,在排序开窗函数中使用 PARTITION BY 子句需要放置在 ORDER BY 子句之前
。
完整的测试代码如下:
import org.apache.spark.SparkConfimport org.apache.spark.sql.SparkSession
case class Score(name: String, grade: Int, score: Int)
object WindowFunctionTest { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("score").setMaster("local[*]") val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
import sparkSession.implicits._ val scoreDF = sparkSession.sparkContext .makeRDD(Array( Score("a1", 1, 80), Score("a2", 1, 78), Score("a3", 1, 95), Score("a4", 2, 74), Score("a5", 2, 92), Score("a6", 3, 99), Score("a7", 3, 99), Score("a8", 3, 45), Score("a9", 3, 55), Score("a10", 3, 78) )) .toDF("name", "grade", "score")
scoreDF.createOrReplaceTempView("score")
scoreDF.show()
// 1、聚合开窗函数 // sparkSession.sql("select name, grade, score, count(name) over() name_count from score").show()
// 显示按照班级分组后每组的人数 // sparkSession.sql("select name, grade, score, count(name) over(partition by grade) name_count from score").show()
// 2、排序开窗函数 sparkSession.sql("select name, grade, score, row_number() over(order by score) as row_number from score").show() sparkSession.sql("select name, grade, score, rank() over(order by score) as rank from score").show() sparkSession.sql("select name, grade, score, dense_rank() over(order by score) as dense_rank from score").show() sparkSession.sql("select name, grade, score, ntile(6) over(order by score) as ntile from score").show()
sparkSession.sql("select name, grade, score, row_number() over(partition by grade order by score) as row_number from score").show() sparkSession.sql("select name, grade, score, rank() over(partition by grade order by score) as rank from score").show() sparkSession.sql("select name, grade, score, dense_rank() over(partition by grade order by score) as dense_rank from score").show() sparkSession.sql("select name, grade, score, ntile(6) over(partition by grade order by score) as ntile from score").show() }}
0.4 Spark Streaming
0.4.1 Dstream transformation 算子概览
0.4.2 Dstream updataStateByKey 算子概览
updateStateByKey 操作,可以让我们为每一个 key 维护一个 state,并持续不断地更新该 state。
1.首先,要定义一个 state,可以是任意的数据类型。
2.其次,要定义 state 更新函数 -- 指定一个函数如何使用之前的 state 和新值来更新 state。
对于每个 batch,Spark 都会为每个之前已经存在的 key 去应用一次 state 更新函数,无论这个 key 在 batch 中是否有新的数据。如果 state 更新函数返回 none,那么 key 对应的 state 就会被删除。
当然,对于每一个新出现的 key,也会执行 state 更新函数。
注意
:updateStateByKey 操作,要求必须开启 checkpoint 机制。
package com.atguigu.stream
import org.apache.spark.SparkConfimport org.apache.spark.sql.SparkSessionimport org.apache.spark.streaming.{Seconds, StreamingContext}
object UpdateStateByKeyWordCount {
def main(args: Array[String]): Unit = { // 构建 Spark 上下文 val sparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
// 创建 Spark 客户端 val spark = SparkSession.builder().config(sparkConf).getOrCreate() val sc = spark.sparkContext val ssc = new StreamingContext(sc, Seconds(5))
// 设置检查点目录 ssc.checkpoint("./streaming_checkpoint")
val lines = ssc.socketTextStream("localhost", 9999) val words = lines.flatMap(_.split(" ")) val pairs = words.map(word => (word, 1))
val wordCount = pairs.updateStateByKey((values: Seq[Int], state: Option[Int]) => { var newValue = state.getOrElse(0) for (value <- values) { newValue += value } Option(newValue) })
wordCount.print()
ssc.start() ssc.awaitTermination() }}
0.4.3 窗口操作
Spark Streaming 提供了窗口计算,允许在数据的滑动窗口上应用转换,下图说明了这个滑动窗口:
如图所示,每当窗口滑过源 DStream 时,落在窗口内的源 RDD 被组合并运行,以产生窗口 DStream 的 RDD。在这种具体情况下,操作应用于最近 3 个时间单位的数据,并以 2 个时间单位滑动。这表明任何窗口操作都需要指定两个参数。
窗口长度 -- 窗口的持续时间(此图中窗口长度为 3)。
滑动间隔 -- 执行窗口操作的间隔(此图中滑动间隔为 2)。
这两个参数必须是源 DStream 的 batch 间隔的倍数
(上图中 batch 间隔为 1)。
batch 间隔为切割 RDD 的间隔,滑动间隔为每隔多长时间来计算一次,窗口长度为每次计算的数据量是多少
。
0.4.4 Receiver 与 Direct
1.Receiver
Receiver 是使用 Kafka 的高层次 Consumer API 来实现的。Receiver 每隔一段 batch 时间去 Kafka 获取那段时间最新的消息数据,Receiver 从 Kafka 获取的数据都是存储在 Spark Executor 的内存中的,然后 Spark Streaming 启动的 job 会去处理那些数据
。
对于高阶消费者,谁来消费分区不是由 Spark Streaming 决定的,也不是 Storm 决定的,有一个高阶消费者 API, 由高阶消费者决定分区向消费者的分配,即由高阶消费者 API 决定消费者消费哪个分区,而消费者读取数据后什么时候提交 offset 也不是由它们自己决定的,高阶消费者 API 会根据参数配置隔几秒提交一次
。
这会引起一个问题,当 Spark Streaming 中的 Receiver 读取 Kafka 分区数据时,假设读取了 100 条数据,高阶消费者 API 会执行 offset 的提交,例如每隔 3 秒,这 100 条数据就是 RDD,假设此 RDD 还没有处理完, 高阶消费者 API 执行了 offset 提交,但是 Spark Streaming 挂掉了,由于 RDD 在内存中,那么 RDD 的数据就丢失了,如果想重新拿数据,从哪里去拿不是由 Spark Streaming 说了算的,是由高阶 API 决定的,由于 offset 已经提交,高阶 API 认为这个数据 Spark Streaming 已经拿过了,再拿要拿 100 条以后的数据,那么之前丢失的 100 条数据就永远丢失了。
针对这一问题,Spark Streaming 设计了一个规则,即 Spark Streaming 预写日志规则(Write Ahead Log,WAL)
,每读取一批数据,会写一个 WAL 文件,在 WAL文件中,读了多少条就写多少条,WAL 文件存储于 HDFS 上。假设 RDD 中有 100 条数据,那么 WAL 文件中也有 100 条数据,此时如果 Spark Streaming 挂掉,那么回去读取 HDFS 上的 WAL 文件,把 WAL 文件中的 100 条数据取出再生成 RDD,然后再去消费。由于这一设计需要写 HDFS,会对整体性能造成影响
。
假设有 6 个分区,高阶消费者的话会在 Spark 集群的 Worker 上启动 Receiver,有 6 个分区则会用 6 个线程去读取分区数据,这是在一个 Worker 的一个 Receiver中有 6 个线程同时读取 6 个分区的数据,随着数据量越来越大, 数据读取会成为瓶颈,此时可以创建多个 Receiver 分散读取分区数据,然后每个 Receiver 创建一个 Dstream,再把这些流全部都合并起来,然后进行计算。读取时,一方面把 RDD 放在内存中,一方面写 HDFS 中的 WAL 文件。
根据上面的情景,又要创建多个 Receiver,又要进行合并,又要在内存中存储 RDD,又要写 HDFS 上的 WAL 文件,高级 API 的缺点还是比较多的
。
高阶消费者是由高阶消费者 API 自己提交 offset 到 ZooKeeper 中。
2.Direct
低阶消费者需要自己维护 offset,Spark Streaming 从分区里读一部分数据,然后将 offset 保存到 CheckpointPath 目录中
,比如 5s 生成一个 Spark Streaming job(每个 action 操作启动一次 job),每个 job 生成的时候,会写一次 CheckpointPath 下的文件,Checkpoint 中有 job 信息和 offset 信息(当然还有 RDD 依赖关系等其他信息),即保存了未完成的 job 和分区读取的 offset,一旦 Spark Streaming 挂掉后重启,可以通过从 CheckpointPath 中的文件中反序列化来读取 Checkpoint 的数据
。
假设有 5 个分区,第一次 Spark Streaming 读取 100 条数据,那么每个 partition 都会读取 100 条数据,这 100 条数据对应 offset 是 0~99,这 5 个分区的 100 条数据数据直接对应 RDD 的 5 个分区,针对这一 RDD 会启动一个 job 进行处理,job 启动时会将 job 信息和 offset(0~99)写入 CheckpointPath,处理完成前保存 job 和 offset,一旦处理完成,job 信息会被删除,但是 offset 信息会被保留,通过这次的 offset 确定下一次的读取范围,即 100~199,新的 job 信息会被写入,新的 offset 100~199 覆盖原来的 0~99。如果处理第二批次的时候挂掉了, offset 还在,就可以重读这块数据。
0.5 Java
0.5.1 对象池
在学习 MySQL 时,我们接触到了数据库连接池技术,数据库连接池负责分配、管理和释放数据库连接,它允许应用程序重复使用一个现有的数据库连接,而不是再重新建立一个
;释放空闲时间超过最大空闲时间的数据库连接来避免因为没有释放数据库连接而引起的数据库连接遗漏。这项技术能明显提高对数据库操作的性能。
在实际开发时,对象的创建和销毁操作也是非常消耗资源的,因此,我们考虑使用对象池技术。当我们需要创建对象时,向对象池申请一个对象,如果对象池里有空闲的可用节点,就会把节点返回给用户;当我们需要销毁对象时,将对象返回给对象池即可。
我们常用的数据库连接池是 C3P0 等数据库连接池,根据对象池的概念,我们发现对象池与数据库连接池有很大的相似之处,其实,很多数据库连接池就是借助对象池技术实现的,因此,我们可以通过对象池实现自己的数据库连接池。
MySQL代理代码:
/** * MySQL 客户端代理对象 * * @param jdbcUrl MySQL URL * @param jdbcUser MySQL 用户 * @param jdbcPassword MySQL 密码 * @param client 默认客户端实现 */case class MySqlProxy(jdbcUrl: String, jdbcUser: String, jdbcPassword: String, client: Option[Connection] = None) { // 获取客户端连接对象 private val mysqlClient = client getOrElse { DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPassword) }
/** * 执行增删改 SQL 语句 * * @param sql * @param params * @return 影响的行数 */ def executeUpdate(sql: String, params: Array[Any]): Int = { var rtn = 0 var pstmt: PreparedStatement = null
try { // 第一步:关闭自动提交 mysqlClient.setAutoCommit(false)
// 第二步:根据传入的 sql 语句创建 prepareStatement pstmt = mysqlClient.prepareStatement(sql)
// 第三步:为 prepareStatement 中的每个参数填写数值 if (params != null && params.length > 0) { for (i <- 0 until params.length) { pstmt.setObject(i + 1, params(i)) } }
// 第四步:执行增删改操作 rtn = pstmt.executeUpdate()
// 第五步:手动提交 mysqlClient.commit() } catch { case e: Exception => e.printStackTrace }
rtn }
/** * 执行查询 SQL 语句 * * @param sql * @param params */ def executeQuery(sql: String, params: Array[Any], queryCallback: QueryCallback) { var pstmt: PreparedStatement = null var rs: ResultSet = null
try { // 第一步:根据传入的 sql 语句创建 prepareStatement pstmt = mysqlClient.prepareStatement(sql)
// 第二步:为 prepareStatement 中的每个参数填写数值 if (params != null && params.length > 0) { for (i <- 0 until params.length) { pstmt.setObject(i + 1, params(i)) } }
// 第三步:执行查询操作 rs = pstmt.executeQuery()
// 第四步:处理查询后的结果 queryCallback.process(rs) } catch { case e: Exception => e.printStackTrace } }
/** * 批量执行 SQL 语句 * * @param sql * @param paramsList * @return 每条SQL语句影响的行数 */ def executeBatch(sql: String, paramsList: Array[Array[Any]]): Array[Int] = { var rtn: Array[Int] = null var pstmt: PreparedStatement = null try { // 第一步:关闭自动提交 mysqlClient.setAutoCommit(false)
pstmt = mysqlClient.prepareStatement(sql)
// 第二步:为 prepareStatement 中的每个参数填写数值 if (paramsList != null && paramsList.length > 0) { for (params <- paramsList) { for (i <- 0 until params.length) { pstmt.setObject(i + 1, params(i)) } pstmt.addBatch() } }
// 第三步:执行批量的 SQL 语句 rtn = pstmt.executeBatch()
// 第四步:手动提交 mysqlClient.commit() } catch { case e: Exception => e.printStackTrace }
rtn }
// 关闭 MySQL 客户端 def shutdown(): Unit = mysqlClient.close()}
如上代码所示,我们完成了 MySQL 代理类 MySqlProxy 的创建,每个 MySqlProxy 对象都会完成一次与 MySQL 的连接并提供操作 MySQL 数据库的接口,那么如果我们将 MySqlProxy 对象创建的工作交给对象池,那么就可以实现重复利用与 MySQL 建立的连接,这与数据库连接池的功能是一样的。
在本项目中,我们使用了 Apache common-pool2 框架,Apache common-pool2 包提供了一个通用的对象池技术的实现
。可以很方便的基于它来实现自己的对象池,比如 DBCP 和 Jedis 他们的内部对象池的实现就是依赖于 common-pool2。
common-pool2 有四个核心:
1、工作类:要通过对象池创建对象的类,例如 MySqlProxy 类。
2、工厂类:生产工作类的工厂,工厂类是基于 BasePooledObjectFactory 的。
3、配置类:对象池活跃对象个数、最大空闲数等信息都需要配置,基于 GenericObjectPoolConfig。
4、对象池:实际的对象池类,基于 GenericObjectPool,其对象的创建需要传入工厂类对象和配置类对象。
common-pool2 的对应关系如下图所示:
对象池实现代码:
/** * 扩展知识:将 MySqlProxy 实例视为对象,MySqlProxy 实例的创建使用对象池进行维护 * * 创建自定义工厂类,继承 BasePooledObjectFactory 工厂类,负责对象的创建、包装和销毁 * * @param jdbcUrl * @param jdbcUser * @param jdbcPassword * @param client */class PooledMySqlClientFactory(jdbcUrl: String, jdbcUser: String, jdbcPassword: String, client: Option[Connection] = None) extends BasePooledObjectFactory[MySqlProxy] with Serializable { // 用于池来创建对象 override def create(): MySqlProxy = MySqlProxy(jdbcUrl, jdbcUser, jdbcPassword, client)
// 用于池来包装对象 override def wrap(obj: MySqlProxy): PooledObject[MySqlProxy] = new DefaultPooledObject(obj)
// 用于池来销毁对象 override def destroyObject(p: PooledObject[MySqlProxy]): Unit = { p.getObject.shutdown() super.destroyObject(p) }}
/** * 创建 MySQL 池工具类 */object CreateMySqlPool { // 加载 JDBC 驱动,只需要一次 Class.forName("com.mysql.jdbc.Driver")
// 在 org.apache.commons.pool2.impl 中预设了三个可以直接使用的对象池:GenericObjectPool、GenericKeyedObjectPool 和 SoftReferenceObjectPool // 创建 genericObjectPool 为 GenericObjectPool // GenericObjectPool 的特点是可以设置对象池中的对象特征,包括 LIFO 方式、最大空闲数、最小空闲数、是否有效性检查等等 private var genericObjectPool: GenericObjectPool[MySqlProxy] = null
// 伴生对象通过 apply 完成对象的创建 def apply(): GenericObjectPool[MySqlProxy] = { // 单例模式 if (this.genericObjectPool == null) { this.synchronized { // 获取 MySQL 配置参数 val jdbcUrl = ConfigurationManager.config.getString(Constants.JDBC_URL) val jdbcUser = ConfigurationManager.config.getString(Constants.JDBC_USER) val jdbcPassword = ConfigurationManager.config.getString(Constants.JDBC_PASSWORD) val size = ConfigurationManager.config.getInt(Constants.JDBC_DATASOURCE_SIZE)
val pooledFactory = new PooledMySqlClientFactory(jdbcUrl, jdbcUser, jdbcPassword) val poolConfig = { // 创建标准对象池配置类的实例 val c = new GenericObjectPoolConfig // 设置配置对象参数 // 设置最大对象数 c.setMaxTotal(size) // 设置最大空闲对象数 c.setMaxIdle(size) c }
// 对象池的创建需要工厂类和配置类 // 返回一个 GenericObjectPool 对象池 this.genericObjectPool = new GenericObjectPool[MySqlProxy](pooledFactory, poolConfig) } }
genericObjectPool }}
GenericObjectPool 的核心方法如下:
borrowObject():从对象池中取出一个对象。
returnObject():将使用完成的对象还给对象池。
在每次使用 CreateMySqlPool 时,通过 borrowObject() 提取对象,通过 returnObject() 归还对象。
原文地址:https://www.cnblogs.com/chenmingjun/p/10990227.html