2016年大数据Spark“蘑菇云”行动之spark streaming消费flume采集的kafka数据Directf方式

王家林老师的课程:2016年大数据Spark“蘑菇云”行动之spark streaming消费flume采集的kafka数据Directf方式作业。

    一、基本背景

Spark-Streaming获取kafka数据的两种方式Receiver与Direct的方式,本文介绍Direct的方式。具体的流程是这样的:

1、Direct方式是直接连接到kafka的节点上获取数据了。

2、基于Direct的方式:周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。

3、当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。

这种方式有如下优点:

1、简化并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。;

2、高性能:不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复;

3、一次且仅一次的事务机制:Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。

Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。

二、配置文件及编码

     flume版本:1.6.0,此版本直接支持到kafka,不用在单独安装插件。

     kafka版本2.10-0.8.2.1,必须是0.8.2.1,刚开始我用的是0.10,结果出现了下

      四、各类错误大全的第2个错误。

     spark版本:1.6.1。

     

    

      kafka配文件:producer.properties,红色文字为特别要注意的配置坑,呵呵

    

#agentsection

producer.sources= s

producer.channels= c

producer.sinks= r

#sourcesection

producer.sources.s.type= exec

producer.sources.s.command= tail -f -n+1 /opt/test/test.log

producer.sources.s.channels= c

# Eachsink‘s type must be defined

producer.sinks.r.type= org.apache.flume.plugins.KafkaSink

producer.sinks.r.metadata.broker.list=192.168.0.10:9092

producer.sinks.r.partition.key=0

producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition

producer.sinks.r.serializer.class=kafka.serializer.StringEncoder

producer.sinks.r.request.required.acks=0

producer.sinks.r.max.message.size=1000000

producer.sinks.r.producer.type=sync

producer.sinks.r.custom.encoding=UTF-8

producer.sinks.r.custom.topic.name=flume2kafka2streaming930

#Specifythe channel the sink should use

producer.sinks.r.channel= c

# Eachchannel‘s type is defined.

producer.channels.c.type= memory

producer.channels.c.capacity= 1000

producer.channels.c.transactionCapacity= 100

核心代码如下:

 SparkConf conf = SparkConf().setMaster().
              setAppName()
              .setJars(String[] {
                      })Map<StringString> kafkaParameters = HashMap<StringString>()kafkaParameters.put()Set<String> topics =  HashSet<String>()topics.add()JavaPairInputDStream<StringString> lines = KafkaUtils.(jscString.String.StringDecoder.StringDecoder.kafkaParameterstopics)JavaDStream<String> words = lines.flatMap(FlatMapFunction<Tuple2<StringString>String>() { Iterable<String> (Tuple2<StringString> tuple) Exception {
              Arrays.(tuple..split())}
      })JavaPairDStream<StringInteger> pairs = words.mapToPair(PairFunction<StringStringInteger>() {

          Tuple2<StringInteger> (String word) Exception {
              Tuple2<StringInteger>(word)}
      })JavaPairDStream<StringInteger> wordsCount = pairs.reduceByKey(Function2<IntegerIntegerInteger>() { Integer (Integer v1Integer v2) Exception {
              v1 + v2}
      })wordsCount.print()jsc.start()jsc.awaitTermination()jsc.close()

  三、启动脚本

启动zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties &

启动kafka broker

bin/kafka-server-start.sh config/server.properties &

创建topic

bin/kafka-topics.sh --create --zookeeper 192.168.0.10:2181 --replication-factor 1 --partitions 1 --topic flume2kafka2streaming930

启动flume

bin/flume-ng agent --conf conf/  -f conf/producer.properties  -n producer -Dflume.root.logger=INFO,console

bin/spark-submit --class com.dt.spark.sparkstreaming.SparkStreamingOnKafkaDirected  --jars /lib/kafka_2.10-0.8.2.1/kafka-clients-0.8.2.1.jar,/lib/kafka_2.10-

