Spark DStream 转换

第 4 章 DStream 转换

  DStream 上的原语与 RDD 的类似,分为 Transformations(转换)和 Output Operations(输

出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及

各种 Window 相关的原语。

4.1 无状态转化操作

  无状态转化操作就是把简单的 RDD 转化操作应用到每个批次上,也就是转化 DStream 中的

每一个 RDD。部分无状态转化操作列在了下表中。注意,针对键值对的 DStream 转化操作(比如

reduceByKey())要添加 import StreamingContext._才能在 Scala 中使用。

  需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个 DStream 在内部

是由许多 RDD(批次)组成,且无状态转化操作是分别应用到每个 RDD 上的。例如,reduceByKey()

会归约每个时间区间中的数据,但不会归约不同区间之间的数据。

  举个例子,在之前的 wordcount 程序中,我们只会统计 1 秒内接收到的数据的单词个数,而

不会累加。

  无状态转化操作也能在多个 DStream 间整合数据,不过也是在各个时间区间内。例如,键 值

对 DStream 拥有和 RDD 一样的与连接相关的转化操作,也就是 cogroup()、join()、leftOuterJoin()

等。我们可以在 DStream 上使用这些操作,这样就对每个批次分别执行了对应的 RDD 操作。

  我们还可以像在常规的 Spark 中一样使用 DStream 的 union() 操作将它和另一个 DStream

的内容合并起来,也可以使用 StreamingContext.union()来合并多个流。

4.2 有状态转化操作

4.2.1 UpdateStateByKey

  UpdateStateByKey 原语用于记录历史记录,有时,我们需要在 DStream 中跨批次维护状态(例

如流计算中累加 wordcount)。针对这种情况,updateStateByKey()为我们提供了对一个状态变量的

访问,用于键值对形式的 DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如

何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,

状态) 对。

  updateStateByKey() 的结果会是一个新的 DStream,其内部的 RDD 序列是由每个时间区间

对应的(键,状态)对组成的。

  updateStateByKey 操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功

能,你需要做下面两步:

1. 定义状态,状态可以是一个任意的数据类型。

2. 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。

  使用 updateStateByKey 需要对检查点目录进行配置,会使用检查点来保存状态。

  更新版的 wordcount:

(1)编写代码

package com.lxl.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object WorldCount {
  def main(args: Array[String]) {
    // 定义更新状态方法,参数 values 为当前批次单词频度,state 为以往批次单词频度
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(3))
    ssc.checkpoint(".")
    // Create a DStream that will connect to hostname:port, like hadoop102:9999
    val lines = ssc.socketTextStream("hadoop102", 9999)
    // Split each line into words
    val words = lines.flatMap(_.split(" "))
    //import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
    // Count each word in each batch
    val pairs = words.map(word => (word, 1))
    // 使用 updateStateByKey 来更新状态,统计从运行开始以来单词总的次数
    val stateDstream = pairs.updateStateByKey[Int](updateFunc)
    stateDstream.print()
    //val wordCounts = pairs.reduceByKey(_ + _)
    // Print the first ten elements of each RDD generated in this DStream to the console
    //wordCounts.print()
    ssc.start() // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
    //ssc.stop()
  }
} 

(2)启动程序并向 9999 端口发送数据

[[email protected] kafka]$ nc -lk 9999

ni shi shui

ni hao ma

(3)结果展示

-------------------------------------------
Time: 1504685175000 ms
-------------------------------------------
-------------------------------------------
Time: 1504685181000 ms
-------------------------------------------
(shi,1)
(shui,1)
(ni,1)
-------------------------------------------
Time: 1504685187000 ms
-------------------------------------------
(shi,1)
(ma,1)
(hao,1)
(shui,1)
(ni,2)

笔记:

[[email protected] spark]$ sudo yum install nc.x86_64
[[email protected] spark]$ nc -lk 9999
package com.atlxl.helloworld

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

/*
自定义输入流
 */

class CustomerRecevicer(host:String, port:Int) extends Receiver[String](StorageLevel.MEMORY_ONLY){

  //接收器启动的时候调用
  override def onStart(): Unit = {

    new Thread("receiver"){
      override def run(): Unit = {
        //接受数据并提交给框架
        receive()
      }
    }.start()

  }

