Spark Streaming 读取 Kafka 数据的两种方式

在Spark1.3之前,默认的Spark接收Kafka数据的方式是基于Receiver的,在这之后的版本里,推出了Direct Approach,现在整理一下两种方式的异同。

1. Receiver-based Approach

val kafkaStream = KafkaUtils.createDstream(ssc, [zk], [consumer group id], [per-topic,partitions] )

2. Direct Approach (No Receivers)

val directKafkaStream = KafkaUtils.createDirectStream[

源码实现

1、 KafkaUtils.createStream

首先从源码层面来看,其主要调用栈顺序:

KafkaUtils.createStream---KafkaInputDStream--KafkaReceiver

KafkaReceiver类继承了Receiver,当Reciver被调用起来时,执行onStart()方法,MessageHandler负责将收到的数据进行存储。执行流程如下:

  1. 创建createStream,Receiver被调起执行
  2. 连接ZooKeeper,读取相应的Consumer、Topic配置信息等
  3. 通过consumerConnector连接到Kafka集群,收取指定topic的数据
  4. 创建KafkaMessageHandler线程池来对数据进行处理,通过ReceiverInputDStream中的方法,将数据转换成BlockRDD,供后续计算

2、 KafkaUtils.createDirectStream

主要调用栈顺序:

KafkaUtils.createDirectStream—> new DirectKafkaInputDStream

执行流程如下:

  1. 实例化KafkaCluster,根据用户配置的Kafka参数,连接Kafka集群
  2. 通过Kafka API读取Topic中每个Partition最后一次读的Offset
  3. 接收成功的数据,直接转换成KafkaRDD,供后续计算

原文地址:https://www.cnblogs.com/geek-sharing/p/9339681.html

时间: 2024-07-30 04:55:45

Spark Streaming 读取 Kafka 数据的两种方式的相关文章

spark streaming 接收kafka消息之一 -- 两种接收方式

源码分析的spark版本是1.6. 首先,先看一下 org.apache.spark.streaming.dstream.InputDStream 的 类说明: This is the abstract base class for all input streams. This class provides methods start() and stop() which is called by Spark Streaming system to start and stop receivi

spark streaming读取kakfka数据手动维护offset

在spark streaming读取kafka的数据中,spark streaming提供了两个接口读取kafka中的数据,分别是KafkaUtils.createDstream,KafkaUtils.createDirectStream,前者会自动把offset更新到zk中,默认会丢数据,效率低,后者不会经过zk,效率更高,需要自己手动维护offse,通过维护护offset写到zk中,保障数据零丢失,只处理一次,下面来看看KafkaUtils.createDirectStream的使用,我把z

关于Mysql删除表数据的两种方式对比

1.delete from table_name 一行一行删除,只删除表数据,auto_increament仍停留在最后一天数据的下一个值. 2.truncate table_name 快捷删除表数据.先删除整个表,然后重新建表结构.auto_increament从1开始. 关于Mysql删除表数据的两种方式对比,布布扣,bubuko.com

向服务器传json数据的两种方式

接触到了向服务器传JSON数据,那么怎么把参数以JSON的形式,传到服务器呢.下面来说说,json字符串的得到方法.一共有两种方式来得到Json的字符串.当然,向服务器上传,不能传字符串,是要转成NSData的,也就是二进制的形式.这个在此不讨论,只说,怎么得到json的字符串. - (void)viewDidLoad { [super viewDidLoad]; // Do any additional setup after loading the view. //第一种方法,是把要传的参数

【代码笔记】向服务器传JSON数据的两种方式

一,代码. - (void)viewDidLoad { [super viewDidLoad]; // Do any additional setup after loading the view. //第一种方法,是把要传的参数组合成一个数组,然后由JSONKit把字典转为字符串 NSDictionary* jsonDic = [NSDictionary dictionaryWithObjectsAndKeys:@"李华",@"name",@"女&quo

[微信小程序] 微信小程序下拉滚动选择器picker绑定数据的两种方式

微信小程序下拉滚动选择器picker绑定数据的两种方式  本地数据绑定和wx.request(OBJECT) json数据绑定 1.本地数据绑定 (对象数组) Page({ data:{ //户型 这是一个本地的对象,然后绑定到页面上 pic_array: [ { id: 13, name: '1室1厅1卫' }, { id: 14, name: '1室2厅1卫' }, { id: 15, name: '2室1厅1卫' }, { id: 16, name: '3室1厅2卫' }, { id: 1

php处理无限极分类数据的两种方式

开发过程中,经常会遇到处理无限分类数据,这里就介绍一下处理无限分类数据的两种方式,有不对的地方,还望多多指正. //测试数据 $array=array( ['id'=>1,'parent_id'=>0,'name'=>1], ['id'=>2,'parent_id'=>0,'name'=>2], ['id'=>3,'parent_id'=>1,'name'=>3], ['id'=>4,'parent_id'=>1,'name'=>4]

Spark RDD转换成DataFrame的两种方式

Spark SQL支持两种方式将现有RDD转换为DataFrame.第一种方法使用反射来推断RDD的schema并创建DataSet然后将其转化为DataFrame.这种基于反射方法十分简便,但是前提是在您编写Spark应用程序时就已经知道RDD的schema类型.第二种方法是通过编程接口,使用您构建的StructType,然后将其应用于现有RDD.虽然此方法很麻烦,但它允许您在运行之前并不知道列及其类型的情况下构建DataSet 方法如下 1.将RDD转换成Rows 2.按照第一步Rows的结

oracle数据库删除数据的两种方式

当表中的数据不需要是,则应该删除该数据,并释放所占用的空间; 删除表中的数据有delete和truncate两种方式,下面分别介绍: 一.delete语句 (1)有条件删除    语法格式:delete [from]  table_name  [where condition]; 如:删除users表中的userid为'001'的数据:delete from users where userid='001'; (2)无条件删除整个表数据 语法格式:delete  table_name; 如:删除