13.spark streaming之快速入门

简介

  Spark Streaming是Spark核心API的扩展,可以实现可伸缩、高吞吐量、具备容错机制的实时流时数据的处理。支持多种数据源,比如Kafka、Flume、Twitter、ZeroMQ、Kinesis 以及TCP sockets。

  可以使用诸如map、reduce、join和window等高级函数进行复杂算法(比如,机器学习和图计算)的处理。最后还可以将处理结果存储到文件系统,数据库和仪表盘。

架构与抽象

抽象

  Spark Streaming接收实时流的数据,并根据一定的时间间隔拆分成一批批的数据,然后通过Spark Engine处理这些批数据,最终得到处理后的一批批结果数据。

  Spark Streaming提供了一个叫做DStream(discretized stream,离散流)的抽象概念,DStream由一系列的RDD组成,表示每个批次中连续的数据流。DStream可以从输入源(比如,Kafka、Flume、Kinesis等)中创建,也可以从其他DStream中使用高级算子操作转换生成。

  DStream的所有操作其实都是对DStream中所有RDD的操作。比如,在单词统计案例中,flatMap转化操作会应用到每个行RDD上来生成单词RDD。

架构

  • Receiver:Spark Streaming内置的数据流接收器或自定义接收器,用于从数据源接收源源不断的数据流。
  • CurrentBuffer:用于缓存输入流接收器接收的数据流。
  • BlockIntervalTimer:一个定时器,用于将CurrentBuffer中缓存的数据流封装为Block后放入blocksForPushing队列中。
  • BlocksForPushing:待处理的Block
  • BlockPushingThread:此线程每隔100毫秒从BlocksForPushing队列中取出一个Block存入存储系统,并缓存到ReceivedBlockQueue队列中。
  • Block Batch:Block批次,按照批次时间间隔,从ReceivedBlockQueue队列中获取一批Block。
  • JobGenerator:Job生成器,用于给每一批Block生成一个Job。

DStream 转化操作

  DStream转化操作分为无状态(stateless)和有状态(stateful)两种。

  • 无状态转化操作中,每个批次的处理不依赖于之前批次的数据。
  • 有状态转化操作需要使用之前批次的数据或中间结果来计算当前批次的数据。

无状态转化操作

  无状态转化操作就是把简单的RDD转化操作应用到每个批次上,转化DStream中的每个RDD。

常用的无状态转化操作