  def receive(): Unit ={

    var socket: Socket = null
    var input: String = null

    try {
      socket = new Socket(host,port)

      //生成数据流
      val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))

      //接收数据
      /*
      方式一:
       */
      while (!isStopped() && (input = reader.readLine()) != null){
        store(input)
      }
      /*
      方式二:
       */
//      input = reader.readLine()
//      store(input)
//      while (!isStopped() && input != null){
//        store(input)
//        input = reader.readLine()
//      }
//
      restart("restart")

    }catch {
      case e:java.net.ConnectException => restart("restart")
      case t:Throwable => restart("restart")
    }
  }

  //接收器关闭的时候调用
  override def onStop(): Unit = {}
}
package com.atlxl.helloworld

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    //conf
    val conf = new SparkConf().setAppName("wc").setMaster("local[*]")

    //
    val ssc = new StreamingContext(conf,Seconds(5))

    //保存状态信息
    ssc.checkpoint("./check")

    //获取数据
//    val lineDStream = ssc.socketTextStream("hadoop102",9999)

    //自定义获取
    val lineDStream = ssc.receiverStream(new CustomerRecevicer("hadoop102", 9999))

    //DStream[String]
    val wordsDStream = lineDStream.flatMap(_.split(" "))

    //DStream[(String,1)]
    val k2vDStream = wordsDStream.map((_,1))

    //DStream[(String,sum)]
    //无状态转换
//    val result = k2vDStream.reduceByKey(_+_)

    val updateFuc =(v:Seq[Int],state:Option[Int])=> {

      val preStatus = state.getOrElse(0)
      Some(preStatus + v.sum)
    }

    //有状态转换
    val result = k2vDStream.updateStateByKey(updateFuc)
    result.print()

    //运行
    ssc.start()
    ssc.awaitTermination()

  }

}

4.2.2 Window Operations

  Window Operations 有点类似于 Storm 中的 State,可以设置窗口的大小和滑动窗口的间隔来

动态的获取当前 Steaming 的允许状态。

  基于窗口的操作会在一个比 StreamingContext 的批次间隔更长的时间范围内,通过整合多个

批次的结果,计算出整个窗口的结果。

  所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长,两者都必须是

StreamContext 的批次间隔的整数倍。窗口时长控制每次计算最近的多少个批次的数据,其实就

是最近的 windowDuration/batchInterval 个批次。如果有一个以 10 秒为批次间隔的源 DStream,

要创建一个最近 30 秒的时间窗口(即最近 3 个批次),就应当把 windowDuration 设为 30 秒。

而滑动步长的默认值与批次间隔相等,用来控制对新的 DStream 进行计算的间隔。如果源

DStream 批次间隔为 10 秒,并且我们只希望每两个批次计算一次窗口结果, 就应该把滑动步

长设置为 20 秒。

  假设,你想拓展前例从而每隔十秒对持续 30 秒的数据生成 word count。为做到这个,我们需

要在持续 30 秒数据的(word,1)对DStream上应用 reduceByKey。使用操作 reduceByKeyAndWindow.

# reduce last 30 seconds of data, every 10 second
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x -y, 30, 20)

关于 Window 的操作有如下原语:

  (1)window(windowLength, slideInterval): 基于对源 DStream 窗化的批次进行计算返回一个新

的 Dstream

  (2)countByWindow(windowLength, slideInterval):返回一个滑动窗口计数流中的元素。

  (3)reduceByWindow(func, windowLength, slideInterval):通过使用自定义函数整合滑动区间流

元素来创建一个新的单元素流。

  (4)reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]):当在一个(K,V)对

的 DStream 上调用此函数,会返回一个新(K,V)对的 DStream,此处通过对滑动窗口中批次数据使

用 reduce 函数来整合每个 key 的 value 值。Note:默认情况下,这个操作使用 Spark 的默认数量并

行任务(本地是 2),在集群模式中依据配置属性(spark.default.parallelism)来做 grouping。你可以通

过设置可选参数 numTasks 来设置不同数量的 tasks。

  (5)reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]):这个函

数是上述函数的更高效版本,每个窗口的 reduce 值都是通过用前一个窗的 reduce 值来递增计算。

通过 reduce 进入到滑动窗口数据并”反向 reduce”离开窗口的旧数据来实现这个操作。一个例子

