学习Spark2.0中的Structured Streaming(一)

转载自:http://lxw1234.com/archives/2016/10/772.htm

Spark2.0新增了Structured Streaming,它是基于SparkSQL构建的可扩展和容错的流式数据处理引擎,使得实时流式数据计算可以和离线计算采用相同的处理方式(DataFrame&SQL)。Structured Streaming顾名思义,它将数据源和计算结果都映射成一张”结构化”的表,在计算的时候以结构化的方式去操作数据流,大大方便和提高了数据开发的效率。

Spark2.0之前,流式计算通过Spark Streaming进行:

使用Spark Streaming每次只能消费当前批次内的数据,当然可以通过window操作,消费过去一段时间(多个批次)内的数据。举个简例子,需要每隔10秒,统计当前小时的PV和UV,在数据量特别大的情况下,使用window操作并不是很好的选择,通常是借助其它如Redis、HBase等完成数据统计。

Structured Streaming将数据源和计算结果都看做是无限大的表,数据源中每个批次的数据,经过计算,都添加到结果表中作为行。

先试试官方给的例子,在本地启动NetCat: nc -lk 9999

在另一个会话中:

cd $SPARK_HOME/bin
./spark-shell(以local模式进入spark-shell命令行),运行下面的程序:
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()

    import spark.implicits._
    val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()

    val words = lines.as[String].flatMap(_.split(" "))
    val wordCounts = words.groupBy("value").count()

    val query = wordCounts.writeStream.outputMode("complete").format("console").start()
    query.awaitTermination()

在NetCat会话中输入”apache spark”,spark-shell中显示:

在NetCat会话中分两次再输入”apache hadoop”,”lxw1234.com hadoop spark”, spark-shell中显示:

可以看到,每个Batch显示的结果,都是完整的WordCount统计结果,这便是结算结果输出中的完整模式(Complete Mode)。

关于结算结果的输出,有三种模式:

  1. Complete Mode:输出最新的完整的结果表数据。
  2. Append Mode:只输出结果表中本批次新增的数据,其实也就是本批次中的数据;
  3. Update Mode(暂不支持):只输出结果表中被本批次修改的数据;

这些Output,可以直接通过连接器(如MySQL JDBC、HBase API等)写入外部存储系统。

再看看Append模式,

注意:Append模式不支持基于数据流上的聚合操作(Append output mode not supported when there
are streaming aggregations on streaming DataFrames/DataSets);

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()

    import spark.implicits._
    val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()

    val words = lines.as[String].flatMap(_.split(" "))

    val query = words.writeStream.outputMode("append").format("console").start()
    query.awaitTermination()
     

在NetCat中分三次输入:

apache spark

apache hadoop

lxw1234.com hadoop spark

spark-shell中显示:

只有当前批次的数据。

时间: 2024-10-11 15:11:45

学习Spark2.0中的Structured Streaming(一)的相关文章

Apache Spark 2.2.0 中文文档 - Structured Streaming 编程指南 | ApacheCN

Structured Streaming 编程指南 概述 快速示例 Programming Model (编程模型) 基本概念 处理 Event-time 和延迟数据 容错语义 API 使用 Datasets 和 DataFrames 创建 streaming DataFrames 和 streaming Datasets Input Sources (输入源) streaming DataFrames/Datasets 的模式接口和分区 streaming DataFrames/Dataset

IBM专家亲自解读 Spark2.0 操作指南

Spark 背景介绍 1.什么是Spark 在Apache的网站上,有非常简单的一句话,"Spark is a fast and general engine ",就是Spark是一个统一的计算引擎,而且突出了fast.那么具体是做什么的呢?是做large-scale的processing,即大数据的处理. "Spark is a fast and general engine for large-scale processing"这句话非常简单,但是它突出了Spa

Spark2.0机器学习系列之6:GBDT(梯度提升决策树)、GBDT与随机森林差异、参数调试及Scikit代码分析

概念梳理 GBDT的别称 GBDT(Gradient Boost Decision Tree),梯度提升决策树.     GBDT这个算法还有一些其他的名字,比如说MART(Multiple Additive Regression Tree),GBRT(Gradient Boost Regression Tree),Tree Net等,其实它们都是一个东西(参考自wikipedia – Gradient Boosting),发明者是Friedman. 研究GBDT一定要看看Friedman的pa

基于spark2.0整合spark-sql + mysql + parquet + HDFS

一.概述 spark 2.0做出的改变大家可以参考官网以及其他资料,这里不再赘述由于spark1.x的sqlContext在spark2.0中被整合到sparkSession,故而利用spark-shell客户端操作会有些许不同,具体如下文所述 二.spark额外配置 1. 正常配置不再赘述,这里如果需要读取MySQL数据,则需要在当前用户下的环境变量里额外加上JDBC的驱动jar包 例如我的是:mysql-connector-java-5.1.18-bin.jar 存放路径是$SPARK_HO

Spark2.x(六十):在Structured Streaming流处理中是如何查找kafka的DataSourceProvider?

本章节根据源代码分析Spark Structured Streaming(Spark2.4)在进行DataSourceProvider查找的流程,首先,我们看下读取流数据源kafka的代码: SparkSession sparkSession = SparkSession.builder().getOrCreate(); Dataset<Row> sourceDataset = sparkSession.readStream().format("kafka").option

Apache Spark 2.2.0 中文文档 - Spark Streaming 编程指南 | ApacheCN

Spark Streaming 编程指南 概述 一个入门示例 基础概念 依赖 初始化 StreamingContext Discretized Streams (DStreams)(离散化流) Input DStreams 和 Receivers(接收器) DStreams 上的 Transformations(转换) DStreams 上的输出操作 DataFrame 和 SQL 操作 MLlib 操作 缓存 / 持久性 Checkpointing Accumulators, Broadcas

MVC3.0 中Razor 学习

C# 的主要 Razor 语法规则 Razor 代码封装于 @{ ... } 中 行内表达式(变量和函数)以 @ 开头 代码语句以分号结尾 字符串由引号包围 C# 代码对大小写敏感 C# 文件的扩展名是 .cshtml MVC3.0 中Razor 学习 随着MVC3.0RTM版本的发布,最近将公司的项目从MVC2.0升级到MVC3.0.同时打算在MVC3中全面使用Razor模板引擎.现将Razor学习拿出来和大家分享,如果存在不足的地方欢迎您指出. 其实在使用<%= %>在html中调用C#代

有关OpenCV1.0中GUI命令的几个函数学习总结

1.修改窗口背景色或者光标形状 在OpenCV1.0版本利用函数int cvNamedWindow( const char* name, int flags )初始化创建一个窗口后,窗口的背景色是灰色,光标形状是十字线,通过如下方法改变这些窗口属性: 第一种方法是从源头直接修改.(1) 打开OpenCV安装目录下的_make文件夹,使用VC6.0打开opencv.dsw工程,打开文件”…\\highgui\\window_win32.cpp” 找到函数CV_IMPL int cvInitSyst

《从零开始学Swift》学习笔记(Day 7)——Swift 2.0中的print函数几种重载形式

原创文章,欢迎转载.转载请注明:关东升的博客 Swift 2.0中的print函数有4种重载形式: print(_:).输出变量或常量到控制台,并且换行. print(_:_:).输出变量或常量到指定类型的流中,并且换行. print(_:appendNewline:).输出变量或常量到控制台,appendNewline参数是布尔值,true表示换行,false表示不换行. print(_:_:appendNewline:).输出变量或常量指定类型的流中,appendNewline参数是布尔值,