spark streaming集成kafka

Kakfa起初是由LinkedIn公司开发的一个分布式的消息系统,后成为Apache的一部分,它使用Scala编写,以可水平扩展和高吞吐率而被广泛使用。目前越来越多的开源分布式处理系统如Cloudera、Apache Storm、Spark等都支持与Kafka集成。

Spark streaming集成kafka是企业应用中最为常见的一种场景。

一、安装kafka

参考文档:

http://kafka.apache.org/quickstart#quickstart_createtopic

1、安装java

2、安装zookeeper集群

参考:http://www.cnblogs.com/wcwen1990/p/6652105.html

3、安装scala

4、安装kafka

下载kafka安装文件:

https://archive.apache.org/dist/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz

解压kafka安装包:

# tar -zxvf kafka_2.10-0.8.2.1.tgz -C /opt/cdh-5.3.6/

# chown -R hadoop:hadoop /opt/cdh-5.3.6/kafka_2.10-0.8.2.1/

删除kafka libs/zookeeper jar包,拷贝自己安装集群zookeeper jar包到kafka libs目录下:

$ rm libs/zookeeper-3.4.6.jar –rf

$ cp /opt/cdh-5.3.6/zookeeper-3.4.5-cdh5.3.6/zookeeper-3.4.5-cdh5.3.6.jar libs/

5、定义kafka配置文件

5.1)定义server.properties:

host.name=chavin.king

log.dirs=/opt/cdh-5.3.6/kafka_2.10-0.8.2.1/kafka-logs

zookeeper.connect=chavin.king:2181

定义producer.properties:

metadata.broker.list=chavin.king:9092

定义consumer.properties:

zookeeper.connect=chavin.king:2181

5.2)启动kafka server

$ bin/kafka-server-start.sh config/server.properties

$ jps

14020 NameNode

57749 Jps

14776 QuorumPeerMain

57690 Kafka

14507 NodeManager

14235 ResourceManager

14093 DataNode

14686 JobHistoryServer

57663 ZooKeeperMain

[zk: localhost:2181(CONNECTED) 3] ls /

[controller, controller_epoch, brokers, zookeeper, admin, consumers, config, hbase]

5.3)创建一个topic

$ bin/kafka-topics.sh --create --zookeeper chavin.king:2181 --replication-factor 1 --partitions 1 --topic test

$ bin/kafka-topics.sh --list --zookeeper chavin.king:2181

5.4)创建一个生产者,产生数据

$ bin/kafka-console-producer.sh --broker-list chavin.king:9092 --topic test

5.5)创建一个消费者,消费数据

$ bin/kafka-console-consumer.sh --zookeeper chavin.king:2181 --topic test --from-beginning

在生产者shell窗口输入数据,在消费者窗口可以看到数据输出到界面上。

二、spark streaming与kafka集成

参考文档:http://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html

一)准备工作

1、编译spark,获得集成kafka jar包:

参考文档:http://www.cnblogs.com/wcwen1990/p/7688027.html

说明:spark streaming集成flume或者kafka需要一些支持jar包,这些jar包在编译spark过程中会自动在external目录下生成相应的jar文件,因此,这里需要编译spark来获得这些jar包。

Spark streaming集成kafka主要需要:spark-streaming-kafka_2.10-1.3.0.jar包。

2、集成相关jar包

$ cp external/kafka/target/spark-streaming-kafka_2.10-1.3.0.jar /opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/

$ cp libs/kafka_2.10-0.8.2.1.jar libs/kafka-clients-0.8.2.1.jar libs/zkclient-0.3.jar libs/metrics-core-2.2.0.jar /opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/

[externalLibs]$ ls

kafka_2.10-0.8.2.1.jar

kafka-clients-0.8.2.1.jar

metrics-core-2.2.0.jar

spark-streaming-kafka_2.10-1.3.0.jar

zkclient-0.3.jar

二)集成方式1:Receiver-based Approach

1、编写spark streaming集成kafka的wordcount

import java.util.HashMap

import org.apache.spark._

import org.apache.spark.streaming._

import org.apache.spark.streaming.StreamingContext._

import org.apache.spark.streaming.kafka._

val ssc = new StreamingContext(sc, Seconds(5))

