Flink与Spark Streaming在与kafka结合的区别!

本文主要是想聊聊flink与kafka结合。当然,单纯的介绍flink与kafka的结合呢,比较单调,也没有可对比性,所以的准备顺便帮大家简单回顾一下Spark Streaming与kafka的结合。

看懂本文的前提是首先要熟悉kafka,然后了解spark Streaming的运行原理及与kafka结合的两种形式,然后了解flink实时流的原理及与kafka结合的方式。

kafka

kafka作为一个消息队列,在企业中主要用于缓存数据,当然,也有人用kafka做存储系统,比如存最近七天的数据。kafka的基本概念请参考:kafka入门介绍

更多kafka的文章请关注浪尖公众号,阅读。

首先,我们先看下图,这是一张生产消息到kafka,从kafka消费消息的结构图。

当然, 这张图很简单,拿这张图的目的是从中可以得到的跟本节文章有关的消息,有以下两个:

1,kafka中的消息不是kafka主动去拉去的,而必须有生产者往kafka写消息。

2,kafka是不会主动往消费者发布消息的,而必须有消费者主动从kafka拉取消息。

spark Streaming结合kafka

Spark
Streaming现在在企业中流处理也是用的比较广泛,但是大家都知道其不是真正的实时处理,而是微批处理。

在spark 1.3以前,SPark
Streaming与kafka的结合是基于Receiver方式,顾名思义,我们要启动1+个Receiver去从kafka里面拉去数据,拉去的数据会每隔200ms生成一个block,然后在job生成的时候,取出该job处理时间范围内所有的block,生成blockrdd,然后进入Spark
core处理。

自Spark1.3以后,增加了direct Stream
API,这种呢,主要特点是去掉了Receiver,在生成job,去取rdd的时候,计算每个partition要取数据的offset范围,然后生成一个kafkardd,该rdd特点是与kafka的分区是一一对应的。

有上面的特点可以看出,Spark
Streaming是要生成rdd,然后进行处理的,rdd数据集我们可以理解为静态的,然每个批次,都会生成一个rdd,该过程就体现了批处理的特性,由于数据集时间段小,数据小,所以又称微批处理,那么就说明不是真正的实时处理。

flink结合kafka

大家都知道flink是真正的实时处理,他是基于事件触发的机制进行处理,而不是像spark
Streaming每隔若干时间段,生成微批数据,然后进行处理。那么这个时候就有了个疑问,在前面kafka小节中,我们说到了kafka是不会主动往消费者里面吐数据的,需要消费者主动去拉去数据来处理。那么flink是如何做到基于事件实时处理kafka的数据呢?在这里浪尖带着大家看一下源码,flink1.5.0为例。

1,flink与kafka结合的demo。

val
env=StreamExecutionEnvironment.getExecutionEnvironmentenv.getConfig.disableSysoutLoggingenv.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))// create
a checkpoint every 5 secondsenv.enableCheckpointing(5000)// make
parameters available in the web
interfaceenv.getConfig.setGlobalJobParameters(params)

// create a Kafka streaming source consumer for Kafka
0.10.xval
kafkaConsumer=new
FlinkKafkaConsumer010(?params.getRequired("input-topic"),?new SimpleStringSchema,?params.getProperties)

val
messageStream=env?.addSource(kafkaConsumer)?.map(in=> prefix +
in)

// create a Kafka producer for Kafka
0.10.xval
kafkaProducer=new
FlinkKafkaProducer010(?params.getRequired("output-topic"),?new SimpleStringSchema,?params.getProperties)

// write data into
KafkamessageStream.addSink(kafkaProducer)

env.execute("Kafka 0.10 Example")

从上面的demo可以看出,数据源的入口就是FlinkKafkaConsumer010,当然这里面只是简单的构建了一个对象,并进行了一些配置的初始化,真正source的启动是在其run方法中run方法的调用过程在这里不讲解,后面会出教程讲解。

首先看一下类的继承关系

