第90讲,Spark streaming基于kafka 以Receiver方式获取数据 原理和案例实战

1:SparkSteaming基于kafka获取数据的方式,主要有俩种,即Receiver和Derict,基于Receiver的方式,是sparkStreaming给我们提供了kafka访问的高层api的封装,而基于Direct的方式,就是直接访问,在sparkSteaming中直接去操作kafka中的数据,不需要前面的高层api的封装。而Direct的方式,可以对kafka进行更好的控制!同时性能也更好。

2:实际上做kafka receiver的时候,通过receiver来获取数据,这个时候,kafka receiver是使用的kafka高层次的comsumer api来实现的。receiver会从kafka中获取数据,然后把它存储到我们具体的Executor内存中。然后spark streaming也就是driver中,会根据这获取到的数据,启动job去处理。

3:注意事项:

1)在通过kafka receiver去获取kafka的数据,在正在获取数据的过程中,这台机器有可能崩溃了。如果来不及做备份,数据就会丢失,切换到另外一台机器上,也没有相关数据。这时候,为了数据安全,采用WAL的方式。write  ahead log,预写日志的方式会同步的将接收到的kafka数据,写入到分布式文件系统中。但是预写日志的方式消耗时间,所以存储时建议Memory_and_Disc,不要2.如果是写到hdfs,会自动做副本。如果是写到本地,这其实有个风险,就是如果这台机器崩溃了,再想恢复过来,这个是需要时间的。

2):我们的kafka receiver接收数据的时候,通过线程或者多线程的方式,kafka中的topic是以partition的方式存在的。sparkstreaming中的kafka receiver接收kafka中topic中的数据,也是通过线程并发的方式去获取的不同的partition,例如用五条线程同时去读取kafka中的topics中的不同的partition数据,这时你这个读取数据的并发线程数,和RDD实际处理数据的并发线程数是没任何关系的。因为获取数据时都还没产生RDD呢。RDD是Driver端决定产生RDD的。

3)默认情况下,一个Executor中是不是只有一个receiver去接收kafka中的数据。那能不能多找一些Executor去更高的并发度,就是使用更多的机器去接收数据,当然可以,基于kafa的api去创建更多的Dstream就可以了。很多的Dstream接收kafka不同topics中的不同的数据,最后你计算的时候,再把他优联就行了。其实这是非常灵活的,因为可以自由的组合。

kafka + spark streaming 集群

前提:

spark 安装成功,spark 1.6.0

zookeeper 安装成功

kafka 安装成功

启动集群和zookeeper和kafka

步骤:

1:创建topic为test

kafka-topics.sh --create --zookeeper master1:2181,work1:2181,work2:2181
--replication-factor 3 --partitions 1 --topic test

在worker1中启动kafka 生产者:

[email protected]:/usr/local/kafka_2.10-0.9.0.1# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

在worker2中启动消费者:

[email protected]:/usr/local/kafka_2.10-0.9.0.1# bin/kafka-console-consumer.sh --zookeeper master1:2181 --topic test

生产者生产的消息,消费者可以消费到。说明kafka集群没问题。进入下一步。

在master中启动spark-shell

./spark-shell --master  local[2] --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0

笔者用的spark 是 1.6.0 ,读者根据自己版本调整。

shell中的逻辑代码(wordcount),启动完成,把下面代码直接丢进去:

import org.apache.spark.SparkConf

import org.apache.spark.streaming._

import org.apache.spark.streaming.kafka._

import org.apache.spark.streaming.kafka.KafkaUtils

import org.apache.spark.streaming.{Durations, StreamingContext}

val ssc = new StreamingContext(sc, Durations.seconds(5))

// 第二个参数是zk集群信息,zk的client host:port,生动的说明了kafka读取数据获取offset

//等元数据等信息,是从zk里面获取的。所以要连zk

// 第三个参数是Consumer groupID,随便写的

//第4个参数是消费的topic,以及并发读取topic中Partition的线程数,这个Map指定了你

//要消费什么topic,以及怎么消费topic

KafkaUtils.createStream(ssc, "master:2181,worker1:2181,worker2:2181", "StreamingWordCountSelfKafkaScala", Map("test" -> 1)).map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()

生产者再生产消息:

spark streaming的反应:

返回worker2查看消费者

可见,groupId不一样,相互之间没有互斥。

上述是使用 createStream 方式链接kafka

还有更高效的方式,请使用createDirectStream

参考:

http://spark.apache.org/docs/latest/streaming-kafka-integration.html

时间: 2024-11-07 23:02:25

第90讲,Spark streaming基于kafka 以Receiver方式获取数据 原理和案例实战的相关文章

spark streaming集成kafka

