Spark Streaming整合Kafka

0)摘要

  主要介绍了Spark Streaming整合Kafka,两种整合方式:Receiver-based和Direct方式。这里使用的是Kafka broker version 0.8.2.1,官方文档地址:(http://spark.apache.org/docs/2.2.0/streaming-kafka-0-8-integration.html)。

1)Kafka准备

  • 启动zookeeper

    ./zkServer.sh start

  • 启动kafka

    ./kafka-server-start.sh -daemon ../config/server.properties //后台启动
  • 创建topic

    ./kafka-topics.sh --create --zookeeper hadoop:2181 --replication-factor 1 --partitions 1 --topic test
  • 通过控制台测试topic能否正常的生产和消费

  启动生产者脚本:
         ./kafka-console-producer.sh --broker-list hadoop:9092 --topic test

   启动消费者脚本:

    ./kafka-console-consumer.sh --zookeeper hadoop:2181 --topic test --from-beginning

  准备工作已经就绪。

2)Receiver-based方式整合

注意:这种方式为了保证数据不会丢失,需要开启Write Ahead Logs机制,开启后,接收数据的正确性只有被预写到日志以后Receive才会确认,可以从日志中恢复数据,会增加额外的开销。如何开启?设置SparkConf的“Spark Streaming writeAheadLog.enable”属性为“true”,这种模式基本被淘汰

1 添加kafka依赖

        <!--        kafka依赖-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>

2 本地代码编写

 1 package flume_streaming
 2
 3 import org.apache.spark.SparkConf
 4 import org.apache.spark.streaming.kafka._
 5 import org.apache.spark.streaming.{Durations, StreamingContext}
 6
 7 /**
 8  * @Author: SmallWild
 9  * @Date: 2019/10/30 10:00
10  * @Desc:
11  */
12
13 object kafkaReceiveWordCount {
14   def main(args: Array[String]): Unit = {
15     if (args.length != 4) {
16       System.err.println("错误参数")
17       System.exit(1)
18     }
19     //接收参数
20     //numPartitions 线程数
21     val Array(zkQuorum, groupId, topics, numPartitions) = args
22     //一定不能使用local[1]
23     val sparkConf = new SparkConf().setMaster("local[2]").setAppName("kafkaReceiveWordCount")
24     val ssc = new StreamingContext(sparkConf, Durations.seconds(5))
25     //设置日志级别
26     ssc.sparkContext.setLogLevel("WARN")
27     //多个topic用,分开
28     val topicMap = topics.split(",").map((_, numPartitions.toInt)).toMap
29     //TODO 业务逻辑,简单进行wordcount,输出到控制台
30     /**
31      * * @param ssc       StreamingContext object
32      * * @param zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..)
33      * * @param groupId   The group id for this consumer topic所在的组,可以设置为自己想要的名称
34      * * @param topics    Map of (topic_name to numPartitions) to consume. Each partition is consumed
35      * *                  in its own thread
36      * * @param storageLevel  Storage level to use for storing the received objects
37      * *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
38      */
39     val lineMap = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap)
40     lineMap.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
41     ssc.start()
42     ssc.awaitTermination()
43   }
44 }

3 提交到服器上运行

  如果生产中没有联网,需要使用  --jars 传入kafka的jar包

  • 把项目达成jar包
  • 使用local模式提交,提交的脚本:
提交到服务器上运行
 ./spark-submit --master local[2] /
 --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 /
 --class flume_streaming.kafkaReceiveWordCount /
 /smallwild/app/SparkStreaming-1.0.jar /
 hadoop:2181 1 sparkStreaming 1

4 运行结果

  首先在控制台,启动kafka生产者,输入一些单词,然后,启动SparkStreaming程序。

 

3)Direct方式整合

使用的是:Simple Consumer API,自己管理offset,把kfka看成存储数据的地方,根据offset去读。没有使用zk管理消费者的offset,spark自己管理,默认的offset在内存中,如果设置了checkpoint,那么也也有一份,一般要设置。Direct模式生成的Dstream中的RDD的并行度与读取的topic中的partition一致(增加topic的partition个数)

注意点:

  • 没有使用receive,直接查询的kafka偏移量

1 添加kafka依赖

        <!--        kafka依赖-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>

2 代码编写

 1 package kafka_streaming
 2
 3 import kafka.serializer.StringDecoder
 4 import org.apache.spark.SparkConf
 5 import org.apache.spark.streaming.{Durations, StreamingContext}
 6 import org.apache.spark.streaming.kafka.KafkaUtils
 7
 8 /**
 9  * @Author: SmallWild
10  * @Date: 2019/10/31 21:21
11  * @Desc:
12  */
13 object kafkaDirectWordCount {
14
15   def main(args: Array[String]): Unit = {
16     if (args.length != 2) {
17       System.err.println("错误参数")
18       System.exit(1)
19     }
20     //接收参数
21     //numPartitions 线程数
22     val Array(brokers, topics) = args
23     //一定不能使用local[1]
24     val sparkConf = new SparkConf().setMaster("local[2]").setAppName("kafkaDirectWordCount")
25     val ssc = new StreamingContext(sparkConf, Durations.seconds(5))
26     //设置日志级别
27     ssc.sparkContext.setLogLevel("WARN")
28     //多个topic用,分开
29     val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers
30     )
31     val topicsa = topics.split(",").toSet
32     /**
33      *
34      */
35     val lineMap = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsa)
36     //TODO 业务逻辑,简单进行wordcount,输出到控制台
37     lineMap.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
38     ssc.start()
39     ssc.awaitTermination()
40   }
41
42 }