public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T>
public class
FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T>

其中,run方法就在FlinkKafkaConsumerBase里,当然其中open方法里面对kafka相关内容进行里初始化。

从输入到计算到输出完整的计算链条的调用过程,后面浪尖会出文章介绍。在这里只关心flink如何从主动消费数据,然后变成事件处理机制的过程。

由于其FlinkKafkaConsumerBase的run比较长,我这里只看重要的部分,首先是会创建Kafka09Fetcher

KAFKA_CONSUMER_METRICS_GROUP),? ?
?useMetrics);

接着下面有段神器,

final AtomicReference<Exception> discoveryLoopErrorRef=new AtomicReference<>();
this.discoveryLoopThread =new Thread(new
Runnable() {
? @Override
? public void
run() {
? ? ?try {
? ? ? ? //
--------------------- partition discovery loop
---------------------

? ? ? ? List<KafkaTopicPartition>
discoveredPartitions;

? ? ? ?
// throughout the loop, we always eagerly
check if we are still running before
? ?
? ? // performing the next operation, so that we can escape the loop as soon as
possible

? ? ? ? while
(running) {
? ? ? ? ? ?if (LOG.isDebugEnabled()) {
? ?
? ? ? ? ? LOG.debug("Consumer subtask {} is trying to discover new partitions
...",
getRuntimeContext().getIndexOfThisSubtask());
? ? ? ? ?
?}

? ? ? ? ? ?try {
? ? ? ?
? ? ? discoveredPartitions=partitionDiscoverer.discoverPartitions();
? ? ? ? ?
?} catch
(AbstractPartitionDiscoverer.WakeupException |
AbstractPartitionDiscoverer.ClosedException e) {
? ? ? ? ? ? ? // the partition discoverer may have been closed or woken
up before or during the discovery;
? ? ?
? ? ? ? // this would only happen if the consumer was canceled; simply escape
the loop
? ? ? ? ? ? ? break;
? ? ? ? ?
?}

? ? ? ? ? ?// no need to add the
discovered partitions if we were closed during the meantime
? ? ? ? ? ?if
(running &&
!discoveredPartitions.isEmpty()) {
? ? ? ? ? ? ? kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
? ? ? ? ?
?}

? ? ? ? ? ?// do not waste any
time sleeping if we‘re not running anymore
? ? ? ? ? ?if
(running && discoveryIntervalMillis !=0) {
? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ?Thread.sleep(discoveryIntervalMillis);
? ? ? ? ? ? ?
} catch (InterruptedException iex)
{
? ? ? ? ? ? ? ? ?// may be interrupted if the
consumer was canceled midway; simply escape the loop
? ? ? ? ? ? ? ? ?break;
? ? ? ? ? ?
? }
? ? ? ? ? ?}
? ? ? ? }
? ? ?} catch (Exception e) {
? ? ? ? discoveryLoopErrorRef.set(e);
? ? ?}
finally {
? ? ? ? // calling cancel will also let the fetcher loop
escape
? ? ? ? // (if not running,
cancel() was already called)
? ? ? ?
if (running) {
? ? ? ? ? ?cancel();
? ? ? ?
}
? ? ?}
? }
}, "Kafka Partition Discovery for " +
getRuntimeContext().getTaskNameWithSubtasks());

它定义了一个线程池对象,去动态发现kafka新增的topic(支持正则形式指定消费的topic),或者动态发现kafka新增的分区。

接着肯定是启动动态发现分区或者topic线程,并且

kafkaFetcher.runFetchLoop();

//
--------------------------------------------------------------------

// make sure that
the partition discoverer is properly closedpartitionDiscoverer.close();discoveryLoopThread.join();

接着,我们进入


// kick off the actual Kafka consumer
consumerThread.start();

这个线程是在构建kafka09Fetcher的时候创建的

