Structured Streaming教程(3) —— 与Kafka的集成

Structured Streaming最主要的生产环境应用场景就是配合kafka做实时处理,不过在Strucured Streaming中kafka的版本要求相对搞一些,只支持0.10及以上的版本。就在前一个月,我们才从0.9升级到0.10,终于可以尝试structured streaming的很多用法,很开心~

引入

如果是maven工程,直接添加对应的kafka的jar包即可:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>2.2.0</version>
</dependency>

读取kafka的数据

以流的形式查询

读取的时候,可以读取某个topic,也可以读取多个topic,还可以指定topic的通配符形式:

读取一个topic

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

读取多个topic

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

读取通配符形式的topic组

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

以批的形式查询

关于Kafka的offset,structured streaming默认提供了几种方式:

设置每个分区的起始和结束值

val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

配置起始和结束的offset值(默认)

val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

Schema信息

读取后的数据的Schema是固定的,包含的列如下:

Column Type 说明
key binary 信息的key
value binary 信息的value(我们自己的数据)
topic string 主题
partition int 分区
offset long 偏移值
timestamp long 时间戳
timestampType int 类型

source相关的配置

无论是流的形式,还是批的形式,都需要一些必要的参数:

  • kafka.bootstrap.servers kafka的服务器配置,host:post形式,用逗号进行分割,如host1:9000,host2:9000
  • assign,以json的形式指定topic信息
  • subscribe,通过逗号分隔,指定topic信息
  • subscribePattern,通过java的正则指定多个topic
    assign、subscribe、subscribePattern同时之中能使用一个。

其他比较重要的参数有:

  • startingOffsets, offset开始的值,如果是earliest,则从最早的数据开始读;如果是latest,则从最新的数据开始读。默认流是latest,批是earliest
  • endingOffsets,最大的offset,只在批处理的时候设置,如果是latest则为最新的数据
  • failOnDataLoss,在流处理时,当数据丢失时(比如topic被删除了,offset在指定的范围之外),查询是否报错,默认为true。这个功能可以当做是一种告警机制,如果对丢失数据不感兴趣,可以设置为false。在批处理时,这个值总是为true。
  • kafkaConsumer.pollTimeoutMs,excutor连接kafka的超时时间,默认是512ms
  • fetchOffset.numRetries,获取kafka的offset信息时,尝试的次数;默认是3次
  • fetchOffset.retryIntervalMs,尝试重新读取kafka offset信息时等待的时间,默认是10ms
  • maxOffsetsPerTrigger,trigger暂时不会用,不太明白什么意思。Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume.

写入数据到Kafka

Apache kafka仅支持“至少一次”的语义,因此,无论是流处理还是批处理,数据都有可能重复。比如,当出现失败的时候,structured streaming会尝试重试,但是不会确定broker那端是否已经处理以及持久化该数据。但是如果query成功,那么可以断定的是,数据至少写入了一次。比较常见的做法是,在后续处理kafka数据时,再进行额外的去重,关于这点,其实structured streaming有专门的解决方案。

保存数据时的schema:

  • key,可选。如果没有填,那么key会当做null,kafka针对null会有专门的处理(待查)。
  • value,必须有
  • topic,可选。(如果配置option里面有topic会覆盖这个字段)

下面是sink输出必须要有的参数:

  • kafka.bootstrap.servers,kafka的集群地址,host:port格式用逗号分隔。

流处理的数据写入

// 基于配置指定topic
val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start()

// 在字段中包含topic
val ds = df
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start()

批处理的数据写入

跟流处理其实一样

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save()

df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .save()

kafka的特殊配置

针对Kafka的特殊处理,可以通过DataStreamReader.option进行设置。

