[Spark]-结构化流之输出篇

5.结构化流的输出

 一旦定义好了streaming DataFrame/Dataset的最终结果,剩下的就是一些计算输出了.为此,必须使用 DataStreamWriter通过 Dataset.writeStream() 返回.此时必须以下一个或多个

   输出落地 的详细信息:  Data format, location 等等

   输出模式(Output mode)

   查询名称(Query name)  可选,指定用于标识的查询的唯一名称

   触发器间隔(Trigger interval):可选地,指定触发间隔。如果没有指定,系统将检查尽快获得新数据前处理完成。如果一个触发时间错过了,因为前面的处理还没有完成,则系统将触发立即处理

   检查点位置(Checkpoint location) 对于可以保证 end-to-end fault-tolerance (端对端容错)能力的某些 output sinks ,请指定系统将写入所有 checkpoint (检查点)信息的位置。

                   这应该是与 HDFS 兼容的容错文件系统中的目录.

  5.1 输出模式

    追加(append)(默认) 这是默认的模式.在上次触发后,只有新行添加到结果表(result-table)才会触发。

             这是只支持那些查询,行添加到结果表永远不会改变,因为这种模式保证每一行将只输出一次。例如,只有选择查询,地图,flatMap,过滤,加入等将支持附加模式

    完全(complete) 每次触发时都会将整个结果表(result-table)输出.例如聚合结果等

    更新(update) 只有结果表在上次被触发后被更新才会触发

    不同的查询支持不同的输出模式,具体见下:

查询类型   支持模式 描述
带聚合的查询 聚合带事件时间的水印 Append, Update, Complete
Append 使用水印来删除旧的聚集状态 窗口将在水印最终阈值的时候聚合.所以结果将在水印过期,最终结果确定的情况下添加进结果表

Update 使用水印来删除旧的聚集状态

Complete 不会使用水印来删除旧的聚集状态

不带水印的聚合 Complete, Update 由于没有使用水印,旧的聚集状态不会被删除.不支持append,因为对整数据的聚合结果会不断更新,不符合append的语义
mapGroupsWithState Update  
flatMapGroupsWithState 追加操作模式 Append flatMapGroupsWithState 之后允许 Aggregations (聚合)
  更新操作模式 Update flatMapGroupsWithState 之后不允许 Aggregations (聚合)。
带join的查询 Append join不支持Update,Complete模式
其它查询 Append, Update 不支持Complete.因为非聚合数据的结果表,全部保存进结果表时不允许的

  5.2 输出落地

      5.2.1 落地文件

    5.2.2 落地Kafka

    5.2.3 落地控制台(debug使用)

    5.2.4 落地Memory接收器(debug使用)

    5.2.5 foreach

       foreach落地,是用户自定义输出的一种方式.foreach需要用户实现 ForeachWriter 类.实际处理取决于用户如何实现.(foreach只能使用在Scala/Java中)

      它将在触发器(trigger)之后生成结果行时调用一个用户实现.

      注意: foreach的实现载体是多个executor中的某一个

      5.2.5.1 实现

        open: writer 初始化时执行(例如打开连接,开启事务等).它具有两个入参: version=>每个触发器单调递增的ID  partitionId =>分区ID.

           open将返回一个布尔值.当为false时,后续将不会产生任何调用.用户可以根据自己的逻辑,用这个返回值指出本次输出后续是否还有必要执行

        proccess:根据open的返回值决定是否需要执行.

        close:无论open返回什么值,close必然会执行.这里适合用户做一些资源回收操作

    5.2.6 输出落地可以承载的输出模式

接收器 支持的输出模式 可选项 容错机制 备注
文件 Append path 支持容错(有且仅有一次) 支持分区
kafka Append, Update, Complete 详见kafka专章 支持容错(最少一次)  
foreach Append, Update, Complete   取决于foreach的实现  
控制台 Append, Update, Complete numRows:每次打印的行数(默认20),truncate:输出太长时截断(默认true) 不支持容错  
内存 Append, Complete   不支持容错,但重启时重建整张结果表  

原文地址:https://www.cnblogs.com/NightPxy/p/9278881.html

时间: 2024-10-09 22:03:19

[Spark]-结构化流之输出篇的相关文章

[Spark]-结构化流之用法篇

