Integrating Spark Streaming with Kafka in Scala (Spark 2.3, Kafka 0.10)

The Maven dependency is as follows:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.3.0</version>
</dependency>
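If the project is built with sbt rather than Maven, an equivalent dependency declaration would look like the sketch below (the %% operator appends the Scala version suffix, here _2.11):

// build.sbt (sketch): same artifact as the Maven coordinate above
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.0"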

The example code from the official site is as follows:


/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// scalastyle:off println
package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 * Usage: DirectKafkaWordCount <brokers> <topics>
 *   <brokers> is a list of one or more Kafka brokers
 *   <topics> is a list of one or more kafka topics to consume from
 *
 * Example:
 *    $ bin/run-example streaming.DirectKafkaWordCount broker1-host:port,broker2-host:port \
 *    topic1,topic2
 */
object DirectKafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(s"""
        |Usage: DirectKafkaWordCount <brokers> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <topics> is a list of one or more kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val Array(brokers, topics) = args

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_.value)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
// scalastyle:on println

Running the code above fails with the following error:

Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "bootstrap.servers" which has no default value.

Solution:

As the error indicates, the required Kafka consumer parameters were not set: the official example only passes "metadata.broker.list", a Kafka 0.8-era setting, while the new consumer used by spark-streaming-kafka-0-10 requires "bootstrap.servers" (along with the deserializers and a group.id).

Modify the official example as follows:

package cn.xdf.userprofile.stream

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

import scala.collection.mutable

object DirectKafka {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println(
        s"""
           |Usage: DirectKafkaWordCount <brokers> <topics>
           |  <brokers> is a list of one or more Kafka brokers
           |  <topics> is a list of one or more kafka topics to consume from
           |
        """.stripMargin)
      System.exit(1)
    }

    val Array(brokers, topics) = args

    val conf = new SparkConf()
      .setAppName("DirectKafka")
      .setMaster("local[2]")

    val ssc = new StreamingContext(conf, Seconds(2))

    val topicsSet = topics.split(",").toSet
    val kafkaParams = mutable.HashMap[String, String]()
    // The following parameters are required; without them the job fails with the error above
    kafkaParams.put("bootstrap.servers", brokers)
    kafkaParams.put("group.id", "group1")
    kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_.value)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
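For reference, the spark-streaming-kafka-0-10 integration guide builds the consumer config as an immutable Map[String, Object], using the ConsumerConfig constants and deserializer classes instead of class-name strings. A minimal sketch of that style (the auto.offset.reset and enable.auto.commit values below are illustrative choices, not from the original post); it would be passed to ConsumerStrategies.Subscribe exactly as the mutable map above:

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

// Sketch: consumer parameters as an immutable Map[String, Object]
val kafkaParams = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
  ConsumerConfig.GROUP_ID_CONFIG -> "group1",
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
)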

Run it as follows:

Start Kafka:

bin/kafka-server-start ./etc/kafka/server.properties &
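If the test topic does not exist yet (and automatic topic creation is disabled), it can be created before starting the job. A sketch using the Kafka 1.0-era CLI, assuming ZooKeeper is running on localhost:2181:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test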

[2018-10-22 11:24:14,748] INFO [GroupCoordinator 0]: Stabilized group group1 generation 1 (__consumer_offsets-40) (kafka.coordinator.group.GroupCoordinator)

[2018-10-22 11:24:14,761] INFO [GroupCoordinator 0]: Assignment received from leader for group group1 for generation 1 (kafka.coordinator.group.GroupCoordinator)

[2018-10-22 11:24:14,779] INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset-1} for Partition: __consumer_offsets-40. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)

[2018-10-22 11:28:19,010] INFO [GroupCoordinator 0]: Preparing to rebalance group group1 with old generation 1 (__consumer_offsets-40) (kafka.coordinator.group.GroupCoordinator)

[2018-10-22 11:28:19,013] INFO [GroupCoordinator 0]: Group group1 with generation 2 is now empty (__consumer_offsets-40) (kafka.coordinator.group.GroupCoordinator)

[2018-10-22 11:29:29,424] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 11 milliseconds. (kafka.coordinator.group.GroupMetadataManager)

[2018-10-22 11:39:29,414] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)

[2018-10-22 11:49:29,414] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)

Submit the Spark job:

/usr/local/spark-2.3.0/bin/spark-submit --class cn.xdf.userprofile.stream.DirectKafka --master yarn --driver-memory 2g     --num-executors 1      --executor-memory 2g     --executor-cores 1  userprofile2.0.jar localhost:9092 test
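Note that the modified code hard-codes setMaster("local[2]"), and a master set directly on SparkConf takes precedence over the --master yarn flag passed to spark-submit, so this run executes locally. For a genuine cluster submission you would normally drop setMaster and let spark-submit decide; a sketch:

// Sketch: no setMaster, so the master comes from spark-submit --master
val conf = new SparkConf().setAppName("DirectKafka")
val ssc = new StreamingContext(conf, Seconds(2))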

2018-10-22 11:28:16 INFO  DAGScheduler:54 - Submitting 1 missing tasks from ResultStage 483 (ShuffledRDD[604] at reduceByKey at DirectKafka.scala:46) (first 15 tasks are for partitions Vector(1))

2018-10-22 11:28:16 INFO  TaskSchedulerImpl:54 - Adding task set 483.0 with 1 tasks

2018-10-22 11:28:16 INFO  TaskSetManager:54 - Starting task 0.0 in stage 483.0 (TID 362, localhost, executor driver, partition 1, PROCESS_LOCAL, 7649 bytes)

2018-10-22 11:28:16 INFO  Executor:54 - Running task 0.0 in stage 483.0 (TID 362)

2018-10-22 11:28:16 INFO  ShuffleBlockFetcherIterator:54 - Getting 0 non-empty blocks out of 1 blocks

2018-10-22 11:28:16 INFO  ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 0 ms

2018-10-22 11:28:16 INFO  Executor:54 - Finished task 0.0 in stage 483.0 (TID 362). 1091 bytes result sent to driver

2018-10-22 11:28:16 INFO  TaskSetManager:54 - Finished task 0.0 in stage 483.0 (TID 362) in 4 ms on localhost (executor driver) (1/1)

2018-10-22 11:28:16 INFO  TaskSchedulerImpl:54 - Removed TaskSet 483.0, whose tasks have all completed, from pool

2018-10-22 11:28:16 INFO  DAGScheduler:54 - ResultStage 483 (print at DirectKafka.scala:47) finished in 0.008 s

2018-10-22 11:28:16 INFO  DAGScheduler:54 - Job 241 finished: print at DirectKafka.scala:47, took 0.009993 s

-------------------------------------------

Time: 1540178896000 ms

-------------------------------------------

Start a console producer:

bin/kafka-console-producer.sh --topic test --broker-list localhost:9092

>  hello you

>  hello me

Check the result:

(hello,2)

(me,1)

(you,1)

2018-10-22 11:57:08 INFO  JobScheduler:54 - Finished job streaming job 1540180628000 ms.0 from job set of time 1540180628000 ms

2018-10-22 11:57:08 INFO  JobScheduler:54 - Total delay: 0.119 s for time 1540180628000 ms (execution: 0.072 s)

2018-10-22 11:57:08 INFO  ShuffledRDD:54 - Removing RDD 154 from persistence list

2018-10-22 11:57:08 INFO  MapPartitionsRDD:54 - Removing RDD 153 from persistence list

2018-10-22 11:57:08 INFO  BlockManager:54 - Removing RDD 153

2018-10-22 11:57:08 INFO  BlockManager:54 - Removing RDD 154

2018-10-22 11:57:08 INFO  MapPartitionsRDD:54 - Removing RDD 152 from persistence list

2018-10-22 11:57:08 INFO  BlockManager:54 - Removing RDD 152

2018-10-22 11:57:08 INFO  MapPartitionsRDD:54 - Removing RDD 151 from persistence list

2018-10-22 11:57:08 INFO  BlockManager:54 - Removing RDD 151

2018-10-22 11:57:08 INFO  KafkaRDD:54 - Removing RDD 150 from persistence list

2018-10-22 11:57:08 INFO  BlockManager:54 - Removing RDD 150

Original article: https://www.cnblogs.com/abcdwxc/p/9829385.html