关于(详细的kafka配置可以参考consumer的官方文档](http://kafka.apache.org/documentation.html#newconsumerconfigs)

以及kafka producer的配置

注意下面的参数是不能被设置的,否则kafka会抛出异常:

  • group.id kafka的source会在每次query的时候自定创建唯一的group id
  • auto.offset.reset 为了避免每次手动设置startingoffsets的值,structured streaming在内部消费时会自动管理offset。这样就能保证订阅动态的topic时不会丢失数据。startingOffsets在流处理时,只会作用于第一次启动时,之后的处理都会自定的读取保存的offset。
  • key.deserializer,value.deserializer,key.serializer,value.serializer 序列化与反序列化,都是ByteArraySerializer
  • enable.auto.commit kafka的source不会提交任何的offset
  • interceptor.classes 由于kafka source读取数据都是二进制的数组,因此不能使用任何拦截器进行处理。

参考

原文地址:https://www.cnblogs.com/xing901022/p/9141334.html

时间: 2024-10-07 01:55:12

Structured Streaming教程(3) —— 与Kafka的集成的相关文章

Structured Streaming教程(2) —— 常用输入与输出

上篇了解了一些基本的Structured Streaming的概念,知道了Structured Streaming其实是一个无下界的无限递增的DataFrame.基于这个DataFrame,我们可以做一些基本的select.map.filter操作,也可以做一些复杂的join和统计.本篇就着重介绍下,Structured Streaming支持的输入输出,看看都提供了哪些方便的操作. 数据源 Structured Streaming 提供了几种数据源的类型,可以方便的构造Steaming的Dat

Structured Streaming教程(1) —— 基本概念与使用

近年来,大数据的计算引擎越来越受到关注,spark作为最受欢迎的大数据计算框架,也在不断的学习和完善中.在Spark2.x中,新开放了一个基于DataFrame的无下限的流式处理组件--Structured Streaming,它也是本系列的主角,废话不多说,进入正题吧! 简单介绍 在有过1.6的streaming和2.x的streaming开发体验之后,再来使用Structured Streaming会有一种完全不同的体验,尤其是在代码设计上. 在过去使用streaming时,我们很容易的理解

Spark Structured Streaming框架(3)之数据输出源详解

Spark Structured streaming API支持的输出源有:Console.Memory.File和Foreach.其中Console在前两篇博文中已有详述,而Memory使用非常简单.本文着重介绍File和Foreach两种方式,并介绍如何在源码基本扩展新的输出方式. 1. File Structured Streaming支持将数据以File形式保存起来,其中支持的文件格式有四种:json.text.csv和parquet.其使用方式也非常简单只需设置checkpointLo

Spark Structured Streaming框架(4)之窗口管理详解

1. 结构 1.1 概述 Structured Streaming组件滑动窗口功能由三个参数决定其功能:窗口时间.滑动步长和触发时间. 窗口时间:是指确定数据操作的长度: 滑动步长:是指窗口每次向前移动的时间长度: 触发时间:是指Structured Streaming将数据写入外部DataStreamWriter的时间间隔. 图 11 1.2 API 用户管理Structured Streaming的窗口功能,可以分为两步完成: 1) 定义窗口和滑动步长 API是通过一个全局的window方法

Spark Structured Streaming框架(5)之进程管理

Structured Streaming提供一些API来管理Streaming对象.用户可以通过这些API来手动管理已经启动的Streaming,保证在系统中的Streaming有序执行. 1. StreamingQuery 在调用DataStreamWriter方法的start启动Streaming后,会返回一个StreamingQuery对象.所以用户就可以通过这个对象来管理Streaming. 如下所示: val query = df.writeStream.format("console

Spark Structured Streaming框架(2)之数据输入源详解

Spark Structured Streaming目前的2.1.0版本只支持输入源:File.kafka和socket. 1. Socket Socket方式是最简单的数据输入源,如Quick example所示的程序,就是使用的这种方式.用户只需要指定"socket"形式并配置监听的IP和Port即可. val scoketDF = spark.readStream .format("socket") .option("host","

Spark学习八:spark streaming与flume和kafka集成

Spark学习八:spark streaming与flume和kafka集成 标签(空格分隔): Spark Spark学习八spark streaming与flume和kafka集成 一Kafka 二flume和kafka的集成 三kafka和spark streaming的集成方式一kafka推送 四kafka和spark streaming的集成方式一spark streaam主动获取 五spark stream的高级应用updateStateByKey实现累加功能 六spark stre

Spark2.x(六十):在Structured Streaming流处理中是如何查找kafka的DataSourceProvider?

本章节根据源代码分析Spark Structured Streaming(Spark2.4)在进行DataSourceProvider查找的流程,首先,我们看下读取流数据源kafka的代码: SparkSession sparkSession = SparkSession.builder().getOrCreate(); Dataset<Row> sourceDataset = sparkSession.readStream().format("kafka").option

Apache Spark 2.2.0 中文文档 - Structured Streaming 编程指南 | ApacheCN

Structured Streaming 编程指南 概述 快速示例 Programming Model (编程模型) 基本概念 处理 Event-time 和延迟数据 容错语义 API 使用 Datasets 和 DataFrames 创建 streaming DataFrames 和 streaming Datasets Input Sources (输入源) streaming DataFrames/Datasets 的模式接口和分区 streaming DataFrames/Dataset