flume+kafka+hdfs构建实时消息处理系统

flume是一个实时消息收集系统,它定义了多种的source、channel、sink,可以根据实际情况选择。

Flume下载及文档:

http://flume.apache.org/

Kafka

kafka是一种高吞吐量的分布式发布订阅消息系统,她有如下特性:

  • 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
  • 高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息。
  • 支持通过kafka服务器和消费机集群来分区消息。
  • 支持Hadoop并行数据加载。

kafka的目的是提供一个发布订阅解决方案,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群机来提供实时的消费。

kafka分布式订阅架构如下图:--取自Kafka官网

配置kafka的配置文件 server.properties ,其它可根据自己的情况修改。

启动kafka,启动之前先启动zookeeper,zookeeper的配置不再叙述。

# bin/kafka-server-start.sh config/server.properties

Create a topic

# bin/kafka-topics.sh --create --zookeeper localhost:2181 ----replication-factor 1 --partitions 1 --topic test

查看topic

# bin/kafka-topics.sh  --list  --zookeeper localhost:2181

测试是否能正常生产消费;验证流程正确性

# bin/kafka-console-producer.sh--broker-list localhost:9092 --topic test

# bin/kafka-console-consumer.sh--zookeeper localhost:2181 --topic test --from-beginning

接下来是框架之间的整合

flume和kafka整合

1.下载flume-kafka-plus:https://github.com/beyondj2ee/flumeng-kafka-plugin

2.提取插件中的flume-conf.properties文件

修改该文件:#source section

producer.sources.s.type = exec
producer.sources.s.command = tail -f -n+1 /mnt/hgfs/vmshare/test.log
producer.sources.s.channels = c

修改所有topic的值改为test

将改后的配置文件放进flume/conf目录下

在该项目中提取以下jar包放入环境中flume的lib下:

还有package目录的flumeng-kafka-plugin.jar包一并放到flume的lib目录下。

附上flume的配置文件

############################################

#  producer config

###########################################

#agent section

producer.sources = s

producer.channels = c

producer.sinks = r

#source section

producer.sources.s.type = exec

producer.sources.s.channels = c

producer.sources.s.command = tail -f  /var/log/messages

#producer.sources.s.type=spooldir

#producer.sources.s.spoolDir=/home/xiaojie.li

#producer.sources.s.fileHeader=false

#producer.sources.s.type=syslogtcp

#producer.sources.s.port=5140

#producer.sources.s.host=localhost

# Each sink‘s type must be defined

producer.sinks.r.type = org.apache.flume.plugins.KafkaSink

producer.sinks.r.metadata.broker.list=10.10.10.127:9092

producer.sinks.r.zk.connect=10.10.10.127:2181

producer.sinks.r.partition.key=0

producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition

producer.sinks.r.serializer.class=kafka.serializer.StringEncoder

producer.sinks.r.request.required.acks=0

producer.sinks.r.max.message.size=1000000

producer.sinks.r.producer.type=sync

producer.sinks.r.custom.encoding=UTF-8

producer.sinks.r.custom.topic.name=test

#Specify the channel the sink should use

producer.sinks.r.channel = c

# Each channel‘s type is defined.

producer.channels.c.type = memory

producer.channels.c.capacity = 1000

producer.channels.c.transactionCapacity=100

#producer.channels.c.type=file

#producer.channels.c.checkpointDir=/home/checkdir

#producer.channels.c.dataDirs=/home/datadir

验证flume和kafka组合

前面kafka已经启动,这里直接启动flume

# bin/flume-ng  agent -c conf -f conf/master.properties -n producer -Dflume.root.logger=INFO,console

使用kafka的kafka-console-consumer.sh脚本查看是否有flume有没有往Kafka传输数据;

可以看到tail /var/log/messages已经通过flume传到kafka里,说明flume+kafka组合已经成功了。

日志最终需要保存在hdfs里

还需要自己开发插件去实现,这里不再多说。

时间: 2024-10-05 15:11:13

flume+kafka+hdfs构建实时消息处理系统的相关文章

使用 Kafka 和 Spark Streaming 构建实时数据处理系统(转)

原文链接:http://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice2/index.html?ca=drs-&utm_source=tuicool 引言 在很多领域,如股市走向分析, 气象数据测控,网站用户行为分析等,由于数据产生快,实时性强,数据量大,所以很难统一采集并入库存储后再做处理,这便导致传统的数据处理架构不能满足需要.流计算的出现,就是为了更好地解决这类数据在处理过程中遇到的问题.与传统架构不同,流计算模型

flume + kafka + sparkStreaming + HDFS 构建实时日志分析系统

