Flume+kakfa+sparkStream实时处理数据测试

flume:从数据源拉取数据

kafka:主要起到缓冲从flume拉取多了的数据

sparkStream:对数据进行处理

一.flume拉取数据

1.源数据文件读取配置

在flume目录的conf目录下配置读取数据源的配置,配置一个test.properties文件,内容如下:

a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/hjh/spark/test.txt
a1.sources.r1.restartThrottle = 1000
a1.sources.r1.logStdErr = true
#a1.sources.r1.restart = true
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c1.keepalive = 100
a1.sinks.k1.type =org.apache.flume.plugins.KafkaSink
a1.sinks.k1.metadata.broker.list=192.168.22.7:9092,192.168.22.8:9092,192.168.22.9:9092
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
a1.sinks.k1.request.required.acks=1
a1.sinks.k1.max.message.size=1000000
a1.sinks.k1.producer.type=sync
a1.sinks.k1.custom.encoding=UTF-8
a1.sinks.k1.custom.topic.name=test
a1.sinks.k1.channel=c1
a1.sinks.k1.product.source.name=6

配置读取源文件的读取路径如下:

a1.sources.r1.command = tail -F /home/hadoop/hjh/spark/test.txt

读取的数据传到kafka的哪个topic下:

a1.sinks.k1.custom.topic.name=test

2.启动flume读取数据

bin/flume-ng  agent -c conf -f conf/test.properties -n a1 -Dflume.root.logger=INFO,console

二.kafka缓冲数据

   1.启动zookeeper服务

bin/zookeeper-server-start.sh config/zookeeper.properties

   2.启动kafka服务

bin/kafka-server-start.sh config/server.properties &
3.创建一个topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
集群情况下,localhost换成集群的master地址
4.查看kafka的topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
三.SparkStream处理数据
1.用spark中自带例子进行测试
进入spark目录
bin/run-example org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 my-consumer-group test 1

zoo01,zoo02,zoo03替换为集群的zookeeper地址

2.往源文件中加入数据
echo "test test" >> test.txt
sparkStream会统计源数据中单词的数量并输出
时间: 2024-08-03 16:28:11

Flume+kakfa+sparkStream实时处理数据测试的相关文章

Flume+Kafka+Storm+Redis构建大数据实时处理系统:实时统计网站PV、UV+展示

[TOC] 1 大数据处理的常用方法 前面在我的另一篇文章中<大数据采集.清洗.处理:使用MapReduce进行离线数据分析完整案例>中已经有提及到,这里依然给出下面的图示: 前面给出的那篇文章是基于MapReduce的离线数据分析案例,其通过对网站产生的用户访问日志进行处理并分析出该网站在某天的PV.UV等数据,对应上面的图示,其走的就是离线处理的数据处理方式,而这里即将要介绍的是另外一条路线的数据处理方式,即基于Storm的在线处理,在下面给出的完整案例中,我们将会完成下面的几项工作: 1

Kakfa的设计思想

Kafka Kafka是最初由Linkedin公司开发,是一个分布式.支持分区的(partition).多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统.低延迟的实时系统.storm/Spark流式处理引擎,web/nginx日志.访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目. 1.前言 消息队列的性能好坏

大数据flume日志采集系统详解

一.flume介绍 flume 是一个cloudera提供的 高可用高可靠,分布式的海量日志收集聚合传输系统.Flume支持日志系统中定制各类数据发送方,用于收集数据.同时flume提供对数据进行简单处理,并写到各种数据接收方(可定制)的能力. 二.功能介绍   日志收集 Flume最早是Cloudera提供的日志收集系统,目前是Apache下的一个孵化项目,Flume支持在日志系统中定制各类数据发送方,用于收集数据. 流程:恒生数据接收中间件---file.txt  哪个端口进行监控 ---

大数据测试笔记(1)-测试的3条建议

大数据,咋一听起来都觉得很神秘,很高大上,从2013年开始听得越来越多,什么数据挖掘,数据分析.机器学习.算法,让我等听起来天马行空,雾里看花,有幸接触到了大数据项目,让我拨开云雾,原来大数据其实简单,真的简单,大量数据嘛,就是我们说的大数据,基于数据分析,获得有价值的信息. 目前我理解大数据,有数据采集.数据存储.数据分析.数据应用,前两者是基础,后两者是价值,采集存储数据不是目的,利用数据分析有价值的信息,才是我们选择的. 我们不展开聊,作为测试,我关心的是我要测试什么,如何测试,怎么衡量产

日志系统之基于flume收集docker容器日志

最近我在日志收集的功能中加入了对docker容器日志的支持.这篇文章简单谈谈策略选择和处理方式. 关于docker的容器日志 docker 我就不多说了,这两年火得发烫.最近我也正在把日志系统的一些组件往docker里部署.很显然,组件跑在容器里之后很多东西都会受到容器的制约,比如日志文件就是其中之一. 当一个组件部署到docker中时,你可以通过如下命令在标准输出流(命令行)中查看这个组件的日志: docker logs ${containerName} 日志形如: 但这种方式并不能让你实时获

Storm与Spark:谁才是我们的实时处理利器

实时商务智能这一构想早已算不得什么新生事物(早在2006年维基百科中就出现了关于这一概念的页面).然而尽管人们多年来一直在对此类方案进行探讨,我却发现很多企业实际上尚未就此规划出明确发展思路.甚至没能真正意识到其中蕴含的巨大效益. 为什么会这样?一大原因在于目前市场上的实时商务智能与分析工具仍然非常有限.传统数据仓库环境针对的主要是批量处理流程,这类方案要么延迟极高.要么成本惊人——当然,也可能二者兼具. 然而已经有多款强大而且易于使用的开源平台开始兴起,欲彻底扭转目前的不利局面.其中最值得关注

【采集层】Kafka 与 Flume 如何选择

采集层 主要可以使用Flume, Kafka两种技术. Flume:Flume 是管道流方式,提供了很多的默认实现,让用户通过参数部署,及扩展API. Kafka:Kafka是一个可持久化的分布式的消息队列. Kafka 是一个非常通用的系统.你可以有许多生产者和很多的消费者共享多个主题Topics.相比之下,Flume是一个专用工具被设计为旨在往HDFS,HBase发送数据.它对HDFS有特殊的优化,并且集成了Hadoop的安全特性.所以,Cloudera 建议如果数据被多个系统消费的话,使用

Apache Storm 与 Spark:对实时处理数据,如何选择【翻译】

原文地址 实时商务智能这一构想早已算不得什么新生事物(早在2006年维基百科中就出现了关于这一概念的页面).然而尽管人们多年来一直在对此类方案进行探讨,我却发现很多企业实际上尚未就此规划出明确发展思路.甚至没能真正意识到其中蕴含的巨大效益. 为什么会这样?一大原因在于目前市场上的实时商务智能与分析工具仍然非常有限.传统数据仓库环境针对的主要是批量处理流程,这类方案要么延迟极高.要么成本惊人--当然,也可能二者兼具. 然而已经有多款强大而且易于使用的开源平台开始兴起,欲彻底扭转目前的不利局面.其中

flume+kafka+hdfs构建实时消息处理系统

flume是一个实时消息收集系统,它定义了多种的source.channel.sink,可以根据实际情况选择. Flume下载及文档: http://flume.apache.org/ Kafka kafka是一种高吞吐量的分布式发布订阅消息系统,她有如下特性: 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能. 高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息. 支持通过kafka服务器和消费机集群来分区消息. 支持Ha