Spark Streaming中的基本操作函数实例

官网文档中,大概可分为这几个

Transformations
Window Operations
Join Operations
Output Operations

请了解一些基本信息:

DStream是Spark Streaming提供的基本抽象。它表示连续的数据流,可以是从源接收的输入数据流,也可以是通过转换输入流生成的已处理数据流。在内部,DStream由一系列连续的RDD表示,这是Spark对不可变分布式数据集的抽象。DStream中的每个RDD都包含来自特定时间间隔的数据,如下图所示

Transformations 直达车

1)map(func),将func函数作用到每一个元素上并生成一个新的元素,得到一个新的的DStream对象,包含这些新的元素。

代码

object Map {  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")    val ssc = new StreamingContext(sparkConf, Seconds(5))    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)    val mapLines = lines.map(word => "map_" + word)

    mapLines.print()

    ssc.start()    ssc.awaitTermination()  }}

结果

2)flatMap(func),将func函数作用到每一个元素上并生成0个或多个新的元素(例如下面的split就生成了>=0个新元素),得到一个新的DStream对象。包含这些新的元素。

代码

object FlatMap {  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")    val ssc = new StreamingContext(sparkConf, Seconds(5))    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)    val fmapLines = lines.flatMap(_.split(" "))

    fmapLines.print()

    ssc.start()    ssc.awaitTermination()  }}

结果

ps:这里放一个关于RDD map和flatMap的依赖图(红色块表示一个RDD区,黑色块表示该分区集合),意会下

3)filter(func),对DStream每一个元素,应用func方法进行计算,如果func函数返回结果为true,则保留该元素,否则丢弃该元素,返回一个新的DStream。

代码

object Filter {  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")    val ssc = new StreamingContext(sparkConf, Seconds(5))    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)

    val filterLines = lines.flatMap(_.split(" "))      .filter(!StringUtils.equals(_, "hello"))

    filterLines.print()

    ssc.start()    ssc.awaitTermination()  }}

结果

4)repartition(numPartitions),可自行操作

5)union(otherStream),返回一个新的DStream,它包含源DStream和otherDStream中元素的并集。

代码

def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")    val ssc = new StreamingContext(sparkConf, Seconds(5))    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)    val union1 = lines.map(word => "union1_" + word)    val union2 = lines.map(word => "union2_" + word)    val union1_2 = union1.union(union2)

    union1.print()    union2.print()    union1_2.print()

    ssc.start()    ssc.awaitTermination()  }

结果

6)count(),通过计算源DStream的每个RDD中的元素数量,返回单个元素RDD的新DStream。

代码

object Count {  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")    val ssc = new StreamingContext(sparkConf, Seconds(5))    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)    val mapLines = lines.map(_.split(" "))    val fmapLines = lines.flatMap(_.split(" "))

    mapLines.count().print()    fmapLines.count().print()

    ssc.start()    ssc.awaitTermination()  }}

结果

7)reduce(func),通过使用函数func(它接受两个参数并返回一个),其中两个参数(元素)两两计算,返回单个元素RDD的新DStream 。

代码

object Reduce {  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")    val ssc = new StreamingContext(sparkConf, Seconds(5))    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)

    val fmapLines = lines.flatMap(_.split(" "))    val result = fmapLines.reduce(_ + "*" + _)    //fmapLines.reduce((a, b) => a + "*" + b)

    result.print()

    ssc.start()    ssc.awaitTermination()  }}

结果

8)countByValue(),当在类型为K的DStream元素上调用时,返回新DStream的元素是(K,Long)对,其中每个键的值(Long)是其在源DStream的每个RDD中的频率。

代码

object countByValue {  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")    val ssc = new StreamingContext(sparkConf, Seconds(5))    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)

    val fmapLines = lines.flatMap(_.split(" "))    val countByKey = fmapLines.countByValue()

    countByKey.print()

    ssc.start()    ssc.awaitTermination()  }}

结果

9)reduceByKey(func, [numTasks]),当在类型为(K,V)的DStream元素上调用时,返回(K,V)对的新DStream,其中K为原来的K,V是由K经过传入func计算得到的。

注意:默认情况下,这使用Spark的默认并行任务数(local模式下默认为2,在群集模式下,数量由config属性确定spark.default.parallelism)进行分组。

代码

object ReduceByKey {  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")    val ssc = new StreamingContext(sparkConf, Seconds(5))    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)

    val fmapLines = lines.flatMap(_.split(" "))    val tuple = fmapLines.map(word => (word, 1))    val reduceByKey = tuple.reduceByKey(_ + _)