函数名称 作用 scala示例
map() 对DStream中的每个元素应用指定函数,返回由各元素输出的元素组成的DStream ds.map(x => x+1)
flatMap() 对DStream中的每个元素应用指定函数,返回由各元素输出的迭代器组成的DStream ds.flatMap(x => x.split(" "))
filter 返回由给定DStream中通过筛选的元素组成的DStream ds.filter(x => x!=1)
repartition() 改变DStream的分区数 ds.repartition(10)
reduceByKey 将每个批次中键相同的记录聚合 ds.reduceByKey((x,y) => x+y)
groupByKey 将每个批次中的记录根据键分组 ds.groupByKey()
  • 使用map()和reduceByKey()在每个时间区间中对日志根据IP地址进行计数。

    • scala
    //假设ApacheAccessingLog是用来从Apache日志中解析条目的工具类
    val accessLogDStream = logData.map(line => ApacheAccessingLog.parseFromLogLine(line))
    val ipDStream = accessLogsDStream.map(entry => (entry.getIpAddress(), 1)
    val ipCountsDStream = ipDStream.reduceByKey((x,y) => x+y)
    • java
    //假设ApacheAccessingLog是用来从Apache日志中解析条目的工具类
    static final class IpTuple implements PairFunction<ApacheAccessLog, String, Long> {
        public Tuple2<String, Long> call(ApacheAccessLog log) {
            return new Tuple2<>(log.getIpAddress(), 1L);
        }
    }
    
    JavaDStream<ApacheAccessLog> accessLogDStream = logData.map(new ParseFromLogLine());
    JavaPairDStream<String, Long> ipDStream = accessLogDStream.mapToPair(new IpTuple());
    JavaPairDStream(String, Long) ipCountsDStream = ipDStream.reduceByKey(new LongSumReducer());
  • 以IP地址为键,将请求计数的数据和传输数据量的数据连接起来
    • scala
    val ipBytesDStream = accessLogsDStream.map(entry => (entry.getIpAddress(), entry.getContentSize()))
    val ipBytesSumDStream = ipBytesDStream.reduceByKey((x,y) => x+y)
    val ipBytesRequestCountDStream = ipCountsDStream.join(ipBytesSumDStream)
    • java
    JavaPairDStream<String, Long> ipBytesDStream = accessLogsDStream.mapToPair(new IpContentTuple());
    JavaPairDStream<String, Long> ipBytesSumDStream = ipBytesDStream.reduceByKey(new LongSumReducer());
    JavaPairDStream<String, Tuple2<Long,Long>> ipBytesRequestCountDStream = ipCountsDStream.join(ipBytesSumDStream);
  • 使用transform()操作实现自定义转化操作,从日志记录中提取异常值。
    • scala
    val outlierDStream = accessLogsDStream.transform{
        rdd => extractOutliers(rdd)
    }
    • java
    JavaPairDStream<String, Long> ipRawDStream = accessLogsDStream.transform(
        new Function<JavaPairRDD<ApacheAccessLog>, JavaRDD<ApacheAccessLog>>() {
            public JavaPairRDD<ApacheAccessLog> call(JavaRDD<ApacheAccessLog> rdd) {
                return extractOutliers(rdd);
            }
        }
    );

有状态转化操作

  DStream的有状态转化操作是跨时间区间跟踪数据的操作,先前批次的数据也被用来在新的批次中计算结果。

  有状态转化操作主要有两种类型:滑动窗口和updateStateByKey()。前者以一个时间阶段为滑动窗口进行操作,后者用来跟踪每个键的状态变化。

设置检查点

  有状态转化操作需要在StreamingContext中打开检查点机制确保容错性。

ssc.checkpoint("hdfs://...")

基于窗口的转化操作

简介

  基于窗口的操作会在一个比StreamingContext批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。

  基于窗口的转化操作需要两个参数,分别是窗口时长和滑动时长。两者都是批次间隔的整数倍。

  • 窗口时长:控制每次计算最近的windowDuration/batchInterval个批次的数据。
  • 滑动步长:默认值与批次间隔相等。用来控制对新DStream进行计算的时间间隔。
简单案例
  • 使用window()对窗口进行计数

    • scala
    val accessLogsWindow = accessLogsDStream.window(Seconds(30), Seconds(10))
    val windowCounts = accessLogsWindow.count()
    • java
    JavaDStream<ApacheAccessLog> accessLogsWindow = accessLogsDStream.window(Durations.seconds(30), Duration.seconds(10));
    JavaDStream<Integer> windowCounts = accessLogsWindow.count();
  • 使用reduceByKeyAndWindow对每个IP地址的访问量计数
    • scala
    val ipDStream = accessLogsDStream.map(logEntry => (logEntry.getIpAddress(), 1))
    val ipCountDStream = ipDStream.reduceByKeyAndWindow(
        {(x,y) => x+y}, //加入新进入窗口的批次中的元素
        {(x,y) => x-y}, //移除离开窗口的老批次中的元素
        Seconds(30), //窗口时长
        Seconds(10) //滑动步长
    )
    • java
    class ExtractIp extends PairFunction<ApacheAccessLog, String, Long> {
        public Tuple2<String, Long> call(ApacheAccessLog entry) {
            return new Tuple2(entry.getIpAddress(), 1L);
        }
    }
    
    class AddLongs extends Function2<Long, Long, Long>() {
        public Long call(Long v1, Long v2) {
            return v1 + v2;
        }
    }
    
    class SubtractLongs extends Function2<Long, Long, Long>() {
        public Long call(Long v1, Long v2) {
            return v1 - v2;
        }
    }
    
    JavaPairDStream<String, Long> ipAddressPairDStream = accessLogsDStream.mapToPair(new ExtractIp());
    JavaPairDStream<String, Long> ipCountDStream = ipAddressPairDStream.reduceByKeyAndWindow(
        new AddLongs(), //加上新进入窗口的批次中的元素
        new SubtractLongs(), //移除离开窗口的老批次中的元素
        Durations.seconds(30), //窗口时长
        Durations.seconds(10) //滑动步长
    )
  • 使用countByWindow和countByValueAndWindow对窗口计数
    • scala

      val ipDStream = accessLogsDStream.map{entry => entry.getIpAddress()}
      val ipAddre***equestCount = ipDStream.countByValueAndWindow(Seconds(30), Seconds(10))
      val requestCount = accessLogsDStream.countByWindow(Seconds(30), Seconds(10))
    • java
    JavaDStream<String> ip = accessLogsDStream.map(new Function<ApacheAccessLog, String>() {
       public String call(ApacheAccessLog entry) {
            return entry.getIpAddress();
       }
    });
    
    JavaDStream<Long> requestCount = accessLogsDStream.countByWindow(Dirations.seconds(30), Durations.seconds(10));
    JavaPairDStream<String, Long> ipAddre***equestCount = ip.countByValueAndWindow(Dirations.seconds(30), Durations.seconds(10));

updateStateByKey转化操作

简介

  updateStateByKey提供了跨批次维护状态的功能,用于键值对形式的DStream。

  updateStateByKey提供了一个update(events, oldState)函数,接收与某键相关的事件及该键之前对应的状态,返回该键对应的新状态。

  • events:当前批次中收到的事件列表
  • oldState:一个可选的状态对象,存放在Option内;如果一个键没有之前的状态,这个值为空。
  • newState:由函数返回,也以Option形式存在;可以返回一个空的Option表示删除该状态。
简单案例

  使用updateStateByKey()跟踪日志消息中各HTTP响应代码的计数。

  • scala
def updateRunningSum(values: Seq[Long], state: Option[Long]) = {
    Some(state.getOrElse(0L) + values.size)
}

val responseCodeDStream = accessLogsDStream.map(log => (log.getResponseCode(), 1L))
val responseCodeCountDStream = responseCodeDStream.updateStateByKey(updateRunningSum _)
  • java
class UpdateRunningSum implements Function2<List<Long>, Optional<Long>, Optional<Long>> {
    public Optional<Long> call(List<Long> nums, Optional<Long> current) {
        long sum = current.or(0L);
        return Optional.of(sum + nums.size());
    }
};

JavaPairDStream<Integer, Long> responseCodeCountDStream = accessLogsDStream.mapToPair(
    new PairFunction<ApacheAccessLog, Integer, Long>() {
        public Tuple2<Integer, Long> call(ApacheAccessLog log) {
            return new Tuple2(log.getResponseCode(), 1L);
        }
    }
).updateStateByKey(new UpdateRunningSum());

DStream 行动操作

  DStream行动操作同RDD的行动操作。比如,将DStream保存为SequenceFile文件。

  • scala
val writableIpAddre***equestCount = ipAddre***equestCount.map{
    (ip, count) => <new Text(ip), new LongWritable(count))
}

