Sparkstreaming and Kafka

简介

Kafka 0.10的Spark Streaming集成设计与0.8 Direct Stream方法类似。 它提供了简单的并行性,Kafka分区和Spark分区之间的1:1对应关系,以及对偏移量和元数据的访问。 但是,由于较新的集成使用新的Kafka消费者API而不是简单的API,所以在使用上存在显着差异。 这个版本的集成被标记为实验,所以API可能会有变化。

LINK(依赖)

对于使用SBT / Maven项目定义的Scala / Java应用程序,请将您的流应用程序与以下工件链接起来。

1 groupId = org.apache.spark
2 artifactId = spark-streaming-kafka-0-10_2.11
3 version = 2.2.0

不要在org.apache.kafka构件上手动添加依赖项(例如kafka-clients)。 spark-streaming-kafka-0-10已经具有适当的传递依赖性,不同的版本可能在诊断的方式上不兼容。

Creating a Direct Stream

请注意,导入的名称空间包含版本org.apache.spark.streaming.kafka010

 1 import org.apache.kafka.clients.consumer.ConsumerRecord
 2 import org.apache.kafka.common.serialization.StringDeserializer
 3 import org.apache.spark.streaming.kafka010._
 4 import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
 5 import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
 6
 7 val kafkaParams = Map[String, Object](
 8   "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
 9   "key.deserializer" -> classOf[StringDeserializer],
10   "value.deserializer" -> classOf[StringDeserializer],
11   "group.id" -> "use_a_separate_group_id_for_each_stream",
12   "auto.offset.reset" -> "latest",
13   "enable.auto.commit" -> (false: java.lang.Boolean)
14 )
15
16 val topics = Array("topicA", "topicB")
17 val stream = KafkaUtils.createDirectStream[String, String](
18   streamingContext,
19   PreferConsistent,
20   Subscribe[String, String](topics, kafkaParams)
21 )
22
23 stream.map(record => (record.key, record.value))

流中的每个项目都是一个ConsumerRecord(消费者记录)

有关可能的kafkaParams,请参阅Kafka使用者配置文档。 如果您的Spark批处理持续时间大于默认Kafka心跳会话超时(30秒),请适当增加heartbeat.interval.ms和session.timeout.ms。 对于大于5分钟的批次,这将需要更改代理上的group.max.session.timeout.ms。 请注意,该示例将enable.auto.commit设置为false,有关讨论,请参阅下面的“存储偏移量”。

LocationStrategies(位置策略)

新的Kafka消费者API将预取消息到缓冲区中。因此,性能方面的原因是Spark集成将缓存的消费者保留在执行者上(而不是为每个批次重新创建它们),并且倾向于在具有相应使用者的主机位置上调度分区。

在大多数情况下,您应该使用LocationStrategies.PreferConsistent,如上所示。这会将分区平均分配给可用的执行者。如果您的执行者(executors )与您的Kafka经纪人(brokers)位于同一个主机上,请使用PreferBrokers,它将优先为该分区的Kafka领导安排分区。最后,如果分区间负载存在明显偏移,请使用PreferFixed。这允许您指定分区到主机的明确映射(任何未指定的分区将使用一致的位置)。

消费者缓存的默认最大大小为64.如果您希望处理多于(执行者数量为64 *)的Kafka分区,则可以通过spark.streaming.kafka.consumer.cache.maxCapacity更改此设置。

如果您想禁用Kafka使用者的缓存,则可以将spark.streaming.kafka.consumer.cache.enabled设置为false。可能需要禁用缓存来解决SPARK-19185中描述的问题。 SPARK-19185解决后,该属性可能会在更高版本的Spark中删除。

缓存由topicpartition和group.id键入,因此每次调用createDirectStream时都要使用一个单独的group.id。

ConsumerStrategies(消费者策略)

新的Kafka消费者API有许多不同的方式来指定主题,其中一些需要大量的后对象实例化设置。 ConsumerStrategies提供了一个抽象,允许Spark从检查点重新启动后即可获得正确配置的使用者。

ConsumerStrategies.Subscribe,如上所示,允许您订阅固定的主题集合。 SubscribePattern允许您使用正则表达式来指定感兴趣的主题。 请注意,与0.8集成不同,使用Subscribe或SubscribePattern应该在正在运行的流中添加分区。 最后,Assign允许你指定一个固定的分区集合。 所有这三种策略都有重载的构造函数,允许您指定特定分区的起始偏移量。

如果您具有以上选项不能满足的特定消费者设置需求,则ConsumerStrategy是可以扩展的公共类。

Creating an RDD

如果您有一个更适合批处理的用例,则可以为已定义的偏移范围创建一个RDD。

1 // Import dependencies and create kafka params as in Create Direct Stream above
2
3 val offsetRanges = Array(
4   // topic, partition, inclusive starting offset, exclusive ending offset
5   OffsetRange("test", 0, 0, 100),
6   OffsetRange("test", 1, 0, 100)
7 )
8
9 val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)