    reduceByKey.print()

    ssc.start()    ssc.awaitTermination()  }}

结果

10)join(otherStream, [numTasks]),当在(K,V)和(K,W)对的两个DStream上调用时,返回新的DStream内容是(K,(V,W))对。numTasks并行度,可选

代码

object Join {  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")    val ssc = new StreamingContext(sparkConf, Seconds(5))    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)    val words = lines.flatMap(_.split(" "))    val join1 = words.map(word => (word, "join1_" + word))    val join2 = words.map(word => (word, "join2_" + word))    val join1_2 = join1.join(join2)

    join1.print()    join2.print()    join1_2.print()

    ssc.start()    ssc.awaitTermination()  }}

结果

11)cogroup(otherStream, [numTasks]),当在(K,V)和(K,W)对的DStream上调用时,返回(K,Seq [V],Seq [W])元组的新DStream。numTasks并行度,可选

代码

object Cogroup {  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")    val ssc = new StreamingContext(sparkConf, Seconds(5))    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)    val words = lines.flatMap(_.split(" "))    val cogroup1 = words.map(word => (word, "cogroup1_" + word))    val cogroup2 = words.map(word => (word, "cogroup2_" + word))    val cogroup1_2 = cogroup1.cogroup(cogroup2)

    cogroup1.print()    cogroup2.print()    cogroup1_2.print()

    ssc.start()    ssc.awaitTermination()  }}

结果

12)transform(func直达车,通过将RDD-to-RDD函数应用于源DStream的每个RDD来返回新的DStream。这可以用于在DStream上执行任意RDD操作。

代码

object Transform {  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")    val ssc = new StreamingContext(sparkConf, Seconds(5))    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)

    val words = lines.transform(rdd=>{      rdd.flatMap(_.split(" "))    })

    words.print()

    ssc.start()    ssc.awaitTermination()  }}

结果

13)updateStateByKey(func)直达车,返回一个新的“状态”DStream,其中通过在键的先前状态和键的新值上应用给定函数来更新每个键的状态。这可用于维护每个密钥的任意状态数据。

代码

object UpdateStateByKey {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FileWordCount")    val ssc = new StreamingContext(sparkConf, Seconds(5))    ssc.sparkContext.setLogLevel("ERROR")

    //请注意,使用updateStateByKey需要配置检查点目录    ssc.checkpoint("D:\\spark\\checkpoint")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)

    val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)    val state = result.updateStateByKey[Int](updateFunction _)

    state.print()

    ssc.start()    ssc.awaitTermination()  }

  /**    * 更新数据    * @param newValues    * @param runningCount    * @return    */  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {

    val current = newValues.sum    val pre = runningCount.getOrElse(0)    Some(current + pre)

  }}

结果

Window Operations 直达车

window:定时的进行一段时间内数据的操作
window length:窗口的长度
sliding interval:窗口的间隔
这两个参数和batch size是倍数关系,不是的话会报错

1)window(windowLengthslideInterval),将当前时刻当前长度窗口中的元素取出形成一个新的DStream。

代码

object Window {  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")    val ssc = new StreamingContext(sparkConf, Seconds(5))    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)

    val fmapLines = lines.flatMap(_.split(" "))    //每隔5秒去计算前10秒的结果    val window = fmapLines.window(Seconds(10), Seconds(5))

    window.print()

    ssc.start()    ssc.awaitTermination()  }}

结果

2)countByWindow(windowLengthslideInterval),和count类似,只不过Dstream是我们截取的。

代码

object CountByWindow {  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")    val ssc = new StreamingContext(sparkConf, Seconds(5))    ssc.checkpoint("D:\\spark\\checkpoint")    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)

    val fmapLines = lines.flatMap(_.split(" "))    //每隔5s统计当前10秒长度的时间窗口的DStream中元素的个数:    val countByWindow = fmapLines.countByWindow(Seconds(10), Seconds(5))

    countByWindow.print()

    ssc.start()    ssc.awaitTermination()  }}

结果

3)reduceByWindow(funcwindowLengthslideInterval),和reduce类似,只不过Dstream是我们截取的。

代码

object ReduceByWindow {  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")    val ssc = new StreamingContext(sparkConf, Seconds(5))    ssc.checkpoint("D:\\spark\\checkpoint")    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)

    val fmapLines = lines.flatMap(_.split(" "))    val reduceByWindow = fmapLines.reduceByWindow(_ + "*" + _, Seconds(10), Seconds(5))

