流处理 —— Spark Streaming中的操作函数

1.1 map(fun) 操作

map操作需要传入一个函数当做参数, 主要作用是,对DStream对象a,将func函数作用到a中的每一个元素上并生成新的元素,得到的DStream对象b中包含这些新的元素。

    val conf = new SparkConf().setMaster("local[2]").setAppName("file streaming")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10))    // input:java output:(java,1)
    ssc.textFileStream("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\stream").map((_,1))

1.2 flatMap(func)

类似map + flatten 即先对集合中的每个元素进行map,再对map后的每个元素中的每个元素进行flatten。

val conf = new SparkConf().setMaster("local[2]").setAppName("file streaming")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))

// input:java scala    ssc.textFileStream("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\stream").flatMap((_,1))

output:
(java,1)
(scala,1)  

1.3 filter(func)

过滤filter传入一个func函数。对DStream a中的每一个元素,应用func方法进行计算,如果func函数返回结果为true,则保留该元素,否则丢弃该元素,返回一个新的DStream b

val conf = new SparkConf().setMaster("local[2]").setAppName("file streaming")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))

// input:java scala    

ssc.textFileStream("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\stream").flatMap((_,1)).filter(_.equalsIgnoreCase("java"))

output:
(java,1)

1.4 union(otherStream)

这个操作将两个DStream进行合并,生成一个包含着两个DStream中所有元素的新DStream对象。

val conf = new SparkConf().setMaster("local[2]").setAppName("file streaming")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))

//input:java
val ds = ssc.textFileStream("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\stream")

val ds1 = ds.map(p => s"${p}_one")
val ds2 = ds.map(p => s"${p}_two")

ds1.union(ds2).print()
// output:
java_one
java_two

1.5 count()

统计DStream中每个RDD包含的元素的个数,得到一个新的DStream,这个DStream中只包含一个元素,这个元素是对应语句单词统计数值。

val conf = new SparkConf().setMaster("local[2]").setAppName("file streaming")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))

//input:java scala java hadoop
val ds = ssc.textFileStream("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\stream").flatMap(_.split(""))

//output:
4

1.6  reduce(func)

返回一个包含一个元素的DStream,传入的func方法会作用在调用者的每一个元素上,将其中的元素顺次的两两进行计算。

// input: java scala spark    ssc.textFileStream("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\stream")
      .flatMap(_.split(" "))
      .reduce(_ + "_" + _)

//optput:
java_scala_spark

1.7  countByValue()

某个DStream中的元素类型为K,调用这个方法后,返回的DStream的元素为(K, Long)对,后面这个Long值是原DStream中每个RDD元素key出现的频率。

 // input: java scala java   

ssc.textFileStream("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\stream")
          .flatMap(_.split(" "))
          .map((1,_))
          .countByValue() // 统计
              .foreachRDD(f => {
                f.foreachPartition(p => {
                  p.foreach(println)
                })
              })
// output:
((1,java),2)
((1,scala),1)

1.8  reduceByKey(func, [numTasks])

调用这个操作的DStream是以(K, V)的形式出现,返回一个新的元素格式为(K, V)的DStream。返回结果中,K为原来的K,V是由K经过传入func计算得到的。还可以传入一个并行计算的参数,在local模式下,默认为2。在其他模式下,默认值由参数spark.default.parallelism确定。

// input: java scala java java
    ssc.textFileStream("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\stream")
          .flatMap(_.split(" "))
          .map((_,1))
          .reduceByKey(_ + _) // 统计
          .print()

//output:
java,3
scala,1

1.9 join(otherStream, [numTasks])

由一个DStream对象调用该方法,元素内容为(k, V),传入另一个DStream对象,元素内容为(k, W),返回的DStream中包含的内容是(k, (V, W))。这个方法也可以传入一个并行计算的参数,该参数与reduceByKey中是相同的。

// input: java scala

    val ds = ssc.textFileStream("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\stream")
          .flatMap(_.split(" "))

    val ds1 = ds.map(p => (p,s"${p}_one"))
    val ds2 = ds.map(p=> (p,s"${p}_two"))

    ds1.join(ds2).print()

// output:
(java,(java_one,java_two))
(scala,(scala_one,scala_two))

1.10 cogroup(otherStream, [numTasks])

由一个DStream对象调用该方法,元素内容为(k, V),传入另一个DStream对象,元素内容为(k, W),返回的DStream中包含的内容是(k, (Seq[V], Seq[W]))。这个方法也可以传入一个并行计算的参数,该参数与reduceByKey中是相同的。

// input:java scala java
val ds = ssc.textFileStream("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\stream")
          .flatMap(_.split(" "))

    val ds1 = ds.map(p => (p,s"${p}_one"))
    val ds2 = ds.map(p=> (p,s"${p}_two"))

    ds1.cogroup(ds2).print()

// output:
(scala,(CompactBuffer(scala_one),CompactBuffer(scala_two)))
(java,(CompactBuffer(java_one, java_one),CompactBuffer(java_two, java_two)))

1.11 transform(func)

DStream的transform操作极大的丰富了DStream上能够进行的操作内容。使用transform操作后,除了可以使用DStream提供的一些转换方法之外(例如RDD操作),还能够直接调用任意的调用RDD上的操作函数。

注意: transform传入的方法是被每一个batch调用的。这样可以支持在RDD上做一些时变的操作,即RDD,分区数以及广播变量可以在不同的batch之间发生变化。

val ssc = ContextUtils.getStreamingContext(this.getClass.getSimpleName, 5)
val lines = ssc.socketTextStream("localhost", 9999)