0.8.2.1/kafka_2.10-0.8.2.1.jar,/lib/kafka_2.10-0.8.2.1/metrics-core-2.2.0.jar,/lib/spark-1.6.1/spark-streaming-kafka_2.10-1.6.1.jar --master local[5] SparkApps.jar

echo "hadoop spark hive storm spark hadoop hdfs" >> /opt/test/test.log

echo "hive storm " >> /opt/test/test.log

echo "hdfs" >> /opt/test/test.log

echo "hadoop spark hive storm spark hadoop hdfs" >> /opt/test/test.log

    输出结果如下:

* 结果如下:* -------------------------------------------* Time: 1475282360000 ms* -------------------------------------------*(spark,8)*(storm,4)*(hdfs,4)*(hive,4)*(hadoop,8)


    四、各类错误大全

    1、Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils
        at com.dt.spark.SparkApps.SparkStreaming.SparkStreamingOnKafkaDirected.main

        一概是没有提交jar包,一概会报错,无法执行,一概在submit脚本里添加:

       

bin/spark-submit --class com.dt.spark.sparkstreaming.SparkStreamingOnKafkaDirected  --jars /lib/kafka_2.10-0.8.2.1/kafka-clients-0.8.2.1.jar,/lib/kafka_2.10-

0.8.2.1/kafka_2.10-0.8.2.1.jar,/lib/kafka_2.10-0.8.2.1/metrics-core-2.2.0.jar,/lib/spark-1.6.1/spark-streaming-kafka_2.10-1.6.1.jar --master local[5] SparkApps.jar

   2、Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker。

         上stackoverflow.com及spark官网查询,这个是因为版本不兼容引起。官网提供的版本:Spark Streaming 1.6.1 is compatible with Kafka 0.8.2.1

王家林_DT大数据梦工厂

简介: 王家林:DT大数据梦工厂创始人和首席专家.微信公众号DT_Spark .

联系邮箱[email protected]

电话:18610086859

微信号:18610086859

微博为:http://weibo.com/ilovepains

时间: 2024-08-05 14:58:28

2016年大数据Spark“蘑菇云”行动之spark streaming消费flume采集的kafka数据Directf方式的相关文章

利用Flume将MySQL表数据准实时抽取到HDFS

转自:http://blog.csdn.net/wzy0623/article/details/73650053 一.为什么要用到Flume 在以前搭建HAWQ数据仓库实验环境时,我使用Sqoop抽取从MySQL数据库增量抽取数据到HDFS,然后用HAWQ的外部表进行访问.这种方式只需要很少量的配置即可完成数据抽取任务,但缺点同样明显,那就是实时性.Sqoop使用MapReduce读写数据,而MapReduce是为了批处理场景设计的,目标是大吞吐量,并不太关心低延时问题.就像实验中所做的,每天定

2016年大数据Spark“蘑菇云”行动之flume整合spark streaming

近期,听了王家林老师的2016年大数据Spark"蘑菇云"行动,需要将flume,kafka和Spark streaming进行整合. 感觉一时难以上手,还是先从简单着手吧:我的思路是这样的,flume产生数据,然后输出到spark streaming,flume的源数据是netcat(地址:localhost,端口22222),输出是avro(地址:localhost,端口是11111).Spark streaming的处理是直接输出有几个events. 一.配置文件 Flume 配