    reduceByWindow.print()

    ssc.start()    ssc.awaitTermination()  }}

结果

4)reduceByKeyAndWindow(funcwindowLengthslideInterval, [numTasks]) 直达车,和reduceByKey类似,只不过Dstream是我们截取的。

代码

object ReduceByKeyAndWindow {  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")    val ssc = new StreamingContext(sparkConf, Seconds(5))    ssc.checkpoint("D:\\spark\\checkpoint")    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)

    val fmapLines = lines.flatMap(_.split(" "))    val tuple = fmapLines.map(word => (word, 1))    val reduceByKeyAndWindow = tuple.reduceByKeyAndWindow((a:Int,b:Int) => (a + b),Seconds(10),Seconds(5))

    reduceByKeyAndWindow.print()

    ssc.start()    ssc.awaitTermination()  }}

结果

5)reduceByKeyAndWindow(funcinvFuncwindowLengthslideInterval, [numTasks]),和上面相比,多传入一个函数invFunc。向车站一样,有进去的人,也有出去的人,进去的人+1,出来的人-1。

代码

object ReduceByKeyAndWindow2 {  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")    val ssc = new StreamingContext(sparkConf, Seconds(5))    ssc.checkpoint("D:\\spark\\checkpoint")    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)

    val fmapLines = lines.flatMap(_.split(" "))    val tuple = fmapLines.map(word => (word, 1))    val reduceByKeyAndWindow = tuple.reduceByKeyAndWindow((a: Int, b: Int) => (a + b), (a: Int, b: Int) => (a - b), Seconds(10), Seconds(5))

    reduceByKeyAndWindow.print()

    ssc.start()    ssc.awaitTermination()  }}

结果

6)countByValueAndWindow(windowLength,slideInterval, [numTasks]),和countByValue类似,只不过Dstream是我们截取的。

代码

object CountByValueAndWindow {  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")    val ssc = new StreamingContext(sparkConf, Seconds(5))    ssc.checkpoint("D:\\spark\\checkpoint")    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)

    val fmapLines = lines.flatMap(_.split(" "))    val countByValueAndWindow = fmapLines.countByValueAndWindow(Seconds(10), Seconds(5))

    countByValueAndWindow.print()

    ssc.start()    ssc.awaitTermination()  }}

结果

Join Operations 直达车

1)Stream-stream joins 直达车

调用 join,leftOuterJoin,rightOuterJoin,fullOuterJoin就ok了

2)Stream-dataset joins 直达车

val dataset: RDD[String, String] = ...val windowedStream = stream.window(Seconds(20))...val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }

我们调用transform后就可以和dataset 连接操作了

Output Operations 直达车

输出操作      含义
print() 在运行流应用程序的驱动程序节点上打印DStream中每批数据的前十个元素。这对开发和调试很有用。
saveAsTextFiles(prefix, [suffix]) 将此DStream的内容保存为文本文件。每个批处理间隔的文件名基于前缀后缀生成:“prefix-TIME_IN_MS [.suffix]”
saveAsObjectFiles(prefix, [suffix]) 将此DStream的内容保存为SequenceFiles序列化Java对象。每个批处理间隔的文件名基于前缀和 后缀生成:“prefix-TIME_IN_MS [.suffix]”。 
saveAsHadoopFiles(prefix, [suffix]) 将此DStream的内容保存为SequenceFiles序列化Java对象。每个批处理间隔的文件名基于前缀和 后缀生成:“prefix-TIME_IN_MS [.suffix]”。 
Python API这在Python API中不可用。  
foreachRDD(func) 最通用的输出运算符,它将函数func应用于从流生成的每个RDD。此函数应将每个RDD中的数据推送到外部系统,例如将RDD保存到文件,或通过网络将其写入数据库。请注意,函数func在运行流应用程序的驱动程序进程中执行,并且通常会在其中执行RDD操作,这将强制计算流式RDD。

1)foreachRDD(func),正确高效的使用 直达车

connection 为外部链接

代码

dstream.foreachRDD { rdd =>  rdd.foreachPartition { partitionOfRecords =>    val connection = createNewConnection()    partitionOfRecords.foreach(record => connection.send(record))    connection.close()  }}

更高效的