val topicMap = Map("test" -> 1)

// read data

val lines = KafkaUtils.createStream(ssc, "chavin.king:2181", "testWordCountGroup", topicMap).map(_._2)

val words = lines.flatMap(_.split(" "))

val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

wordCounts.print()

ssc.start() // Start the computation

ssc.awaitTermination() // Wait for the computation to terminate

2、spark-shell local模式启动,并运行步骤1程序

bin/spark-shell --master local[2] --jars \

/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/spark-streaming-kafka_2.10-1.3.0.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/kafka_2.10-0.8.2.1.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/kafka-clients-0.8.2.1.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/zkclient-0.3.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/metrics-core-2.2.0.jar

scala> import java.util.HashMap

import java.util.HashMap

scala> import org.apache.spark._

import org.apache.spark._

scala> import org.apache.spark.streaming._

import org.apache.spark.streaming._

scala> import org.apache.spark.streaming.StreamingContext._

import org.apache.spark.streaming.StreamingContext._

scala> import org.apache.spark.streaming.kafka._

import org.apache.spark.streaming.kafka._

scala> val ssc = new StreamingContext(sc, Seconds(5))

ssc: org.apache.spark.streaming.StreamingContext = [email protected]

scala> val topicMap = Map("test" -> 1)

topicMap: scala.collection.immutable.Map[String,Int] = Map(test -> 1)

scala> val lines = KafkaUtils.createStream(ssc, "chavin.king:2181", "testWordCountGroup", topicMap).map(_._2)

lines: org.apache.spark.streaming.dstream.DStream[String] = [email protected]

scala>

scala> val words = lines.flatMap(_.split(" "))

words: org.apache.spark.streaming.dstream.DStream[String] = [email protected]

scala> val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = [email protected]

scala> wordCounts.print()

scala> ssc.start()

scala>ssc.awaitTermination()

3、测试

在kafka生产者shell端输入:

hadoop oracle mysql mysql mysql

这是我们在kafka消费者端可以看到如下输出:

hadoop oracle mysql mysql mysql

同时在spark streaming端也可以看到如下输出:

-------------------------------------------

Time: 1500021590000 ms

-------------------------------------------

(mysql,3)

(oracle,1)

(hadoop,1)

三)集成方式2:Direct Approach (No Receivers)

1、编写spark streaming集成kafka的wordcount

import kafka.serializer.StringDecoder

import org.apache.spark._

import org.apache.spark.streaming._

import org.apache.spark.streaming.StreamingContext._

import org.apache.spark.streaming.kafka._

val ssc = new StreamingContext(sc, Seconds(5))

val kafkaParams = Map[String, String]("metadata.broker.list" -> "chavin.king:9092")

val topicsSet = Set("test")

// read data

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

val lines = messages.map(_._2)

val words = lines.flatMap(_.split(" "))

val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

wordCounts.print()

ssc.start() // Start the computation

ssc.awaitTermination() // Wait for the computation to terminate

2、spark-shell local模式启动,并运行步骤1程序

bin/spark-shell --master local[2] --jars \

/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/spark-streaming-kafka_2.10-1.3.0.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/kafka_2.10-0.8.2.1.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/kafka-clients-0.8.2.1.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/zkclient-0.3.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/metrics-core-2.2.0.jar

scala> import kafka.serializer.StringDecoder

import kafka.serializer.StringDecoder

scala> import org.apache.spark._

import org.apache.spark._

scala> import org.apache.spark.streaming._

import org.apache.spark.streaming._

scala> import org.apache.spark.streaming.StreamingContext._

import org.apache.spark.streaming.StreamingContext._

scala> import org.apache.spark.streaming.kafka._

import org.apache.spark.streaming.kafka._

scala>

scala> val ssc = new StreamingContext(sc, Seconds(5))

ssc: org.apache.spark.streaming.StreamingContext = [email protected]

scala>

scala> val kafkaParams = Map[String, String]("metadata.broker.list" -> "chavin.king:9092")

kafkaParams: scala.collection.immutable.Map[String,String] = Map(metadata.broker.list -> chavin.king:9092)

scala> val topicsSet = Set("test")

topicsSet: scala.collection.immutable.Set[String] = Set(test)