是随着窗口滑动对 keys 的“加”“减”计数。通过前边介绍可以想到,这个函数只适用于”可逆

的 reduce 函数”,也就是这些 reduce 函数有相应的”反 reduce”函数(以参数 invFunc 形式传入)。

如前述函数,reduce 任务的数量通过可选参数来配置。注意:为了使用这个操作,检查点必须可

用。

  (6)countByValueAndWindow(windowLength,slideInterval, [numTasks]):对(K,V)对的 DStream

调用,返回(K,Long)对的新 DStream,其中每个 key 的值是其在滑动窗口中频率。如上,可配置

reduce 任务数量。

  reduceByWindow() 和 reduceByKeyAndWindow() 让我们可以对每个窗口更高效地进行归约

操作。它们接收一个归约函数,在整个窗口上执行,比如 +。除此以外,它们还有一种特殊形式,

通过只考虑新进入窗口的数据和离开窗口的数据,让 Spark 增量计算归约结果。这种特殊形式需

要提供归约函数的一个逆函数,比 如 + 对应的逆函数为 -。对于较大的窗口,提供逆函数可以

大大提高执行效率

val ipDStream = accessLogsDStream.map(logEntry => (logEntry.getIpAddress(), 1))
val ipCountDStream = ipDStream.reduceByKeyAndWindow(
 {(x, y) => x + y},
 {(x, y) => x - y},
 Seconds(30),
 Seconds(10))
 //加上新进入窗口的批次中的元素 //移除离开窗口的老批次中的元素 //窗口时长// 滑动步长

  countByWindow() 和 countByValueAndWindow() 作 为 对 数 据 进 行 计 数 操 作 的 简 写 。

countByWindow()返回一个表示每个窗口中元素个数的 DStream,而 countByValueAndWindow()返

回的 DStream 则包含窗口中每个值的个数。

val ipDStream = accessLogsDStream.map{entry => entry.getIpAddress()}
val ipAddressRequestCount = ipDStream.countByValueAndWindow(Seconds(30), Seconds(10))
val requestCount = accessLogsDStream.countByWindow(Seconds(30), Seconds(10))

WordCount 第三版:3 秒一个批次,窗口 12 秒,滑步 6 秒。

package com.atguigu.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object WorldCount {
  def main(args: Array[String]) {
    // 定义更新状态方法,参数 values 为当前批次单词频度,state 为以往批次单词频度
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(3))
    ssc.checkpoint(".")
    // Create a DStream that will connect to hostname:port, like localhost:9999
    val lines = ssc.socketTextStream("hadoop102", 9999)
    // Split each line into words
    val words = lines.flatMap(_.split(" "))
    //import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
    // Count each word in each batch
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b),Seconds(12), Seconds(6))
    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print()
    ssc.start() // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
    //ssc.stop()
  }
} 

4.3 其他重要操作

4.3.1 Transform

  Transform 原语允许 DStream 上执行任意的 RDD-to-RDD 函数。即使这些函数并没有在 DStream

的 API 中暴露出来,通过该函数可以方便的扩展 Spark API。该函数每一批次调度一次。其实也

就是对 DStream 中的 RDD 应用转换。

  比如下面的例子,在进行单词统计的时候,想要过滤掉 spam 的信息。

val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information
val cleanedDStream = wordCounts.transform { rdd =>
  rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
  ...
}

4.3.2 Join

  连接操作(leftOuterJoin, rightOuterJoin, fullOuterJoin也可以),可以连接Stream-Stream,windows

stream to windows-stream、stream-dataset

Stream-Stream Joins

val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)
val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)

Stream-dataset joins

val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }

原文地址:https://www.cnblogs.com/LXL616/p/11159243.html

时间: 2024-10-27 21:19:51

Spark DStream 转换的相关文章

SparkStreaming DStream转换

1.无状态转换操作 (1)无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转换DStream中的每一个RDD. 部分无状态转化操作: (2)尽管这些函数韩起来像作用在整个流上一样,但事实上每个DStream在内部是由许多RDD(批次)组成,且无状态转化操作是分别应用到每个RDD上. 例如:reduceByKey()会化简每个事件区间中的数据,但不会化简不同区间之间的数据. (3)在wordcount中,我们只会统计几秒内接收到的数据的单词个数,而不会累加 (4)无状态转化操作也能

Spark RDD转换成DataFrame的两种方式

