Spark-Streaming kafka count 案例

Streaming 统计来自 kafka 的数据,这里涉及到的比较,kafka 的数据是使用从 flume 获取到的,这里相当于一个小的案例。

1. 启动 kafka

Spark-Streaming hdfs count 案例

2. 启动 flume

flume-ng agent -c conf -f conf/kafka_test.conf -n a1 -Dflume.root.logger=INFO,console

  flume 配置文件如下

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /root/code/flume_exec_test.txt

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList=master:9092
a1.sinks.k1.topic=kaka
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

  这里 flume 是的数据是来自一个文件,只要这个文件有数据进入,就会被flume监控到,测试的时候只需要往这个文件里写数据就可以了。

3. 启动 kafka 消费者来观察

kafka-console-consumer.sh --bootstrap-server master:9092 --topic kaka

4. 下面就是 Streaming 的统计代码

package com.hw.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

object KafkaWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(",")(1))
//    窗口大小10秒,滑动大小2秒,这里的窗口大小一定要是滑动大小的倍数关系才行
    val wordCounts = words.map((_, 1L)).reduceByKeyAndWindow(_ + _,_ - _,Seconds(10), Seconds(2))
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }

}

5. 执行脚本

# kafka count bash
$SPARK_HOME/bin/spark-submit        --class com.hw.streaming.KafkaWordCount        --master yarn-cluster         --executor-memory 1G         --total-executor-cores 2         --files $HIVE_HOME/conf/hive-site.xml         --jars $HIVE_HOME/lib/mysql-connector-java-5.1.25-bin.jar,$SPARK_HOME/jars/datanucleus-api-jdo-3.2.6.jar,$SPARK_HOME/jars/datanucleus-core-3.2.10.jar,$SPARK_HOME/jars/datanucleus-rdbms-3.2.9.jar,$SPARK_HOME/jars/guava-14.0.1.jar         ./SparkPro-1.0-SNAPSHOT-jar-with-dependencies.jar         master:2181 group_id_1 kaka 1

6. 写数据,写到对应flume 监控的文件就行

import random
import time
readFileName="/root/orders.csv"
writeFileName="/root/code/flume_exec_test.txt"
with open(writeFileName,‘a+‘)as wf:
    with open(readFileName,‘rb‘) as f:
        for line in f.readlines():
            for word in line.split(" "):
                ss = line.strip()
                if len(ss)<1:
                    continue
                wf.write(ss+‘\n‘)
            rand_num = random.random()
            time.sleep(rand_num)

7. 观察消费者是否消费到数据,在执行脚本的时候发现以下错误,一个是窗口时间的问题,一个是要设置 checkpoint。

窗口时间设置不对,会报以下错误

User class threw exception: java.lang.IllegalArgumentException: requirement failed: The window duration of ReducedWindowedDStream (3000 ms) must be multiple of the slide duration of parent DStream (10000 ms)
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.streaming.dstream.ReducedWindowedDStream.<init>(ReducedWindowedDStream.scala:39)
at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$reduceByKeyAndWindow$6.apply(PairDStreamFunctions.scala:348)
at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$reduceByKeyAndWindow$6.apply(PairDStreamFunctions.scala:343)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.SparkContext.withScope(SparkContext.scala:693)
at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:265)
at org.apache.spark.streaming.dstream.PairDStreamFunctions.reduceByKeyAndWindow(PairDStreamFunctions.scala:343)
at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$reduceByKeyAndWindow$5.apply(PairDStreamFunctions.scala:311)
at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$reduceByKeyAndWindow$5.apply(PairDStreamFunctions.scala:311)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.SparkContext.withScope(SparkContext.scala:693)
at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:265)
at org.apache.spark.streaming.dstream.PairDStreamFunctions.reduceByKeyAndWindow(PairDStreamFunctions.scala:310)
at com.badou.streaming.KafkaWordCount$.main(KafkaWordCount.scala:22)
at com.badou.streaming.KafkaWordCount.main(KafkaWordCount.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:721)

错误修改,需要将窗口时间设置成滑动时间的倍数。上面给出的脚本已经是修改过的,如果安装上面的步骤操作,就不会报这个错误了。

如果没有增加 checkpoint,也会报错,报错如下:

requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().

设置相应的 checkpoint 即可。

# 在统计代码中加入下面这个语句
# val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.setCheckPoint("/root/checkpoint")

如果以上执行完成,可以在浏览器中查看日志,会看到对应的统计信息。 

# 登录 192.168.56.122:8080
# 查看对应的日志信息

总结,在测试的时候,启动 flume 的时候遇到了一个错误,错误如下:

[WARN - kafka.utils.Logging$class.warn(Logging.scala:83)]
Error while fetching metadata     partition 4     leader: none    replicas:       isr
:    isUnderReplicated: false for topic partition [default-flume-topic,4]:
[class kafka.common.LeaderNotAvailableException]

遇到这个错误的原因主要是 flume 配置文件中,设置的 kafka sink 不对导致的,可以看到本应该监听的 topic 是 kaka,但是这里监控的却是默认的 default-flume-topic,经过检查终于发现错误是由于不细心导致的,把 sinks 写成 sink 了,一定要注意细节,一定要学会看日志。