scala>

scala> // read data

scala> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

17/07/14 16:59:31 INFO VerifiableProperties: Verifying properties

17/07/14 16:59:31 INFO VerifiableProperties: Property group.id is overridden to

17/07/14 16:59:31 INFO VerifiableProperties: Property zookeeper.connect is overridden to

messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = [email protected]0

scala>

scala> val lines = messages.map(_._2)

lines: org.apache.spark.streaming.dstream.DStream[String] = [email protected]

scala> val words = lines.flatMap(_.split(" "))

words: org.apache.spark.streaming.dstream.DStream[String] = [email protected]

scala> val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = [email protected]

scala> wordCounts.print()

scala> ssc.start()

scala>ssc.awaitTermination()

3、测试

在kafka生产者shell端输入:

hadoop oracle mysql mysql mysql

这是我们在kafka消费者端可以看到如下输出:

hadoop oracle mysql mysql mysql

同时在spark streaming端也可以看到如下输出:

-------------------------------------------

Time: 1500021590000 ms

-------------------------------------------

(mysql,3)

(oracle,1)

(hadoop,1)

至此,spark streaming集成kafka两种方式演示OK。但是通过上述案例我们可以发现,目前的spark streaming仅仅对每次的输入值进行一次计算,而企业应用中,可能更需要将多次的输入值进行累加,那么该怎么实现呢?看下面案例?

四)使用UpdataStateByKey实现spark streaming多次输入值的累加操作

1、创建文件udsb.scala文件,输入如下内容:

$ cat udsb.scala

import kafka.serializer.StringDecoder

import org.apache.spark._

import org.apache.spark.streaming._

import org.apache.spark.streaming.StreamingContext._

import org.apache.spark.streaming.kafka._

val ssc = new StreamingContext(sc, Seconds(5))

ssc.checkpoint(".")

val kafkaParams = Map[String, String]("metadata.broker.list" -> "chavin.king:9092")

val topicsSet = Set("test")

val updateFunc = (values: Seq[Int], state: Option[Int]) => {

val currentCount = values.sum

val previousCount = state.getOrElse(0)

Some(currentCount + previousCount)

}

// read data

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

val lines = messages.map(_._2)

val words = lines.flatMap(_.split(" "))

val wordDstream = words.map(x => (x, 1))

val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)

stateDstream.print()

ssc.start()

ssc.awaitTermination()

2、spark-shell local模式启动,并运行步骤1程序

bin/spark-shell --master local[2] --jars \

/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/spark-streaming-kafka_2.10-1.3.0.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/kafka_2.10-0.8.2.1.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/kafka-clients-0.8.2.1.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/zkclient-0.3.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externalLibs/metrics-core-2.2.0.jar

scala> :load /opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/udsb.scala

3、测试

在kafka生产者shell端输入:

3.1)第一次输入:hadoop oracle mysql

Spark streaming端可以看到如下输出:

-------------------------------------------

Time: 1500023985000 ms

-------------------------------------------

(mysql,1)

(oracle,1)

(hadoop,1)

3.2)第二次输入:hadoop oracle mysql

Spark streaming端可以看到如下输出:

-------------------------------------------

Time: 1500023985000 ms

-------------------------------------------

(mysql,2)

(oracle,2)

(hadoop,2)

3.3)第三次输入:hadoop oracle mysql

Spark streaming端可以看到如下输出:

-------------------------------------------

Time: 1500023985000 ms

-------------------------------------------

(mysql,3)

(oracle,3)

(hadoop,3)

时间: 2024-10-07 22:26:03

spark streaming集成kafka的相关文章

SBT 构建 spark streaming集成kafka (scala版本)

前言: 最近在研究spark 还有 kafka , 想通过kafka端获取的数据,利用spark streaming进行一些计算,但搭建整个环境着实不易,故特此写下该过程,分享给大家,希望大家可以少走点弯路,能帮到大家! 环境准备:    操作系统 : ubuntu14.04 LTS hadoop 2.7.1   伪分布式搭建 sbt-0.13.9 kafka_2.11-0.8.2.2 spark-1.3.1-bin-hadoop2.6 scala 版本 : 2.10.4 注: 请重视版本问题,