dstream.foreachRDD { rdd =>  rdd.foreachPartition { partitionOfRecords =>    // ConnectionPool is a static, lazily initialized pool of connections    val connection = ConnectionPool.getConnection()    partitionOfRecords.foreach(record => connection.send(record))    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse  }}

原文地址:https://www.cnblogs.com/zhangrui153169/p/11375955.html

时间: 2024-10-29 09:59:45

Spark Streaming中的基本操作函数实例的相关文章

Spark Streaming中的操作函数分析

根据Spark官方文档中的描述,在Spark Streaming应用中,一个DStream对象可以调用多种操作,主要分为以下几类 Transformations Window Operations Join Operations Output Operations 一.Transformations 1.map(func) map操作需要传入一个函数当做参数,具体调用形式为 val b = a.map(func) 主要作用是,对DStream对象a,将func函数作用到a中的每一个元素上并生成新

流处理 —— Spark Streaming中的操作函数

1.1 map(fun) 操作 map操作需要传入一个函数当做参数, 主要作用是,对DStream对象a,将func函数作用到a中的每一个元素上并生成新的元素,得到的DStream对象b中包含这些新的元素. val conf = new SparkConf().setMaster("local[2]").setAppName("file streaming") val sc = new SparkContext(conf) val ssc = new Streami

Spark Streaming中动态Batch Size深入及RateController解析

本期内容 : BatchDuration与 Process Time 动态Batch Size Spark Streaming中有很多算子,是否每一个算子都是预期中的类似线性规律的时间消耗呢? 例如:join操作和普通Map操作的处理数据的时间消耗是否会呈现出一致的线性规律呢,也就是说,并非数据量规模越大就是简单加大BatchDuration 就可以解决问题的,数据量是一个方面,计算的算子也是一个考量的因素. 使用BatchSize来适配我们的流处理程序 : 线上的处理程序越来越重要,流入的数据

Spark Streaming中向flume拉取数据

在这里看到的解决方法 https://issues.apache.org/jira/browse/SPARK-1729 请是个人理解,有问题请大家留言. 其实本身flume是不支持像KAFKA一样的发布/订阅功能的,也就是说无法让spark去flume拉取数据,所以老外就想了个取巧的办法. 在flume中其实sinks是向channel主动拿数据的,那么就让就自定义sinks进行自监听,然后使sparkstreaming先和sinks连接在一起, 让streaming来决定是否拿数据及拿数据的频

spark streaming中使用checkpoint

从官方的Programming Guides中看到的 我理解streaming中的checkpoint有两种,一种指的是metadata的checkpoint,用于恢复你的streaming:一种是rdd的checkpoint的:下面的代码指的是第一种: // Function to create and setup a new StreamingContext def functionToCreateContext(): StreamingContext = { val ssc = new S

Spark Streaming编程指南

本文基于Spark Streaming Programming Guide原文翻译, 加上一些自己的理解和小实验的结果. 一.概述 Spark Streaming是基于Core Spark API的可扩展,高吞吐量,并具有容错能力的用于处理实时数据流的一个组件.Spark Streaming可以接收各种数据源传递来的数据,比如Kafka, Flume, Kinesis或者TCP等,对接收到的数据还可以使用一些用高阶函数(比如map, reduce, join及window)进行封装的复杂算法做进

Apache Spark 2.2.0 中文文档 - Spark Streaming 编程指南 | ApacheCN

Spark Streaming 编程指南 概述 一个入门示例 基础概念 依赖 初始化 StreamingContext Discretized Streams (DStreams)(离散化流) Input DStreams 和 Receivers(接收器) DStreams 上的 Transformations(转换) DStreams 上的输出操作 DataFrame 和 SQL 操作 MLlib 操作 缓存 / 持久性 Checkpointing Accumulators, Broadcas

Spark(十) -- Spark Streaming API编程

本文测试的Spark版本是1.3.1 Spark Streaming编程模型: 第一步: 需要一个StreamingContext对象,该对象是Spark Streaming操作的入口 ,而构建一个StreamingContext对象需要两个参数: 1.SparkConf对象:该对象是配置Spark 程序设置的,例如集群的Master节点,程序名等信息 2.Seconds对象:该对象设置了StreamingContext多久读取一次数据流 第二步: 构建好入口对象之后,直接调用该入口的方法读取各

(版本定制)第5课:基于案例分析Spark Streaming流计算框架的运行源码

本期内容: 1.在线动态计算分类最热门商品案例回顾与演示 2.基于案例分析Spark Streaming的运行源码 第一部分案例: package com.dt.spark.sparkstreaming import com.robinspark.utils.ConnectionPoolimport org.apache.spark.SparkConfimport org.apache.spark.sql.Rowimport org.apache.spark.sql.hive.HiveConte