Kakfa起初是由LinkedIn公司开发的一个分布式的消息系统,后成为Apache的一部分,它使用Scala编写,以可水平扩展和高吞吐率而被广泛使用.目前越来越多的开源分布式处理系统如Cloudera.Apache Storm.Spark等都支持与Kafka集成. Spark streaming集成kafka是企业应用中最为常见的一种场景. 一.安装kafka 参考文档: http://kafka.apache.org/quickstart#quickstart_createtopic 1.安

第89讲:Spark Streaming on Kafka解析和安装实战

本课分2部分讲解: 第一部分,讲解Kafka的概念.架构和用例场景: 第二部分,讲解Kafka的安装和实战. 由于时间关系,今天的课程只讲到如何用官网的例子验证Kafka的安装是否成功.后续课程会接着讲解如何集成Spark Streaming和Kafka. 一.Kafka的概念.架构和用例场景 http://kafka.apache.org/documentation.html#introdution 1.Kafka的概念 Apache Kafka是分布式发布-订阅消息系统.它最初由Linked

第89课:Spark Streaming on Kafka解析和安装实战

本课分2部分讲解: 第一部分,讲解Kafka的概念.架构和用例场景: 第二部分,讲解Kafka的安装和实战. 由于时间关系,今天的课程只讲到如何用官网的例子验证Kafka的安装是否成功.后续课程会接着讲解如何集成Spark Streaming和Kafka. 一.Kafka的概念.架构和用例场景 http://kafka.apache.org/documentation.html#introdution 1.Kafka的概念 Apache Kafka是分布式发布-订阅消息系统.它最初由Linked

spark streaming 对接kafka记录

spark streaming 对接kafka 有两种方式: 参考: http://group.jobbole.com/15559/ http://blog.csdn.net/kwu_ganymede/article/details/50314901 Approach 1: Receiver-based Approach 基于receiver的方案: 这种方式使用Receiver来获取数据.Receiver是使用Kafka的高层次Consumer API来实现的.receiver从Kafka中获

Spark Streaming和Kafka整合开发指南(一)

Apache Kafka是一个分布式的消息发布-订阅系统.可以说,任何实时大数据处理工具缺少与Kafka整合都是不完整的.本文将介绍如何使用Spark Streaming从Kafka中接收数据,这里将会介绍两种方法:(1).使用Receivers和Kafka高层次的API:(2).使用Direct API,这是使用低层次的KafkaAPI,并没有使用到Receivers,是Spark 1.3.0中开始引入的.这两种方法有不同的编程模型,性能特点和语义担保.下文将会一一介绍. 基于Receiver

【转】Spark Streaming和Kafka整合开发指南

基于Receivers的方法 这个方法使用了Receivers来接收数据.Receivers的实现使用到Kafka高层次的消费者API.对于所有的Receivers,接收到的数据将会保存在Spark executors中,然后由Spark Streaming启动的Job来处理这些数据. 然而,在默认的配置下,这种方法在失败的情况下会丢失数据,为了保证零数据丢失,你可以在Spark Streaming中使用WAL日志,这是在Spark 1.2.0才引入的功能,这使得我们可以将接收到的数据保存到WA

SBT 构建 spark streaming集成kafka (scala版本)

前言: 最近在研究spark 还有 kafka , 想通过kafka端获取的数据,利用spark streaming进行一些计算,但搭建整个环境着实不易,故特此写下该过程,分享给大家,希望大家可以少走点弯路,能帮到大家! 环境准备:    操作系统 : ubuntu14.04 LTS hadoop 2.7.1   伪分布式搭建 sbt-0.13.9 kafka_2.11-0.8.2.2 spark-1.3.1-bin-hadoop2.6 scala 版本 : 2.10.4 注: 请重视版本问题,

Spark Streaming使用Kafka保证数据零丢失

来自: https://community.qingcloud.com/topic/344/spark-streaming使用kafka保证数据零丢失 spark streaming从1.2开始提供了数据的零丢失,想享受这个特性,需要满足如下条件: 数据输入需要可靠的sources和可靠的receivers 应用metadata必须通过应用driver checkpoint WAL(write ahead log) 可靠的sources和receivers spark streaming可以通过

Spark Streaming和Kafka整合保证数据零丢失

当我们正确地部署好Spark Streaming,我们就可以使用Spark Streaming提供的零数据丢失机制.为了体验这个关键的特性,你需要满足以下几个先决条件: 1.输入的数据来自可靠的数据源和可靠的接收器: 2.应用程序的metadata被application的driver持久化了(checkpointed ); 3.启用了WAL特性(Write ahead log). 下面我将简单地介绍这些先决条件. 可靠的数据源和可靠的接收器 对于一些输入数据源(比如Kafka),Spark S