第 4 章 DStream 转换
DStream 上的原语与 RDD 的类似,分为 Transformations(转换)和 Output Operations(输
出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及
各种 Window 相关的原语。
4.1 无状态转化操作
无状态转化操作就是把简单的 RDD 转化操作应用到每个批次上,也就是转化 DStream 中的
每一个 RDD。部分无状态转化操作列在了下表中。注意,针对键值对的 DStream 转化操作(比如
reduceByKey())要添加 import StreamingContext._才能在 Scala 中使用。
需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个 DStream 在内部
是由许多 RDD(批次)组成,且无状态转化操作是分别应用到每个 RDD 上的。例如,reduceByKey()
会归约每个时间区间中的数据,但不会归约不同区间之间的数据。
举个例子,在之前的 wordcount 程序中,我们只会统计 1 秒内接收到的数据的单词个数,而
不会累加。
无状态转化操作也能在多个 DStream 间整合数据,不过也是在各个时间区间内。例如,键 值
对 DStream 拥有和 RDD 一样的与连接相关的转化操作,也就是 cogroup()、join()、leftOuterJoin()
等。我们可以在 DStream 上使用这些操作,这样就对每个批次分别执行了对应的 RDD 操作。
我们还可以像在常规的 Spark 中一样使用 DStream 的 union() 操作将它和另一个 DStream
的内容合并起来,也可以使用 StreamingContext.union()来合并多个流。
4.2 有状态转化操作
4.2.1 UpdateStateByKey
UpdateStateByKey 原语用于记录历史记录,有时,我们需要在 DStream 中跨批次维护状态(例
如流计算中累加 wordcount)。针对这种情况,updateStateByKey()为我们提供了对一个状态变量的
访问,用于键值对形式的 DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如
何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,
状态) 对。
updateStateByKey() 的结果会是一个新的 DStream,其内部的 RDD 序列是由每个时间区间
对应的(键,状态)对组成的。
updateStateByKey 操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功
能,你需要做下面两步:
1. 定义状态,状态可以是一个任意的数据类型。
2. 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。
使用 updateStateByKey 需要对检查点目录进行配置,会使用检查点来保存状态。
更新版的 wordcount:
(1)编写代码
package com.lxl.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} object WorldCount { def main(args: Array[String]) { // 定义更新状态方法,参数 values 为当前批次单词频度,state 为以往批次单词频度 val updateFunc = (values: Seq[Int], state: Option[Int]) => { val currentCount = values.foldLeft(0)(_ + _) val previousCount = state.getOrElse(0) Some(currentCount + previousCount) } val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(3)) ssc.checkpoint(".") // Create a DStream that will connect to hostname:port, like hadoop102:9999 val lines = ssc.socketTextStream("hadoop102", 9999) // Split each line into words val words = lines.flatMap(_.split(" ")) //import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3 // Count each word in each batch val pairs = words.map(word => (word, 1)) // 使用 updateStateByKey 来更新状态,统计从运行开始以来单词总的次数 val stateDstream = pairs.updateStateByKey[Int](updateFunc) stateDstream.print() //val wordCounts = pairs.reduceByKey(_ + _) // Print the first ten elements of each RDD generated in this DStream to the console //wordCounts.print() ssc.start() // Start the computation ssc.awaitTermination() // Wait for the computation to terminate //ssc.stop() } }
(2)启动程序并向 9999 端口发送数据
[[email protected] kafka]$ nc -lk 9999
ni shi shui
ni hao ma
(3)结果展示
------------------------------------------- Time: 1504685175000 ms ------------------------------------------- ------------------------------------------- Time: 1504685181000 ms ------------------------------------------- (shi,1) (shui,1) (ni,1) ------------------------------------------- Time: 1504685187000 ms ------------------------------------------- (shi,1) (ma,1) (hao,1) (shui,1) (ni,2)
笔记:
[[email protected] spark]$ sudo yum install nc.x86_64
[[email protected] spark]$ nc -lk 9999
package com.atlxl.helloworld import java.io.{BufferedReader, InputStreamReader} import java.net.Socket import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.receiver.Receiver /* 自定义输入流 */ class CustomerRecevicer(host:String, port:Int) extends Receiver[String](StorageLevel.MEMORY_ONLY){ //接收器启动的时候调用 override def onStart(): Unit = { new Thread("receiver"){ override def run(): Unit = { //接受数据并提交给框架 receive() } }.start() } def receive(): Unit ={ var socket: Socket = null var input: String = null try { socket = new Socket(host,port) //生成数据流 val reader = new BufferedReader(new InputStreamReader(socket.getInputStream)) //接收数据 /* 方式一: */ while (!isStopped() && (input = reader.readLine()) != null){ store(input) } /* 方式二: */ // input = reader.readLine() // store(input) // while (!isStopped() && input != null){ // store(input) // input = reader.readLine() // } // restart("restart") }catch { case e:java.net.ConnectException => restart("restart") case t:Throwable => restart("restart") } } //接收器关闭的时候调用 override def onStop(): Unit = {} }
package com.atlxl.helloworld import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} object WordCount { def main(args: Array[String]): Unit = { //conf val conf = new SparkConf().setAppName("wc").setMaster("local[*]") // val ssc = new StreamingContext(conf,Seconds(5)) //保存状态信息 ssc.checkpoint("./check") //获取数据 // val lineDStream = ssc.socketTextStream("hadoop102",9999) //自定义获取 val lineDStream = ssc.receiverStream(new CustomerRecevicer("hadoop102", 9999)) //DStream[String] val wordsDStream = lineDStream.flatMap(_.split(" ")) //DStream[(String,1)] val k2vDStream = wordsDStream.map((_,1)) //DStream[(String,sum)] //无状态转换 // val result = k2vDStream.reduceByKey(_+_) val updateFuc =(v:Seq[Int],state:Option[Int])=> { val preStatus = state.getOrElse(0) Some(preStatus + v.sum) } //有状态转换 val result = k2vDStream.updateStateByKey(updateFuc) result.print() //运行 ssc.start() ssc.awaitTermination() } }
4.2.2 Window Operations
Window Operations 有点类似于 Storm 中的 State,可以设置窗口的大小和滑动窗口的间隔来
动态的获取当前 Steaming 的允许状态。
基于窗口的操作会在一个比 StreamingContext 的批次间隔更长的时间范围内,通过整合多个
批次的结果,计算出整个窗口的结果。
所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长,两者都必须是
StreamContext 的批次间隔的整数倍。窗口时长控制每次计算最近的多少个批次的数据,其实就
是最近的 windowDuration/batchInterval 个批次。如果有一个以 10 秒为批次间隔的源 DStream,
要创建一个最近 30 秒的时间窗口(即最近 3 个批次),就应当把 windowDuration 设为 30 秒。
而滑动步长的默认值与批次间隔相等,用来控制对新的 DStream 进行计算的间隔。如果源
DStream 批次间隔为 10 秒,并且我们只希望每两个批次计算一次窗口结果, 就应该把滑动步
长设置为 20 秒。
假设,你想拓展前例从而每隔十秒对持续 30 秒的数据生成 word count。为做到这个,我们需
要在持续 30 秒数据的(word,1)对DStream上应用 reduceByKey。使用操作 reduceByKeyAndWindow.
# reduce last 30 seconds of data, every 10 second windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x -y, 30, 20)
关于 Window 的操作有如下原语:
(1)window(windowLength, slideInterval): 基于对源 DStream 窗化的批次进行计算返回一个新
的 Dstream
(2)countByWindow(windowLength, slideInterval):返回一个滑动窗口计数流中的元素。
(3)reduceByWindow(func, windowLength, slideInterval):通过使用自定义函数整合滑动区间流
元素来创建一个新的单元素流。
(4)reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]):当在一个(K,V)对
的 DStream 上调用此函数,会返回一个新(K,V)对的 DStream,此处通过对滑动窗口中批次数据使
用 reduce 函数来整合每个 key 的 value 值。Note:默认情况下,这个操作使用 Spark 的默认数量并
行任务(本地是 2),在集群模式中依据配置属性(spark.default.parallelism)来做 grouping。你可以通
过设置可选参数 numTasks 来设置不同数量的 tasks。
(5)reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]):这个函
数是上述函数的更高效版本,每个窗口的 reduce 值都是通过用前一个窗的 reduce 值来递增计算。
通过 reduce 进入到滑动窗口数据并”反向 reduce”离开窗口的旧数据来实现这个操作。一个例子
是随着窗口滑动对 keys 的“加”“减”计数。通过前边介绍可以想到,这个函数只适用于”可逆
的 reduce 函数”,也就是这些 reduce 函数有相应的”反 reduce”函数(以参数 invFunc 形式传入)。
如前述函数,reduce 任务的数量通过可选参数来配置。注意:为了使用这个操作,检查点必须可
用。
(6)countByValueAndWindow(windowLength,slideInterval, [numTasks]):对(K,V)对的 DStream
调用,返回(K,Long)对的新 DStream,其中每个 key 的值是其在滑动窗口中频率。如上,可配置
reduce 任务数量。
reduceByWindow() 和 reduceByKeyAndWindow() 让我们可以对每个窗口更高效地进行归约
操作。它们接收一个归约函数,在整个窗口上执行,比如 +。除此以外,它们还有一种特殊形式,
通过只考虑新进入窗口的数据和离开窗口的数据,让 Spark 增量计算归约结果。这种特殊形式需
要提供归约函数的一个逆函数,比 如 + 对应的逆函数为 -。对于较大的窗口,提供逆函数可以
大大提高执行效率
val ipDStream = accessLogsDStream.map(logEntry => (logEntry.getIpAddress(), 1)) val ipCountDStream = ipDStream.reduceByKeyAndWindow( {(x, y) => x + y}, {(x, y) => x - y}, Seconds(30), Seconds(10)) //加上新进入窗口的批次中的元素 //移除离开窗口的老批次中的元素 //窗口时长// 滑动步长
countByWindow() 和 countByValueAndWindow() 作 为 对 数 据 进 行 计 数 操 作 的 简 写 。
countByWindow()返回一个表示每个窗口中元素个数的 DStream,而 countByValueAndWindow()返
回的 DStream 则包含窗口中每个值的个数。
val ipDStream = accessLogsDStream.map{entry => entry.getIpAddress()} val ipAddressRequestCount = ipDStream.countByValueAndWindow(Seconds(30), Seconds(10)) val requestCount = accessLogsDStream.countByWindow(Seconds(30), Seconds(10))
WordCount 第三版:3 秒一个批次,窗口 12 秒,滑步 6 秒。
package com.atguigu.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} object WorldCount { def main(args: Array[String]) { // 定义更新状态方法,参数 values 为当前批次单词频度,state 为以往批次单词频度 val updateFunc = (values: Seq[Int], state: Option[Int]) => { val currentCount = values.foldLeft(0)(_ + _) val previousCount = state.getOrElse(0) Some(currentCount + previousCount) } val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(3)) ssc.checkpoint(".") // Create a DStream that will connect to hostname:port, like localhost:9999 val lines = ssc.socketTextStream("hadoop102", 9999) // Split each line into words val words = lines.flatMap(_.split(" ")) //import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3 // Count each word in each batch val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b),Seconds(12), Seconds(6)) // Print the first ten elements of each RDD generated in this DStream to the console wordCounts.print() ssc.start() // Start the computation ssc.awaitTermination() // Wait for the computation to terminate //ssc.stop() } }
4.3 其他重要操作
4.3.1 Transform
Transform 原语允许 DStream 上执行任意的 RDD-to-RDD 函数。即使这些函数并没有在 DStream
的 API 中暴露出来,通过该函数可以方便的扩展 Spark API。该函数每一批次调度一次。其实也
就是对 DStream 中的 RDD 应用转换。
比如下面的例子,在进行单词统计的时候,想要过滤掉 spam 的信息。
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information val cleanedDStream = wordCounts.transform { rdd => rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning ... }
4.3.2 Join
连接操作(leftOuterJoin, rightOuterJoin, fullOuterJoin也可以),可以连接Stream-Stream,windows
stream to windows-stream、stream-dataset
Stream-Stream Joins
val stream1: DStream[String, String] = ... val stream2: DStream[String, String] = ... val joinedStream = stream1.join(stream2) val windowedStream1 = stream1.window(Seconds(20)) val windowedStream2 = stream2.window(Minutes(1)) val joinedStream = windowedStream1.join(windowedStream2)
Stream-dataset joins
val dataset: RDD[String, String] = ... val windowedStream = stream.window(Seconds(20))... val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
原文地址:https://www.cnblogs.com/LXL616/p/11159243.html