// 构建黑名单
val blacks = new ListBuffer[(String, Boolean)]()
blacks.append(("huhu", true))
val blacksRDD = ssc.sparkContext.parallelize(blacks)

// 从流中获取访问日志,并对黑名单中的数据进行过滤
lines.map(x => {(x.split(",")(0), x)})
.transform(rdd => {
rdd.leftOuterJoin(blacksRDD)
.filter(_._2._2.getOrElse(false) != true)
.map(x => x._2._1)
}).print()

1.12  updateStateByKey(func)

原文地址:https://www.cnblogs.com/yyy-blog/p/12672322.html

时间: 2024-09-30 11:29:02

流处理 —— Spark Streaming中的操作函数的相关文章

Spark Streaming中的操作函数分析

根据Spark官方文档中的描述,在Spark Streaming应用中,一个DStream对象可以调用多种操作,主要分为以下几类 Transformations Window Operations Join Operations Output Operations 一.Transformations 1.map(func) map操作需要传入一个函数当做参数,具体调用形式为 val b = a.map(func) 主要作用是,对DStream对象a,将func函数作用到a中的每一个元素上并生成新

Spark Streaming中的基本操作函数实例

官网文档中,大概可分为这几个 TransformationsWindow OperationsJoin OperationsOutput Operations 请了解一些基本信息: DStream是Spark Streaming提供的基本抽象.它表示连续的数据流,可以是从源接收的输入数据流,也可以是通过转换输入流生成的已处理数据流.在内部,DStream由一系列连续的RDD表示,这是Spark对不可变分布式数据集的抽象.DStream中的每个RDD都包含来自特定时间间隔的数据,如下图所示 Tra

.Spark Streaming(上)--实时流计算Spark Streaming原理介

Spark入门实战系列--7.Spark Streaming(上)--实时流计算Spark Streaming原理介绍 http://www.cnblogs.com/shishanyuan/p/4747735.html 1.Spark Streaming简介 1.1 概述 Spark Streaming 是Spark核心API的一个扩展,可以实现高吞吐量的.具备容错机制的实时流数据的处理.支持从多种数据源获取数据,包括Kafk.Flume.Twitter.ZeroMQ.Kinesis 以及TCP

Spark入门实战系列--7.Spark Streaming(上)--实时流计算Spark Streaming介绍

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送–Spark入门实战系列>获取 1 Spark Streaming简介 1.1 概述 Spark Streaming 是Spark核心API的一个扩展,可以实现高吞吐量的.具备容错机制的实时流数据的处理.支持从多种数据源获取数据,包括Kafk.Flume.Twitter.ZeroMQ.Kinesis 以及TCP sockets,从数据源获取数据之后,可以使用诸如map.reduce.join和window等高级函数进行复杂算法的处理

Spark Streaming通过JDBC操作数据库

本文记录了学习使用Spark Streaming通过JDBC操作数据库的过程,源数据从Kafka中读取. Kafka从0.10版本提供了一种新的消费者API,和0.8不同,因此Spark Streaming也提供了两种API与之对应,其中spark-streaming-kafka-0-8支持Kafka 0.8.2.1以后的Broker:spark-streaming-kafka-0-10支持0.10.0以上Broker,处于实验阶段.两者的对比如下表所示. |spark-streaming-ka

第95课:通过Spark Streaming的window操作实战模拟新浪微博、百度、京东等热点搜索词案例实战

背景描述:在社交网络(例如微博).电子商务(例如京东).搜索引擎(例如百度)等人们核心关注的内容之一就是我所关注的内容中大家正在最关注什么或者说当前的热点是什么,这在实际企业级应用中是非常有价值的.例如我们关系过去30分钟大家正在热搜索什么,并且每5分钟更新一次,这就使得热点内容是动态更新,当然也是更有价值. 我们知道在SparkStreaming中可以设置batchInterval,让SparkStreaming每隔batchInterval时间提交一次Job,假设batchInterval设

Spark入门实战系列--7.Spark Streaming(下)--实时流计算Spark Streaming实战

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.实例演示 1.1 流数据模拟器 1.1.1 流数据说明 在实例演示中模拟实际情况,需要源源不断地接入流数据,为了在演示过程中更接近真实环境将定义流数据模拟器.该模拟器主要功能:通过Socket方式监听指定的端口号,当外部程序通过该端口连接并请求数据时,模拟器将定时将指定的文件数据随机获取发送给外部程序. 1.1.2 模拟器代码 import java.io.{PrintWriter} impor

Spark Streaming中动态Batch Size深入及RateController解析

本期内容 : BatchDuration与 Process Time 动态Batch Size Spark Streaming中有很多算子,是否每一个算子都是预期中的类似线性规律的时间消耗呢? 例如:join操作和普通Map操作的处理数据的时间消耗是否会呈现出一致的线性规律呢,也就是说,并非数据量规模越大就是简单加大BatchDuration 就可以解决问题的,数据量是一个方面,计算的算子也是一个考量的因素. 使用BatchSize来适配我们的流处理程序 : 线上的处理程序越来越重要,流入的数据

Spark Streaming中向flume拉取数据

在这里看到的解决方法 https://issues.apache.org/jira/browse/SPARK-1729 请是个人理解,有问题请大家留言. 其实本身flume是不支持像KAFKA一样的发布/订阅功能的,也就是说无法让spark去flume拉取数据,所以老外就想了个取巧的办法. 在flume中其实sinks是向channel主动拿数据的,那么就让就自定义sinks进行自监听,然后使sparkstreaming先和sinks连接在一起, 让streaming来决定是否拿数据及拿数据的频