writableIpAddre***equestCount.saveAsHadoopFiles[SequenceFileOutputFormat[Text, LongWritable]]("outputDir", "txt")
}
  • java
JavaPairDStream<Text, LongWritable> writableDStream = ipDStream.mapToPair(
    new PairFunction<Tuple2<String, Long>, Text, LongWritable>() {
        public Tuple2<Text, LongWritable> call(Tuple2<String, Long> e) {
            return new Tuple2(new Text(e._1()), new LongWritable(e._2()));
        }
    }
);

writableDStream.saveAsHadoopFiles("outputDir", "txt", Text.class, LongWritable.class, SequenceFileOutputFormat.class);

原文地址:http://blog.51cto.com/12967015/2172874

时间: 2024-07-30 22:16:04

13.spark streaming之快速入门的相关文章

10.spark sql之快速入门

前世今生 Hive&Shark ??随着大数据时代的来临,Hadoop风靡一时.为了使熟悉RDBMS但又不理解MapReduce的技术人员快速进行大数据开发,Hive应运而生.Hive是当时唯一运行在Hadoop上的SQL-on-Hadoop工具. ??但是MapReduce计算过程中大量的中间磁盘落地过程消耗了大量的I/O,降低的运行效率.为了提高SQL-on-Hadoop的效率,大量的SQL-on-Hadoop工具开始产生,其中表现较为突出的是: MapR的Drill Cloudera的Im