LOG,? ?
?handover,? ?
?kafkaProperties,? ? ?unassignedPartitionsQueue,? ?
?createCallBridge(),? ? ?getFetcherName() + " for " + taskNameWithSubtasks,? ?
?pollTimeout,? ? ?useMetrics,? ?
?consumerMetricGroup,? ? ?subtaskMetricGroup);

KafkaConsumerThread 继承自Thread,然后在其run方法里,首先看到的是


// this is the means to talk to FlinkKafkaConsumer‘s main thread
final Handover handover=this.handover;

这个handover的作用呢暂且不提,接着分析run方法里面内容

1,获取消费者


try {
?
this.consumer
=getConsumer(kafkaProperties);
}

2,检测分区并且会重分配新增的分区

? }?
else {? ? ?// if no assigned partitions block until we get at least
one? ? ?// instead of hot spinning this
loop. We rely on a fact that? ? ?//
unassignedPartitionsQueue will be closed on a shutdown, so? ? ?// we don‘t block indefinitely? ? ?newPartitions=unassignedPartitionsQueue.getBatchBlocking();? }?
if (newPartitions !=null) {? ?
?reassignPartitions(newPartitions);? }

3,消费数据

? }?
catch (WakeupException we) {? ? ?continue;?
}}

4,通过handover将数据发出去

? records=null;}

由于kafkaFetcher的runFetchLoop方法的分析,我们在这里继续

1,拉取handover.producer生产的数据

byte[],
byte[]> records=handover.pollNext();

2,数据格式整理,并将数据整理好后,逐个Record发送,将循环主动批量拉取kafka数据,转化为事件触发。


// get the records for each topic partition
for
(KafkaTopicPartitionState<TopicPartition> partition :
subscribedPartitionStates()) {

? List<ConsumerRecord<byte[],
byte[]>> partitionRecords=
? ? ? ?
records.records(partition.getKafkaPartitionHandle());

? for (ConsumerRecord<byte[],
byte[]> record : partitionRecords) {
? ? ?final T
value=deserializer.deserialize(
?
? ? ? ? ?record.key(), record.value(),
? ? ? ? ?
?record.topic(),
record.partition(),
record.offset());

? ? ?if
(deserializer.isEndOfStream(value))
{
? ? ? ? // end of stream
signaled
? ? ? ? running =false;
? ? ? ?
break;
? ? ?}

? ? ?// emit the actual record. this also updates offset state
atomically
? ? ?// and deals with
timestamps and watermark generation
? ?
?emitRecord(value, partition, record.offset(),
record);
? }
}

肯定会注意到这行代码emitRecord(value, partition, record.offset(),
record);,英语美文从这里开始flink变成事件触发的流引擎。

handover-枢纽

handover是在构建kafkaFetcher的时候构建的


this.handover =new Handover();

handover是一个工具,将一组数据或者异常从生产者线程传输到消费者线程。它高效的扮演了一个阻塞队列的特性。该类运行于flink kafka consumer,用来在kafkaConsumer 类和主线程之间转移数据和异常。

handover有两个重要方法,分别是:

1,producer

producer是将kafkaConusmer获取的数据发送出去,在KafkaConsumerThread中调用。代码如上

2,pollnext

从handover里面拉去下一条数据,会阻塞的,行为很像是从一个阻塞队列里面拉去数据。

综述

kafkaConsumer批量拉去数据,flink将其经过整理之后变成,逐个Record发送的事件触发式的流处理。这就是flink与kafka结合事件触发时流处理的基本思路。

重要的事情再说一遍:Flink支持动态发现新增topic或者新增partition哦。具体实现思路,前面有代码为证,后面会对比spark Streaming的这块(不支持动态发现新增kafka topic或者partition),来详细讲解。

推荐阅读:

Flink on yarn初步讲解

Flink并行度

原文地址:https://www.cnblogs.com/wangfengxia/p/9626887.html

时间: 2024-08-01 23:21:10

Flink与Spark Streaming在与kafka结合的区别!的相关文章

(转)用Flink取代Spark Streaming!知乎实时数仓架构演进