3 提交到服务器上运行和第一种方式是上面一样

4 自己管理offset

  使用spark自己管理offset方便,但是当业务逻辑改变的时候,恢复就难了,需要自己手动编写代码管理offset

4)总结

  注意两种模式差别,receive模式几乎被淘汰,可以扩展的地方,1)使程序具备高可用的能力,挂掉之后,能否从上次的状态恢复过来,2)手动管理offset,改变了业务逻辑也能从上次的状态恢复过来

  

原文地址:https://www.cnblogs.com/truekai/p/11769705.html

时间: 2024-11-07 19:16:39

Spark Streaming整合Kafka的相关文章

Spark 系列(十六)—— Spark Streaming 整合 Kafka

一.版本说明 Spark 针对 Kafka 的不同版本,提供了两套整合方案:spark-streaming-kafka-0-8 和 spark-streaming-kafka-0-10,其主要区别如下: spark-streaming-kafka-0-8 spark-streaming-kafka-0-10 Kafka 版本 0.8.2.1 or higher 0.10.0 or higher AP 状态 Deprecated从 Spark 2.3.0 版本开始,Kafka 0.8 支持已被弃用

Spark Streaming和Kafka整合保证数据零丢失

当我们正确地部署好Spark Streaming,我们就可以使用Spark Streaming提供的零数据丢失机制.为了体验这个关键的特性,你需要满足以下几个先决条件: 1.输入的数据来自可靠的数据源和可靠的接收器: 2.应用程序的metadata被application的driver持久化了(checkpointed ); 3.启用了WAL特性(Write ahead log). 下面我将简单地介绍这些先决条件. 可靠的数据源和可靠的接收器 对于一些输入数据源(比如Kafka),Spark S

Spark Streaming和Kafka整合开发指南(一)

Apache Kafka是一个分布式的消息发布-订阅系统.可以说,任何实时大数据处理工具缺少与Kafka整合都是不完整的.本文将介绍如何使用Spark Streaming从Kafka中接收数据,这里将会介绍两种方法:(1).使用Receivers和Kafka高层次的API:(2).使用Direct API,这是使用低层次的KafkaAPI,并没有使用到Receivers,是Spark 1.3.0中开始引入的.这两种方法有不同的编程模型,性能特点和语义担保.下文将会一一介绍. 基于Receiver

【转】Spark Streaming和Kafka整合开发指南

基于Receivers的方法 这个方法使用了Receivers来接收数据.Receivers的实现使用到Kafka高层次的消费者API.对于所有的Receivers,接收到的数据将会保存在Spark executors中,然后由Spark Streaming启动的Job来处理这些数据. 然而,在默认的配置下,这种方法在失败的情况下会丢失数据,为了保证零数据丢失,你可以在Spark Streaming中使用WAL日志,这是在Spark 1.2.0才引入的功能,这使得我们可以将接收到的数据保存到WA

SBT 构建 spark streaming集成kafka (scala版本)

前言: 最近在研究spark 还有 kafka , 想通过kafka端获取的数据,利用spark streaming进行一些计算,但搭建整个环境着实不易,故特此写下该过程,分享给大家,希望大家可以少走点弯路,能帮到大家! 环境准备:    操作系统 : ubuntu14.04 LTS hadoop 2.7.1   伪分布式搭建 sbt-0.13.9 kafka_2.11-0.8.2.2 spark-1.3.1-bin-hadoop2.6 scala 版本 : 2.10.4 注: 请重视版本问题,

Spark Streaming使用Kafka保证数据零丢失

来自: https://community.qingcloud.com/topic/344/spark-streaming使用kafka保证数据零丢失 spark streaming从1.2开始提供了数据的零丢失,想享受这个特性,需要满足如下条件: 数据输入需要可靠的sources和可靠的receivers 应用metadata必须通过应用driver checkpoint WAL(write ahead log) 可靠的sources和receivers spark streaming可以通过

第89课:Spark Streaming on Kafka解析和安装实战

本课分2部分讲解: 第一部分,讲解Kafka的概念.架构和用例场景: 第二部分,讲解Kafka的安装和实战. 由于时间关系,今天的课程只讲到如何用官网的例子验证Kafka的安装是否成功.后续课程会接着讲解如何集成Spark Streaming和Kafka. 一.Kafka的概念.架构和用例场景 http://kafka.apache.org/documentation.html#introdution 1.Kafka的概念 Apache Kafka是分布式发布-订阅消息系统.它最初由Linked

spark streaming 对接kafka记录

spark streaming 对接kafka 有两种方式: 参考: http://group.jobbole.com/15559/ http://blog.csdn.net/kwu_ganymede/article/details/50314901 Approach 1: Receiver-based Approach 基于receiver的方案: 这种方式使用Receiver来获取数据.Receiver是使用Kafka的高层次Consumer API来实现的.receiver从Kafka中获

Spark Streaming、Kafka结合Spark JDBC External DataSouces处理案例

场景:使用Spark Streaming接收Kafka发送过来的数据与关系型数据库中的表进行相关的查询操作: Kafka发送过来的数据格式为:id.name.cityId,分隔符为tab 1 zhangsan 1 2 lisi 1 3 wangwu 2 4 zhaoliu 3 MySQL的表city结构为:id int, name varchar 1 bj 2 sz 3 sh 本案例的结果为:select s.id, s.name, s.cityId, c.name from student s