4.用法 结构化流使用Datasets和DataFrames.从Spark2.0开始,Spark-SQL中的Datasets和DataFrames,就已经能很好表示静态(有界)数据,动态(无界)数据 4.1 数据源 结构化流提供了四种不中断数据源 file-system,kafka,socket.rate-source 4.1.1 socket 从一个socket连接中读取 UTF-8 的文本数据. <=注意这是一种不可容错的数据源,建议仅在测试环境中使用. 配置参数: host  连接地址  

[Spark]-结构化流之初始篇

1.什么是结构化流. 结构化流(Structured Streaming),是一种基于Spark-SQL引擎构建的,可容错的,可扩展的流处理引擎. 它以微批量计算的形式来表达流式计算,随着流式数据持续到达,它能持续的进行处理并更新最终计算结果. 它使用Spark-SQL带来的丰富的API,来表示流聚合(streaming aggregations),事件时间窗口( event-time windows),流到批处理连接(stream-to-batch joins)等,最终在同一个引擎(优化的Sp

[Spark]-结构化流之监控&amp;故障恢复篇

6 流的监控以及故障恢复 6.1.流的运行时数据    结构化流启动后返回的 StreamingQuery 对象. val query = df.writeStream.format("console").start() // get the query object query.id // get the unique identifier of the running query that persists across restarts from checkpoint data

spark结构化数据处理:Spark SQL、DataFrame和Dataset

本文讲解Spark的结构化数据处理,主要包括:Spark SQL.DataFrame.Dataset以及Spark SQL服务等相关内容.本文主要讲解Spark 1.6.x的结构化数据处理相关东东,但因Spark发展迅速(本文的写作时值Spark 1.6.2发布之际,并且Spark 2.0的预览版本也已发布许久),因此请随时关注Spark SQL官方文档以了解最新信息. 文中使用Scala对Spark SQL进行讲解,并且代码大多都能在spark-shell中运行,关于这点请知晓. 概述 相比于

[Spark]-结构化数据查询之数据源篇

7. 数据源 Spark-SQL 支持通过Dataframe接口对各种数据源进行操作 各种数据源的加载&保存 数据转换(relational transformations) 注册临时视图(temporary view),来允许SQL的形式直接对临时视图进行操作 7.1  数据源加载 Spark-SQL的默认数据源为parquet(spark.sql.sources.default设置),一些数据源加载的例子如下: /** * 加载parquet数据源 */ spark.read.load(&qu

【文智背后的奥秘】系列篇——结构化抽取平台

版权声明:本文由文智原创文章,转载请注明出处: 文章原文链接:https://www.qcloud.com/community/article/91 来源:腾云阁 https://www.qcloud.com/community 随着大数据时代的到来,一个大规模生成.分享.处理以及应用数据的时代正在开启.如果能将互联网上异源异构的非结构化或半结构化数据转换为更易处理的结构化数据,可以极大的降低获取数据的门槛,为信息检索和数据挖掘提供基础,更好的挖掘数据中蕴藏的价值. 单纯考虑网页这种半结构化数据

Hadoop经典案例Spark实现(七)——日志分析:分析非结构化文件

相关文章推荐 Hadoop经典案例Spark实现(一)--通过采集的气象数据分析每年的最高温度 Hadoop经典案例Spark实现(二)--数据去重问题 Hadoop经典案例Spark实现(三)--数据排序 Hadoop经典案例Spark实现(四)--平均成绩 Hadoop经典案例Spark实现(五)--求最大最小值问题 Hadoop经典案例Spark实现(六)--求最大的K个值并排序 Hadoop经典案例Spark实现(七)--日志分析:分析非结构化文件 1.需求:根据tomcat日志计算ur

Spark SQL是处理结构化的数据

Spark SQL是处理结构化的数据,可以存储在二维表中,类似数据库中的表一样存储数据 Spark1.x val sqlContext = new SparkContext(conf) val sqlContext = new SQLContext(sc) //将RDD和Schema信息关联到一起,1,RDD和case class 2,RDD和StructType //case class Person将RDD中的数据转换成case class 属性相对应的类型,然后设置到case class中

Salesforce开源TransmogrifAI:用于结构化数据的端到端AutoML库

AutoML 即通过自动化的机器学习实现人工智能模型的快速构建,它可以简化机器学习流程,方便更多人利用人工智能技术.近日,软件行业巨头 Salesforce 开源了其 AutoML 库 TransmogrifAI.Salesforce Einstein 数据科学高级总监 Shubha Nabar 在 Medium 上撰文介绍了该 AutoML 库,包括工作流程和设计原则等. GitHub 链接:https://github.com/salesforce/TransmogrifAI Transmo