原文地址:https://www.cnblogs.com/hanwen1014/p/11260456.html

时间: 2024-08-29 20:10:09

Spark-Streaming kafka count 案例的相关文章

spark streaming kafka example

// scalastyle:off println package org.apache.spark.examples.streaming import kafka.serializer.StringDecoder import org.apache.spark.SparkConf import org.apache.spark.streaming._ import org.apache.spark.streaming.kafka._ import org.apache.spark.stream

Spark streaming + Kafka 流式数据处理,结果存储至MongoDB、Solr、Neo4j(自用)

KafkaStreaming.scala文件 import kafka.serializer.StringDecoder import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kafka.{KafkaManagerAdd, KafkaUtils} import org.json4s.Defau

第99课:使用Spark Streaming+Kafka实战对论坛网站动态行为的多维度分析及java.lang.NoClassDefFoundError问题解决完整内幕版本解密

第99课:使用Spark Streaming 实战对论坛网站动态行为的多维度分析 /* 王家林老师授课http://weibo.com/ilovepains  每天晚上20:00YY频道现场授课频道68917580*/ /** * *第99课:使用Spark Streaming 实战对论坛网站动态行为的多维度分析 * 论坛数据自动生成代码,该生成的数据会作为Producer的方式发送给Kafka,然后SparkStreaming程序会从 * Kafka中在线Pull到论坛或者网站的用户在线行为信

160728、Spark Streaming kafka 实现数据零丢失的几种方式

定义 问题开始之前先解释下流处理中的一些概念: At most once - 每条数据最多被处理一次(0次或1次) At least once - 每条数据最少被处理一次 (1次或更多) Exactly once - 每条数据只会被处理一次(没有数据会丢失,并且没有数据会被多次处理) High Level API   如果不做容错,将会带来数据丢失因为receiver一直在接收数据,在其没有处理的时候(已通知zk数据接收到),executor突然挂掉(或是driver挂掉通知executor关闭

第82课 Spark Streaming第一课 案例动手实战并在电光石火间理解其工作原理

本课内容提要: (1)什么是流处理以及Spark Streaming主要介绍 (2)Spark Streaming初体验 一.什么是流处理以及Spark Streaming主要介绍 流(Streaming),在大数据时代为数据流处理,就像水流一样,是数据流:既然是数据流处理,就会想到数据的流入.数据的加工.数据的流出. 日常工作.生活中数据来源很多不同的地方.例如:工业时代的汽车制造.监控设备.工业设备会产生很多源数据:信息时代的电商网站.日志服务器.社交网络.金融交易系统.黑客攻击.垃圾邮件.

下载基于大数据技术推荐系统实战教程(Spark ML Spark Streaming Kafka Hadoop Mahout Flume Sqoop Redis)

地址:http://pan.baidu.com/s/1c2tOtwc  密码:yn2r 82课高清完整版,转一播放码. 互联网行业是大数据应用最前沿的阵地,目前主流的大数据技术,包括 hadoop,spark等,全部来自于一线互联网公司.从应用角度讲,大数据在互联网领域主要有三类应用:搜索引擎(比如百度,谷歌等),广告系统(比如百度凤巢,阿里妈妈等)和推荐系统(比如阿里巴巴天猫推荐,优酷视频推荐等). 本次培训以商业实战项目作为驱动来学习大数据技术在推荐系统项目中的应用.使得学员能够亲身体会大数

spark streaming kafka

SparkStreaming+Kafka •kafka是什么,有哪些特点 •SparkStreaming+Kafka有什么好处 –解耦 –缓冲 消息列队的特点 生产者消费者模式 •可靠性保证 –自己不丢数据 –消费者不丢数据:“至少一次,严格一次” broker n. 经纪人,掮客 vt. 以中间人等身分安排... vi. 作为权力经纪人进行谈判 kafka部署 node2,3,4 基于zookeeper 启动 三台 zookeeper /opt/sxt/zookeeper-3.4.6/bin/

【转】Spark Streaming 实时计算在甜橙金融监控系统中的应用及优化

系统架构介绍 整个实时监控系统的架构是先由 Flume 收集服务器产生的日志 Log 和前端埋点数据, 然后实时把这些信息发送到 Kafka 分布式发布订阅消息系统,接着由 Spark Streaming 消费 Kafka 中的消息,同时消费记录由 Zookeeper 集群统一管理,这样即使 Kafka 宕机重启后也能找到上次的消费记录继而进行消费.在这里 Spark Streaming 首先从 MySQL 读取规则然后进行 ETL 清洗并计算多个聚合指标,最后将结果的一部分存储到 Hbase

Spark Streaming、Kafka结合Spark JDBC External DataSouces处理案例

场景:使用Spark Streaming接收Kafka发送过来的数据与关系型数据库中的表进行相关的查询操作: Kafka发送过来的数据格式为:id.name.cityId,分隔符为tab 1 zhangsan 1 2 lisi 1 3 wangwu 2 4 zhaoliu 3 MySQL的表city结构为:id int, name varchar 1 bj 2 sz 3 sh 本案例的结果为:select s.id, s.name, s.cityId, c.name from student s