请注意,您不能使用PreferBrokers,因为如果没有流,则不存在驱动程序方面的消费者为您自动查找代理元数据。 如有必要,请使用PreferFixed与您自己的元数据查找。

Obtaining Offsets

1 stream.foreachRDD { rdd =>
2   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
3   rdd.foreachPartition { iter =>
4     val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
5     println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
6   }
7 }

请注意,HasOffsetRanges的类型转换只会在createDirectStream的结果中调用的第一个方法中完成,而不是稍后向下的一系列方法。 请注意,RDD分区与Kafka分区之间的一对一映射在任何混洗或重新分区的方法(例如, reduceByKey()或window()。

Storing Offsets

Kafka交付语义在失败的情况下取决于如何以及何时存储偏移量。 spark输出操作至少一次。 所以,如果你想要相当于一次的语义,你必须在幂等输出之后存储偏移量,或者在输出中存储原子事务的偏移量。 通过这种集成,您可以选择3个选项,以提高可靠性(和代码复杂度),以便如何存储偏移量。

Checkpoints

如果启用了Spark检查点,偏移量将被存储在检查点中。 这很容易启用,但也有缺点。 你的输出操作必须是幂等的,因为你将得到重复的输出; 转换不是一种选择。 此外,如果应用程序代码已更改,则无法从检查点恢复。 对于计划升级,可以通过在旧代码的同时运行新代码来缓解这种情况(因为无论如何输出需要是幂等的,它们不应该发生冲突)。 但是对于需要更改代码的意外故障,除非您有另外的方法来识别已知的良好起始偏移量,否则将会丢失数据。

Kafka itself

Kafka有一个偏移提交API,用于在特殊的Kafka主题中存储偏移量。 默认情况下,新的使用者将定期自动提交偏移量。 这几乎肯定不是你想要的,因为由消费者成功轮询的消息可能还没有导致Spark输出操作,导致未定义的语义。 这就是为什么上面的流示例将“enable.auto.commit”设置为false的原因。 但是,在知道输出已存储之后,可以使用commitAsync API将偏移量提交给Kafka。 与检查点相比,好处在于,无论应用程序代码如何变化,Kafka都是耐用的商店。 然而,Kafka不是转换型的,所以你的输出仍然是幂等的。

1 stream.foreachRDD { rdd =>
2   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
3
4   // some time later, after outputs have completed
5   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
6 }

和HasOffsetRanges一样,如果调用createDirectStream的结果,而不是在转换之后,转换为CanCommitOffsets将会成功。 commitAsync调用是线程安全的,但是如果你想要有意义的语义,则必须在输出之后进行。

Your own data store

对于支持事务的数据存储,即使在出现故障的情况下,也可以在同一个事务中保存偏移量,以保持两者同步。 如果仔细检测重复或跳过的偏移量范围,回滚事务将防止重复或丢失的消息影响结果。 这给出了相当于一次的语义。 甚至可以使用这种策略,即使对于通常难以产生幂等性的聚合产生的输出也是如此。

 1 // The details depend on your data store, but the general idea looks like this
 2
 3 // begin from the the offsets committed to the database
 4 val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
 5   new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
 6 }.toMap
 7
 8 val stream = KafkaUtils.createDirectStream[String, String](
 9   streamingContext,
10   PreferConsistent,
11   Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
12 )
13
14 stream.foreachRDD { rdd =>
15   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
16
17   val results = yourCalculation(rdd)
18
19   // begin your transaction
20
21   // update results
22   // update offsets where the end of existing offsets matches the beginning of this batch of offsets
23   // assert that offsets were updated correctly
24
25   // end your transaction
26 }

SSL / TLS

新的Kafka使用者支持SSL。 要启用它,请在传递给createDirectStream / createRDD之前适当地设置kafkaParams。 请注意,这仅适用于Spark和Kafkabroker之间的沟通; 您仍负责单独确保Spark节点间通信。

1 val kafkaParams = Map[String, Object](
2   // the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
3   "security.protocol" -> "SSL",
4   "ssl.truststore.location" -> "/some-directory/kafka.client.truststore.jks",
5   "ssl.truststore.password" -> "test1234",
6   "ssl.keystore.location" -> "/some-directory/kafka.client.keystore.jks",
7   "ssl.keystore.password" -> "test1234",
8   "ssl.key.password" -> "test1234"
9 )

Deploying

与任何Spark应用程序一样,spark-submit用于启动您的应用程序。

对于Scala和Java应用程序,如果您使用SBT或Maven进行项目管理,请将spark-streaming-kafka-0-10_2.11及其依赖项打包到应用程序JAR中。 确保spark-core_2.11和spark-streaming_2.11被标记为提供的依赖关系,因为这些已经存在于Spark安装中。 然后使用spark-submit启动您的应用程序(请参阅主编程指南中的部署)。

 
时间: 2024-10-31 20:27:55

Sparkstreaming and Kafka的相关文章