Expression Blend实例中文教程(13) - 控件模板快速入门ControlTemplates

上篇,介绍了控件样式(Style)和模板(Template)的基础概念,并且演示了使用Blend设计控件样式.本篇将继续介绍使用Blend设计自定义控件模板 - ControlTemplate.ControlTemplate可以称为控件模板,简单的理解为控件结构和行为的集合.在项目设计中,经常会使用ControlTemplate创建新的控件模板或者修改现成的控件模板,使用项目的UI具有独特性.如何使用Blend创建和修改ControlTemplate是本文即将讨论的话题. 概述 对于Contro

14.spark mllib之快速入门

简介 ??MLlib是Spark提供提供机器学习的库,专为在集群上并行运行的情况而设计.MLlib包含很多机器学习算法,可在Spark支持的所有编程语言中使用. ??MLlib设计理念是将数据以RDD的形式表示,然后在分布式数据集上调用各种算法.其实,MLlib就是RDD上一系列可供调用的函数的集合. 数据类型 ??MLlib包含一些特有的数据类型,位于org.apache.spark.mllib包(Java/Scala)或pyspark.mllib(Python)中.主要的几个类有: Vect

Spark入门实战系列--7.Spark Streaming(上)--实时流计算Spark Streaming介绍

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送–Spark入门实战系列>获取 1 Spark Streaming简介 1.1 概述 Spark Streaming 是Spark核心API的一个扩展,可以实现高吞吐量的.具备容错机制的实时流数据的处理.支持从多种数据源获取数据,包括Kafk.Flume.Twitter.ZeroMQ.Kinesis 以及TCP sockets,从数据源获取数据之后,可以使用诸如map.reduce.join和window等高级函数进行复杂算法的处理

Spark入门实战系列--7.Spark Streaming(下)--实时流计算Spark Streaming实战

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.实例演示 1.1 流数据模拟器 1.1.1 流数据说明 在实例演示中模拟实际情况,需要源源不断地接入流数据,为了在演示过程中更接近真实环境将定义流数据模拟器.该模拟器主要功能:通过Socket方式监听指定的端口号,当外部程序通过该端口连接并请求数据时,模拟器将定时将指定的文件数据随机获取发送给外部程序. 1.1.2 模拟器代码 import java.io.{PrintWriter} impor

C#forUnity快速入门(连载13)-C#结构体

C# for Unity编程语言快速入门教程(连载13)_C#结构体 C#的"结构体"使用Struct 关键字来定义,是与"类"."接口"并列的编程单位.其设计的主要目的是自定义"值类型",即允许用户自定义值类型. 适用范围:   结构适合一些小型的数据结构,这些结构包含的数据以创建后不修改的数据为主.也适合数据打包,一次性定义多个变量. 结构体的限制很多.1> 除非字段声明为const .static,否则无法初始化.

Spark入门实战系列--7.Spark Streaming(下)--Spark Streaming实战

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送–Spark入门实战系列>获取 1 实例演示 1.1 流数据模拟器 1.1.1 流数据说明 在实例演示中模拟实际情况,需要源源不断地接入流数据,为了在演示过程中更接近真实环境将定义流数据模拟器.该模拟器主要功能:通过Socket方式监听指定的端口号,当外部程序通过该端口连接并请求数据时,模拟器将定时将指定的文件数据随机获取发送给外部程序. 1.1.2 模拟器代码 import java.io.{PrintWriter} import

Spark Streaming入门

1. Spark Streaming入门 1. 概述 Spark Streaming is an extension of the core Spark API that enables scalable(Spark Streaming是基于Spark Core的扩展) high-throughput(高可用) fault-tolerant(容错) stream processing of live data streams(作用在实时数据流上) Spark Streaming: 将不同的数据源

Spark修炼之道(进阶篇)——Spark入门到精通:第十三节 Spark Streaming—— Spark SQL、DataFrame与Spark Streaming

主要内容 Spark SQL.DataFrame与Spark Streaming 1. Spark SQL.DataFrame与Spark Streaming 源码直接参照:https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala import org.apache.spark.SparkConf