sparkStreaming 读kafka的数据

目标:sparkStreaming每2s中读取一次kafka中的数据,进行单词计数。

topic:topic1

broker list:192.168.1.126:9092,192.168.1.127:9092,192.168.1.128:9092

1、首先往一个topic中实时生产数据。

  代码如下: 代码功能:每秒向topic1发送一条消息,一条消息里包含4个单词,单词之间用空格隔开。

 1 package kafkaProducer
 2
 3 import java.util.HashMap
 4
 5 import org.apache.kafka.clients.producer._
 6
 7
 8 object KafkaProducer {
 9 def main(args: Array[String]) {
10   val topic="topic1"
11   val brokers="192.168.1.126:9092,192.168.1.127:9092,192.168.1.128:9092"
12   val messagesPerSec=1 //每秒发送几条信息
13   val wordsPerMessage =4 //一条信息包括多少个单词
14   // Zookeeper connection properties
15     val props = new HashMap[String, Object]()
16     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
17     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
18       "org.apache.kafka.common.serialization.StringSerializer")
19     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
20       "org.apache.kafka.common.serialization.StringSerializer")
21     val producer = new KafkaProducer[String, String](props)
22     // Send some messages
23      while(true) {
24       (1 to messagesPerSec.toInt).foreach { messageNum =>
25         val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
26           .mkString(" ")
27         val message = new ProducerRecord[String, String](topic, null, str)
28         producer.send(message)
29         println(message)
30       }
31       Thread.sleep(1000)
32     }
33   }
34 }

打包运行命令:hadoop jar jar包  (注意jar包是可运行的jar包)

消费者消费命令: ./kafka-console-consumer.sh  --zookeeper zk01:2181,zk02:2181  --topic topic1 --from-beginning

可以正常消费。

2、编写SparkStreaming代码读kafka中的数据,每2s读一次

  代码如下:

 1 package kafkaSparkStream
 2
 3 import org.apache.spark.SparkConf
 4 import org.apache.spark.streaming.StreamingContext
 5 import org.apache.spark.streaming.Seconds
 6 import org.apache.spark.streaming.kafka.KafkaUtils
 7 import kafka.serializer.StringDecoder
 8 /**
 9  * sparkStreaming读取kafka中topic的数据
10  */
11 object KafkaToSpark {
12 def main(args: Array[String]) {
13   if (args.length<2) {
14   System.err.println("Usage: <brokers> <topics>");
15   System.exit(1)
16   }
17   val Array(brokers,topics)=args
18   //2s从kafka中读取一次
19   val conf=new SparkConf().setAppName("KafkaToSpark");
20   val scc=new StreamingContext(conf,Seconds(2))
21   // Create direct kafka stream with brokers and topics
22   val topicSet=topics.split(",").toSet
23   val kafkaParams=Map[String,String]("metadata.broker.list"->brokers)
24   //获取信息
25   val messages=KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
26       scc,kafkaParams,topicSet)
27   // Get the lines, split them into words, count the words and print
28  val lines= messages.map(_._2)
29  val words=lines.flatMap(_.split(" "))
30  val wordCouts=words.map(x =>(x,1L)).reduceByKey(_+_)
31  wordCouts.print
32  //开启计算
33  scc.start()
34  scc.awaitTermination()
35 }
36
37 }

打包运行命令:./spark-submit --class kafkaSparkStream.KafkaToSpark --master yarn-client /home/hadoop/sparkJar/kafkaToSpark.jar 192.168.1.126:9092,192.168.1.127:9092,192.168.1.128:9092 topic1

运行成功!

时间: 2024-09-29 16:29:12

sparkStreaming 读kafka的数据的相关文章

spark-streaming读kafka数据到hive遇到的问题