转:https://mp.weixin.qq.com/s/e8lsGyl8oVtfg6HhXyIe4A AI 前线导读:“数据智能” (Data Intelligence) 有一个必须且基础的环节,就是数据仓库的建设,同时,数据仓库也是公司数据发展到一定规模后必然会提供的一种基础服务.从智能商业的角度来讲,数据的结果代表了用户的反馈,获取结果的及时性就显得尤为重要,快速的获取数据反馈能够帮助公司更快的做出决策,更好的进行产品迭代,实时数仓在这一过程中起到了不可替代的作用. 更多优质内容请关注微信

Spark Streaming 实现读取Kafka 生产数据

在kafka 目录下执行生产消息命令: ./kafka-console-producer  --broker-list nodexx:9092 --topic  201609 在spark bin 目录下执行 ./run-example streaming.JavaDirectKafkaWordCount nodexx:9092, nodexx:9092 201609 import java.util.HashMap; import java.util.HashSet; import java.

Exactly-once Spark Streaming from Apache Kafka

这篇文章我已经看过两遍了.收获颇多,抽个时间翻译下,先贴个原文链接吧.也给自己留个任务 http://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/

关于IDEA开发环境下的Kafka+Spark Streaming的classpath配置方式

一.前言 在使用Spark Streaming中的Kafka Direct API进行Kafka消费的过程中,通过spark-submit的方式提交jar包,会出现如下错误信息,提示无法找到KafkaUtils. Exceptionin thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$ at com.zhkmxx.scala.app.KafkaStream

整合Kafka到Spark Streaming——代码示例和挑战

作者Michael G. Noll是瑞士的一位工程师和研究员,效力于Verisign,是Verisign实验室的大规模数据分析基础设施(基础Hadoop)的技术主管.本文,Michael详细的演示了如何将Kafka整合到Spark Streaming中. 期间, Michael还提到了将Kafka整合到 Spark Streaming中的一些现状,非常值得阅读,虽然有一些信息在Spark 1.2版本中已发生了一些变化,比如HA策略: 通过Spark Contributor.Spark布道者陈超我

spark streaming从指定offset处消费Kafka数据

spark streaming从指定offset处消费Kafka数据 2017-06-13 15:19 770人阅读 评论(2) 收藏 举报 分类: spark(5) 原文地址:http://blog.csdn.net/high2011/article/details/53706446 首先很感谢原文作者,看到这篇文章我少走了很多弯路,转载此文章是为了保留一份供复习用,请大家支持原作者,移步到上面的连接去看,谢谢 一.情景:当Spark streaming程序意外退出时,数据仍然再往Kafka中

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十七)待整理

redis按照正则批量删除key redis客户端--jedis 在Spark结构化流readStream.writeStream 输入输出,及过程ETL Spark Structured Streaming入门编程指南 Structured Streaming 实现思路与实现概述 Spark结构式流编程指南 spark streaming重复消费kafka记录,需要删除checkpoint保存目录. 原文地址:https://www.cnblogs.com/yy3b2007com/p/9315

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十九)待整理

redis按照正则批量删除key redis客户端--jedis 在Spark结构化流readStream.writeStream 输入输出,及过程ETL Spark Structured Streaming入门编程指南 Structured Streaming 实现思路与实现概述 Spark结构式流编程指南 spark streaming重复消费kafka记录,需要删除checkpoint保存目录. Kafka 如何读取offset topic内容 (__consumer_offsets) 原

Spark Streaming 第一课:案例动手实战并在电光石火间理解其工作原理

http://spark.apache.org/docs/latest/streaming-programming-guide.html 第一个观点:一切都应该是流处理.无处不在. 一定要做流式处理 Flink生不逢时 你选择Storm唯一的理由就是能够做到毫秒级的响应 2016年DT 大数据梦工厂制定版要把spark streaming 的延迟速度控制在100ms以内. 结果就是流处理Spark Streaming一统天下. 因为: 1.他提供的storm API非常容易实现业务的逻辑 2.数