一.Hadoop配置安装 注意:apache提供的hadoop-2.6.0的安装包是在32位操作系统编译的,因为hadoop依赖一些C++的本地库, 所以如果在64位的操作上安装hadoop-2.4.1就需要重新在64操作系统上重新编译 1.修改Linux主机名 2.修改IP 3.修改主机名和IP的映射关系 ######注意######如果你们公司是租用的服务器或是使用的云主机(如华为用主机.阿里云主机等) /etc/hosts里面要配置的是内网IP地址和主机名的映射关系 4.关闭防火墙 5.s

NoSQL:如何使用NoSQL架构构建实时广告系统

JDNoSQL平台是什么 JDNoSQL平台是一个分布式面向列的KeyValue毫秒级存储服务,存储结构化数据和非机构化数据,支持随机读写与更新,灵活的动态列机制,架构上支持水平扩容,提供高并发.低延迟.高可用.强一致数据库服务,可满足各种业务场景.完善的平台支持,支持业务自助化建表,查看监控,在线DDL等. 1.1 JDNoSQL所处生态的位置 从上图可以看出,JDNoSQL是一种构建在HDFS之上的分布式.面向列的存储系统.在需要实时读写.随机访问超大规模数据集时,可以使用JDNoSQL.目

flume+kafka+hdfs详解

flume架构图 单节点flume配置 flume-1.4.0  启动flume bin/flume-ng agent --conf ./conf  -f conf/flume-conf.properties -Dflume.root.logger=DEBUG,console -n agent -n表示配置文件中agent的名字 agent.sources = r1 agent.sinks = s1 agent.channels = c1 agent.sources.r1.channels = 

基于storm,kafka,mysql的实时统计系统

公司对客户开放多个系统,运营人员想要了解客户使用各个系统的情况,在此之前,数据平台团队已经建设好了统一的Kafka消息通道. 为了保证架构能够满足业务可能的扩张后的性能要求,选用storm来处理各个应用系统上传到kafka中的埋点数据并在Mysql中汇聚. 埋点数据上报的格式为json,会上报类似如下的数据 { "account": "001", "accountName": "旺财宝", "subaccount&q

使用inotify-tools与rsync构建实时备份系统

使用inotifywait监控文件变动 inotifywait是 inotify-tools 包中提供的一个工具,它使用 inotify API 来监控文件/目录中的变动情况. 在archlinux上,我们可以使用下面命令来安装 sudo pacman -S --noconfirm inotify-tools 平时 inotifywait 会挂起在那里,直到文件/目录发生了要引起关注的事件后,它会退出并输出事件发生的场所.事件的名称以及引起事件的文件(当事件发生在目录上时才会输出). inoti

Flume+Kafka+Storm+Redis构建大数据实时处理系统:实时统计网站PV、UV+展示

[TOC] 1 大数据处理的常用方法 前面在我的另一篇文章中<大数据采集.清洗.处理:使用MapReduce进行离线数据分析完整案例>中已经有提及到,这里依然给出下面的图示: 前面给出的那篇文章是基于MapReduce的离线数据分析案例,其通过对网站产生的用户访问日志进行处理并分析出该网站在某天的PV.UV等数据,对应上面的图示,其走的就是离线处理的数据处理方式,而这里即将要介绍的是另外一条路线的数据处理方式,即基于Storm的在线处理,在下面给出的完整案例中,我们将会完成下面的几项工作: 1

[转载] 利用flume+kafka+storm+mysql构建大数据实时系统

原文: http://mp.weixin.qq.com/s?__biz=MjM5NzAyNTE0Ng==&mid=205526269&idx=1&sn=6300502dad3e41a36f9bde8e0ba2284d&key=c468684b929d2be22eb8e183b6f92c75565b8179a9a179662ceb350cf82755209a424771bbc05810db9b7203a62c7a26&ascene=0&uin=Mjk1ODMy

Flume+Kafka+Storm+Redis实时分析系统基本架构

PS:历史原因作者账号名为:ymh198816,但事实上作者的生日并不是1988年1月6日 今天作者要在这里通过一个简单的电商网站订单实时分析系统和大家一起梳理一下大数据环境下的实时分析系统的架构模型.当然这个架构模型只是实时分析技术的一 个简单的入门级架构,实际生产环境中的大数据实时分析技术还涉及到很多细节的处理, 比如使用Storm的ACK机制保证数据都能被正确处理, 集群的高可用架构, 消费数据时如何处理重复数据或者丢失数据等问题,根据不同的业务场景,对数据的可靠性要求以及系统的复杂度的要