在项目中使用spark-stream读取kafka数据源的数据,然后转成dataframe,再后通过sql方式来进行处理,然后放到hive表中, 遇到问题如下,hive-metastor在没有做高可用的情况下,有时候会出现退出,这个时候,spark streaminG的微批作业就会失败, 然后再启重动hive-metastore进程后,作业继续正常执行,数据就有丢失. 分析如下: 第一步,观察日志发现, 我原来的代码这么写的: xx.foreachRdd(rdd=> processRdd(rdd

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十一)定制一个arvo格式文件发送到kafka的topic,通过sparkstreaming读取kafka的数据

定制avro schema: { "type": "record", "name": "userlog", "fields": [ {"name": "ip","type": "string"}, {"name": "identity","type":"str

Spark Streaming使用Kafka保证数据零丢失

来自: https://community.qingcloud.com/topic/344/spark-streaming使用kafka保证数据零丢失 spark streaming从1.2开始提供了数据的零丢失,想享受这个特性,需要满足如下条件: 数据输入需要可靠的sources和可靠的receivers 应用metadata必须通过应用driver checkpoint WAL(write ahead log) 可靠的sources和receivers spark streaming可以通过

SparkStreaming消费kafka数据

概要:本例子为SparkStreaming消费kafka消息的例子,实现的功能是将数据实时的进行抽取.过滤.转换,然后存储到HDFS中. 实例代码 package com.fwmagic.test import com.alibaba.fastjson.{JSON, JSONException} import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.SparkConf imp

第91讲:sparkStreaming基于kafka的Direct详解

1:Direct方式特点: 1)Direct的方式是会直接操作kafka底层的元数据信息,这样如果计算失败了,可以把数据重新读一下,重新处理.即数据一定会被处理.拉数据,是RDD在执行的时候直接去拉数据. 2)由于直接操作的是kafka,kafka就相当于你底层的文件系统.这个时候能保证严格的事务一致性,即一定会被处理,而且只会被处理一次.而Receiver的方式则不能保证,因为Receiver和ZK中的数据可能不同步,spark Streaming可能会重复消费数据,这个调优可以解决,但显然没

Spark实战(一)SparkStreaming集成Kafka

Spark Streaming + Kafka集成指南 Kafka项目在版本0.8和0.10之间引入了一个新的消费者API,因此有两个独立的相应Spark Streaming包可用.请选择正确的包, 请注意,0.8集成与后来的0.9和0.10代理兼容,但0.10集成与早期的代理不兼容. 注意:从Spark 2.3.0开始,不推荐使用Kafka 0.8支持. Spark Streaming从Kafka接收数据,转换为spark streaming中的数据结构Dstream.数据接收方式有两种 :1

SparkStreaming整合kafka的补充

(1)SparkStreaming 整合 kafka 两种方式对比 Direct 方式的优缺点分析 : 优点: 简化并行(Simplified Parallelism).不现需要创建以及 union 多输入源,Kafka topic 的partition 与 RDD 的 partition 一一对应. 高效(Efficiency).基于 Receiver-based 的方式保证数据零丢失(zero-data loss)需要配置 spark.streaming.receiver.writeAhea

SparkStreaming与Kafka整合遇到的问题及解决方案

前言 最近工作中是做日志分析的平台,采用了sparkstreaming+kafka,采用kafka主要是看中了它对大数据量处理的高性能,处理日志类应用再好不过了,采用了sparkstreaming的流处理框架 主要是考虑到它本身是基于spark核心的,以后的批处理可以一站式服务,并且可以提供准实时服务到elasticsearch中,可以实现准实时定位系统日志. 实现 Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式. 一. 基于Receiver方式

sparkStreaming读取kafka的两种方式

概述 Spark Streaming 支持多种实时输入源数据的读取,其中包括Kafka.flume.socket流等等.除了Kafka以外的实时输入源,由于我们的业务场景没有涉及,在此将不会讨论.本篇文章主要着眼于我们目前的业务场景,只关注Spark Streaming读取Kafka数据的方式. Spark Streaming 官方提供了两种方式读取Kafka数据: 一是Receiver-based Approach.该种读取模式官方最先支持,并在Spark 1.2提供了数据零丢失(zero-d