2016年大数据Spark“蘑菇云”行动代码学习之AdClickedStreamingStats模块分析

    系统背景:用户使用终端设备(IPAD.手机.浏览器)等登录系统,系统采用js脚本发送用户信息和广告点击信息到后台日志,进入flume监控,通过kafka消息中间件传输数据,由Spark Streaming消费后将信息存储到后台.本模块主要就是实现将kafka发送的信息进行过滤,动态更新黑名单,生成有效的广告点击数据,形成广告点击趋势,将相关信息通过数据库连接池写入数据库MySql.     实现思路:由于kafka传输的广告点击数据中有一些是无效数据,需要根据一定的规则进行过滤(本方案采

决胜大数据时代:Hadoop&amp;Yarn&amp;Spark企业级最佳实践(8天完整版脱产式培训版本)

Hadoop.Yarn.Spark是企业构建生产环境下大数据中心的关键技术,也是大数据处理的核心技术,是每个云计算大数据工程师必修课. 课程简介 大数据时代的精髓技术在于Hadoop.Yarn.Spark,是大数据时代公司和个人必须掌握和使用的核心内容. Hadoop.Yarn.Spark是Yahoo!.阿里淘宝等公司公认的大数据时代的三大核心技术,是大数据处理的灵魂,是云计算大数据时代的技术命脉之所在,以Hadoop.Yarn.Spark为基石构建起来云计算大数据中心广泛运行于Yahoo!.阿

大数据时代到底Hadoop和Spark谁是王者!

在现在这个大数据时代,Hadoop和Spark是最潮流的两个词汇,Hadoop是一种分布式计算框架,由Google提出,主要用于搜索领域,解决海量数据的计算问题,Hadoop中的MapReduce包括两个阶段:Mapper阶段和Reducer阶段,用户只需要实现map函数和reduce函数即可实现分布式计算,非常简单.而近几年Spark新兴框架的产生,以不可挡之势席卷中国,其核心内部结构RDD以超强的弹性机制更加的引人注目!越来越多的人认为Spark终有一天要取代Hadoop,但是事实究竟如何呢

大数据学习系列之七 ----- Hadoop+Spark+Zookeeper+HBase+Hive集群搭建 图文详解

引言 在之前的大数据学习系列中,搭建了Hadoop+Spark+HBase+Hive 环境以及一些测试.其实要说的话,我开始学习大数据的时候,搭建的就是集群,并不是单机模式和伪分布式.至于为什么先写单机的搭建,是因为作为个人学习的话,单机已足以,好吧,说实话是自己的电脑不行,使用虚拟机实在太卡了... 整个的集群搭建是在公司的测试服务搭建的,在搭建的时候遇到各种各样的坑,当然也收获颇多.在成功搭建大数据集群之后,零零散散的做了写笔记,然后重新将这些笔记整理了下来.于是就有了本篇博文. 其实我在搭

38套大数据,云计算,架构,数据分析师,Hadoop,Spark,Storm,Kafka,人工智能,机器学习,深度学习,项目实战视频教程

38套大数据,云计算,架构,数据分析师,Hadoop,Spark,Storm,Kafka,人工智能,机器学习,深度学习,项目实战视频教程 视频课程包含: 38套大数据和人工智能精品高级课包含:大数据,云计算,架构,数据挖掘实战,实时推荐系统实战,电视收视率项目实战,实时流统计项目实战,离线电商分析项目实战,Spark大型项目实战用户分析,智能客户系统项目实战,Linux基础,Hadoop,Spark,Storm,Docker,Mapreduce,Kafka,Flume,OpenStack,Hiv

大数据学习系列之七 ----- Hadoop+Spark+Zookeeper+HBase+Hive集

引言 在之前的大数据学习系列中,搭建了Hadoop+Spark+HBase+Hive 环境以及一些测试.其实要说的话,我开始学习大数据的时候,搭建的就是集群,并不是单机模式和伪分布式.至于为什么先写单机的搭建,是因为作为个人学习的话,单机已足以,好吧,说实话是自己的电脑不行,使用虚拟机实在太卡了... 整个的集群搭建是在公司的测试服务搭建的,在搭建的时候遇到各种各样的坑,当然也收获颇多.在成功搭建大数据集群之后,零零散散的做了写笔记,然后重新将这些笔记整理了下来.于是就有了本篇博文. 其实我在搭

【原创】大数据基础之Spark(7)spark读取文件split过程(即RDD分区数量)

spark 2.1.1 spark初始化rdd的时候,需要读取文件,通常是hdfs文件,在读文件的时候可以指定最小partition数量,这里只是建议的数量,实际可能比这个要大(比如文件特别多或者特别大时),也可能比这个要小(比如文件只有一个而且很小时),如果没有指定最小partition数量,初始化完成的rdd默认有多少个partition是怎样决定的呢? 以SparkContext.textfile为例来看下代码: org.apache.spark.SparkContext /** * Re