SparkStreaming与Kafka整合遇到的问题及解决方案

前言 最近工作中是做日志分析的平台,采用了sparkstreaming+kafka,采用kafka主要是看中了它对大数据量处理的高性能,处理日志类应用再好不过了,采用了sparkstreaming的流处理框架 主要是考虑到它本身是基于spark核心的,以后的批处理可以一站式服务,并且可以提供准实时服务到elasticsearch中,可以实现准实时定位系统日志. 实现 Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式. 一. 基于Receiver方式

sparkStreaming 读kafka的数据

目标:sparkStreaming每2s中读取一次kafka中的数据,进行单词计数. topic:topic1 broker list:192.168.1.126:9092,192.168.1.127:9092,192.168.1.128:9092 1.首先往一个topic中实时生产数据. 代码如下: 代码功能:每秒向topic1发送一条消息,一条消息里包含4个单词,单词之间用空格隔开. 1 package kafkaProducer 2 3 import java.util.HashMap 4

SparkStreaming消费kafka数据

概要:本例子为SparkStreaming消费kafka消息的例子,实现的功能是将数据实时的进行抽取.过滤.转换,然后存储到HDFS中. 实例代码 package com.fwmagic.test import com.alibaba.fastjson.{JSON, JSONException} import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.SparkConf imp

SparkStreaming整合kafka的补充

(1)SparkStreaming 整合 kafka 两种方式对比 Direct 方式的优缺点分析 : 优点: 简化并行(Simplified Parallelism).不现需要创建以及 union 多输入源,Kafka topic 的partition 与 RDD 的 partition 一一对应. 高效(Efficiency).基于 Receiver-based 的方式保证数据零丢失(zero-data loss)需要配置 spark.streaming.receiver.writeAhea

spark-streaming读kafka数据到hive遇到的问题

在项目中使用spark-stream读取kafka数据源的数据,然后转成dataframe,再后通过sql方式来进行处理,然后放到hive表中, 遇到问题如下,hive-metastor在没有做高可用的情况下,有时候会出现退出,这个时候,spark streaminG的微批作业就会失败, 然后再启重动hive-metastore进程后,作业继续正常执行,数据就有丢失. 分析如下: 第一步,观察日志发现, 我原来的代码这么写的: xx.foreachRdd(rdd=> processRdd(rdd

第91讲:sparkStreaming基于kafka的Direct详解

1:Direct方式特点: 1)Direct的方式是会直接操作kafka底层的元数据信息,这样如果计算失败了,可以把数据重新读一下,重新处理.即数据一定会被处理.拉数据,是RDD在执行的时候直接去拉数据. 2)由于直接操作的是kafka,kafka就相当于你底层的文件系统.这个时候能保证严格的事务一致性,即一定会被处理,而且只会被处理一次.而Receiver的方式则不能保证,因为Receiver和ZK中的数据可能不同步,spark Streaming可能会重复消费数据,这个调优可以解决,但显然没

Spark实战(一)SparkStreaming集成Kafka

Spark Streaming + Kafka集成指南 Kafka项目在版本0.8和0.10之间引入了一个新的消费者API,因此有两个独立的相应Spark Streaming包可用.请选择正确的包, 请注意,0.8集成与后来的0.9和0.10代理兼容,但0.10集成与早期的代理不兼容. 注意:从Spark 2.3.0开始,不推荐使用Kafka 0.8支持. Spark Streaming从Kafka接收数据,转换为spark streaming中的数据结构Dstream.数据接收方式有两种 :1

第89课:SparkStreaming on Kafka之Kafka解析和安装实战

本篇博文将从以下方面组织内容: 1. Kafka解析 2. 消息组件Kafka 3. Kafka安装 实验搭建所需要的软件: kafka_2.10-0.9.0.1 Zookeeper集群已经安装好.在上一篇博文有安装步骤,不清楚的朋友可以参考下. 一:Kafka解析 1. Kafka是生产者和消费者模式中广播概念,Kafka也可以实现队列的方式. 2. Kafka不仅是一个消息中间键,还是一个存储系统,可以将流进来的数据存储一段时间.这就与传统的流式处理不一样,传统的流式处理处理完数据之后就消失

SparkStreaming结合Kafka使用

spark自带的example中就有streaming结合kafka使用的案例: $SPARK_HOME/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala 使用方法参见代码描述: Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads> <zkQuorum> is a

sparkStreaming读取kafka的两种方式

概述 Spark Streaming 支持多种实时输入源数据的读取,其中包括Kafka.flume.socket流等等.除了Kafka以外的实时输入源,由于我们的业务场景没有涉及,在此将不会讨论.本篇文章主要着眼于我们目前的业务场景,只关注Spark Streaming读取Kafka数据的方式. Spark Streaming 官方提供了两种方式读取Kafka数据: 一是Receiver-based Approach.该种读取模式官方最先支持,并在Spark 1.2提供了数据零丢失(zero-d