第89课:Spark Streaming on Kafka解析和安装实战

本课分2部分讲解: 第一部分,讲解Kafka的概念.架构和用例场景: 第二部分,讲解Kafka的安装和实战. 由于时间关系,今天的课程只讲到如何用官网的例子验证Kafka的安装是否成功.后续课程会接着讲解如何集成Spark Streaming和Kafka. 一.Kafka的概念.架构和用例场景 http://kafka.apache.org/documentation.html#introdution 1.Kafka的概念 Apache Kafka是分布式发布-订阅消息系统.它最初由Linked

第89讲:Spark Streaming on Kafka解析和安装实战

本课分2部分讲解: 第一部分,讲解Kafka的概念.架构和用例场景: 第二部分,讲解Kafka的安装和实战. 由于时间关系,今天的课程只讲到如何用官网的例子验证Kafka的安装是否成功.后续课程会接着讲解如何集成Spark Streaming和Kafka. 一.Kafka的概念.架构和用例场景 http://kafka.apache.org/documentation.html#introdution 1.Kafka的概念 Apache Kafka是分布式发布-订阅消息系统.它最初由Linked

Spark Streaming使用Kafka保证数据零丢失

来自: https://community.qingcloud.com/topic/344/spark-streaming使用kafka保证数据零丢失 spark streaming从1.2开始提供了数据的零丢失,想享受这个特性,需要满足如下条件: 数据输入需要可靠的sources和可靠的receivers 应用metadata必须通过应用driver checkpoint WAL(write ahead log) 可靠的sources和receivers spark streaming可以通过

Spark Streaming和Kafka整合保证数据零丢失

当我们正确地部署好Spark Streaming,我们就可以使用Spark Streaming提供的零数据丢失机制.为了体验这个关键的特性,你需要满足以下几个先决条件: 1.输入的数据来自可靠的数据源和可靠的接收器: 2.应用程序的metadata被application的driver持久化了(checkpointed ); 3.启用了WAL特性(Write ahead log). 下面我将简单地介绍这些先决条件. 可靠的数据源和可靠的接收器 对于一些输入数据源(比如Kafka),Spark S

spark streaming 对接kafka记录

spark streaming 对接kafka 有两种方式: 参考: http://group.jobbole.com/15559/ http://blog.csdn.net/kwu_ganymede/article/details/50314901 Approach 1: Receiver-based Approach 基于receiver的方案: 这种方式使用Receiver来获取数据.Receiver是使用Kafka的高层次Consumer API来实现的.receiver从Kafka中获

Spark Streaming、Kafka结合Spark JDBC External DataSouces处理案例

场景:使用Spark Streaming接收Kafka发送过来的数据与关系型数据库中的表进行相关的查询操作: Kafka发送过来的数据格式为:id.name.cityId,分隔符为tab 1 zhangsan 1 2 lisi 1 3 wangwu 2 4 zhaoliu 3 MySQL的表city结构为:id int, name varchar 1 bj 2 sz 3 sh 本案例的结果为:select s.id, s.name, s.cityId, c.name from student s

Spark Streaming和Kafka整合开发指南(一)

Apache Kafka是一个分布式的消息发布-订阅系统.可以说,任何实时大数据处理工具缺少与Kafka整合都是不完整的.本文将介绍如何使用Spark Streaming从Kafka中接收数据,这里将会介绍两种方法:(1).使用Receivers和Kafka高层次的API:(2).使用Direct API,这是使用低层次的KafkaAPI,并没有使用到Receivers,是Spark 1.3.0中开始引入的.这两种方法有不同的编程模型,性能特点和语义担保.下文将会一一介绍. 基于Receiver

【转】Spark Streaming和Kafka整合开发指南

基于Receivers的方法 这个方法使用了Receivers来接收数据.Receivers的实现使用到Kafka高层次的消费者API.对于所有的Receivers,接收到的数据将会保存在Spark executors中,然后由Spark Streaming启动的Job来处理这些数据. 然而,在默认的配置下,这种方法在失败的情况下会丢失数据,为了保证零数据丢失,你可以在Spark Streaming中使用WAL日志,这是在Spark 1.2.0才引入的功能,这使得我们可以将接收到的数据保存到WA