Spark SQL支持两种方式将现有RDD转换为DataFrame.第一种方法使用反射来推断RDD的schema并创建DataSet然后将其转化为DataFrame.这种基于反射方法十分简便,但是前提是在您编写Spark应用程序时就已经知道RDD的schema类型.第二种方法是通过编程接口,使用您构建的StructType,然后将其应用于现有RDD.虽然此方法很麻烦,但它允许您在运行之前并不知道列及其类型的情况下构建DataSet 方法如下 1.将RDD转换成Rows 2.按照第一步Rows的结

Spark Dstream 创建

第 3 章 Dstream 创建 Spark Streaming 原生支持一些不同的数据源.一些“核心”数据源已经被打包到 Spark Streaming 的 Maven 工件中,而其他的一些则可以通过 spark-streaming-kafka 等附加工件获取. 每个接收器都以 Spark 执行器程序中一个长期运行的任务的形式运行,因此会占据分配给应用 的 CPU 核心.此外,我们还需要有可用的 CPU 核心来处理数据.这意味着如果要运行多个接 收器,就必须至少有和接收器数目相同的核心数,还要

Spark中RDD转换成DataFrame的两种方式(分别用Java和Scala实现)

一:准备数据源     在项目下新建一个student.txt文件,里面的内容为: 1,zhangsan,20 2,lisi,21 3,wanger,19 4,fangliu,18 二:实现 Java版: 1.首先新建一个student的Bean对象,实现序列化和toString()方法,具体代码如下: import java.io.Serializable; @SuppressWarnings("serial") public class Student implements Ser

Spark之SparkStreaming案例

一.Spark Streaming的介绍 ??Spark Streaming是Spark 核心API的扩展,可实现实时数据流的可扩展,高吞吐量,容错流处理. 数据可以从诸如Kafka,Flume,Kinesis或TCP套接字的许多来源中获取,并且可以使用由高级功能(如map,reduce,join和window)表达的复杂算法进行处理. 最后,处理后的数据可以推送到文件系统,数据库和实时仪表板. 事实上,您可以在数据流上应用Spark的机器学习和图形处理算法. ??在内部,它的工作原理如下. S

Apache Spark 2.2.0 中文文档 - Spark Streaming 编程指南 | ApacheCN

Spark Streaming 编程指南 概述 一个入门示例 基础概念 依赖 初始化 StreamingContext Discretized Streams (DStreams)(离散化流) Input DStreams 和 Receivers(接收器) DStreams 上的 Transformations(转换) DStreams 上的输出操作 DataFrame 和 SQL 操作 MLlib 操作 缓存 / 持久性 Checkpointing Accumulators, Broadcas

整合Kafka到Spark Streaming——代码示例和挑战

作者Michael G. Noll是瑞士的一位工程师和研究员,效力于Verisign,是Verisign实验室的大规模数据分析基础设施(基础Hadoop)的技术主管.本文,Michael详细的演示了如何将Kafka整合到Spark Streaming中. 期间, Michael还提到了将Kafka整合到 Spark Streaming中的一些现状,非常值得阅读,虽然有一些信息在Spark 1.2版本中已发生了一些变化,比如HA策略: 通过Spark Contributor.Spark布道者陈超我

Spark Streaming:大规模流式数据处理的新贵(转)

原文链接:Spark Streaming:大规模流式数据处理的新贵 摘要:Spark Streaming是大规模流式数据处理的新贵,将流式计算分解成一系列短小的批处理作业.本文阐释了Spark Streaming的架构及编程模型,并结合实践对其核心技术进行了深入的剖析,给出了具体的应用场景及优化方案. 提到Spark Streaming,我们不得不说一下BDAS(Berkeley Data Analytics Stack),这个伯克利大学提出的关于数据分析的软件栈.从它的视角来看,目前的大数据处

Spark详解

? Spark概述 当前,MapReduce编程模型已经成为主流的分布式编程模型,它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上.但是MapReduce也存在一些缺陷,如高延迟.不支持DAG模型.Map与Reduce的中间数据落地等.因此在近两年,社区出现了优化改进MapReduce的项目,如交互查询引擎Impala.支持DAG的TEZ.支持内存计算Spark等.Spark是UC Berkeley AMP lab开源的通用并行计算框架,以其先进的设计理念,已经