Flume和Kafka完成实时数据的采集

Flume和Kafka完成实时数据的采集

写在前面

Flume和Kafka在生产环境中,一般都是结合起来使用的。可以使用它们两者结合起来收集实时产生日志信息,这一点是很重要的。如果,你不了解flume和kafka,你可以先查看我写的关于那两部分的知识。再来学习,这部分的操作,也是可以的。

实时数据的采集,就面临一个问题。我们的实时数据源,怎么产生呢?因为我们可能想直接获取实时的数据流不是那么的方便。我前面写过一篇文章,关于实时数据流的python产生器,文章地址:http://blog.csdn.net/liuge36/article/details/78596876

你可以先看一下,如何生成一个实时的数据...

思路??如何开始呢??

分析:我们可以从数据的流向着手,数据一开始是在webserver的,我们的访问日志是被nginx服务器实时收集到了指定的文件,我们就是从这个文件中把日志数据收集起来,即:webserver=>flume=>kafka

webserver日志存放文件位置

这个文件的位置,一般是我们自己设置的

我们的web日志存放的目录是在:

/home/hadoop/data/project/logs/access.log下面

[[email protected] logs]$ pwd
/home/hadoop/data/project/logs
[[email protected] logs]$ ls
access.log
[[email protected] logs]$ 

Flume

做flume,其实就是写conf文件,就面临选型的问题

source选型?channel选型?sink选型?

这里我们选择 exec source memory channel kafka sink

怎么写呢?

按照之前说的那样1234步骤

从官网中,我们可以找到我们的选型应该如何书写:

1) 配置Source

exec source

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/project/logs/access.log
a1.sources.r1.shell = /bin/sh -c

2) 配置Channel

memory channel

a1.channels.c1.type = memory

3) 配置Sink

kafka sink

flume1.6版本可以参照http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.7.0/FlumeUserGuide.html#kafka-sink

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = hadoop000:9092
a1.sinks.k1.topic = flume_kafka
a1.sinks.k1.batchSize = 5
a1.sinks.k1.requiredAcks =1

4) 把以上三个组件串起来

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

我们new一个文件叫做test3.conf

把我们自己分析的代码贴进去:

[[email protected] conf]$ vim test3.conf
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/project/logs/access.log
a1.sources.r1.shell = /bin/sh -c

a1.channels.c1.type = memory

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = hadoop000:9092
a1.sinks.k1.topic = flume_kafka
a1.sinks.k1.batchSize = 5
a1.sinks.k1.requiredAcks =1

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

这里我们先不启动,因为其中涉及到kafka的东西,必须先把kafka部署起来,,

Kafka的部署

kafka如何部署呢??

参照官网的说法,我们首先启动一个zookeeper进程,接着,才能够启动kafka的server

Step 1: Start the zookeeper

[[email protected] ~]$
[[email protected] ~]$ jps
29147 Jps
[[email protected] ~]$ zkServer.sh start
JMX enabled by default
Using config: /home/hadoop/app/zk/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[[email protected] ~]$ jps
29172 QuorumPeerMain
29189 Jps
[[email protected] ~]$

Step 2: Start the server

[[email protected] ~]$ kafka-server-start.sh $KAFKA_HOME/config/server.properties
#外开一个窗口,查看jps
[[email protected] ~]$ jps
29330 Jps
29172 QuorumPeerMain
29229 Kafka
[[email protected] ~]$

如果,这部分不是很熟悉,可以参考http://blog.csdn.net/liuge36/article/details/78592169

Step 3: Create a topic

[[email protected] ~]$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flume_kafka
WARNING: Due to limitations in metric names, topics with a period (‘.‘) or underscore (‘_‘) could collide. To avoid issues it is best to use either, but not both.
Created topic "flume_kafka".
[[email protected] ~]$

Step 4: 开启之前的agent

  [[email protected] conf]$ flume-ng agent --name a1 --conf . --conf-file ./test3.conf -Dflume.root.logger=INFO,console

Step 5: Start a consumer

kafka-console-consumer.sh --zookeeper hadoop000:2181 –topic flume-kafka

上面的第五步执行之后,就会收到刷屏的结果,哈哈哈!!

上面的消费者会一直一直的刷屏,还是很有意思的!!!

这里的消费者是把接收到的数据数据到屏幕上

后面,我们会介绍,使用SparkStreaming作为消费者实时接收数据,并且接收到的数据做简单数据清洗的开发,从随机产生的日志中筛选出我们需要的数据.....

原文地址:https://www.cnblogs.com/liuge36/p/9883008.html

时间: 2024-10-20 03:07:26

Flume和Kafka完成实时数据的采集的相关文章

Flume整合Kafka完成实时数据采集

agent选择 agent1 exec source + memory channel + avro sink agent2 avro source + memory channel 模拟实际工作中的场景,agent1 为A机器,agent2 为B机器. avro source: 监听avro端口,并且接收来自外部avro信息, avro sink:一般用于跨节点传输,主要绑定数据移动目的地的ip和port 在创建agent2配置文件 cd /app/flume/flume/conf vi te

flume从kafka中读取数据

a1.sources = r1 a1.sinks = k1 a1.channels = c1 #使用内置kafka source a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource #kafka连接的zookeeper a1.sources.r1.zookeeperConnect = localhost:2181 a1.sources.r1.topic = kkt-test-topic a1.sources.r1.batc

大数据数据仓库-基于大数据体系构建数据仓库(Hive,Flume,Kafka,Azkaban,Oozie,SparkSQL)

背景 接着上个文章数据仓库简述,想写一篇数据仓库常用模型的文章,但是自己对数据仓库模型的理解程度和建设架构并没有下面这个技术专家理解的深刻,并且自己去组织语言,可能会有不准确的地方,怕影响大家对数据仓库建模的理解,数据仓库属于一个工程学科,在设计上要体验出工程严谨性,所以这次向大家推荐这篇文章,毕竟IBM在数据仓库和数据集市方面已经做得很成熟了,已经有成型的商业数据仓库组件,这篇文章写的很好,可以让大家很好的理解数据仓库. 版权 作者 周三保([email protected]) IBM 软件部

新闻网大数据实时分析可视化系统项目——9、Flume+HBase+Kafka集成与开发

1.下载Flume源码并导入Idea开发工具 1)将apache-flume-1.7.0-src.tar.gz源码下载到本地解压 2)通过idea导入flume源码 打开idea开发工具,选择File——>Open 然后找到flume源码解压文件,选中flume-ng-hbase-sink,点击ok加载相应模块的源码. 2.官方flume与hbase集成的参数介绍 3.下载日志数据并分析 到搜狗实验室下载用户查询日志 1)介绍 搜索引擎查询日志库设计为包括约1个月(2008年6月)Sogou搜索

Flume简介与使用(三)——Kafka Sink消费数据之Kafka安装

前面已经介绍了如何利用Thrift Source生产数据,今天介绍如何用Kafka Sink消费数据. 其实之前已经在Flume配置文件里设置了用Kafka Sink消费数据 agent1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink agent1.sinks.kafkaSink.topic = TRAFFIC_LOG agent1.sinks.kafkaSink.brokerList = 10.208.129.3:90

flume 读取kafka 数据

本文介绍flume读取kafka数据的方法 代码: /******************************************************************************* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.  See the NOTICE file * distributed with

使用flume将kafka数据sink到HBase【转】

1. hbase sink介绍 1.1 HbaseSink 1.2 AsyncHbaseSink 2. 配置flume 3. 运行测试flume 4. 使用RegexHbaseEventSerializer来处理些HBASE的值 5. 效率测试 1. hbase sink介绍 如果还不了解flume请查看我写的其他flume下的博客. 接下来的内容主要来自flume官方文档的学习. 顺便也强烈推荐flume 1.6 官方API hbase的sink主要有以下两种.两种方式都提供和HBASE一样的

Spark Streaming从Kafka中获取数据,并进行实时单词统计,统计URL出现的次数

1.创建Maven项目 创建的过程参考:http://blog.csdn.net/tototuzuoquan/article/details/74571374 2.启动Kafka A:安装kafka集群:http://blog.csdn.net/tototuzuoquan/article/details/73430874 B:创建topic等:http://blog.csdn.net/tototuzuoquan/article/details/73430874 3.编写Pom文件 <?xml v

flume从kafka读取数据到hdfs中的配置

#source的名字 agent.sources = kafkaSource # channels的名字,建议按照type来命名 agent.channels = memoryChannel # sink的名字,建议按照目标来命名 agent.sinks = hdfsSink # 指定source使用的channel名字 agent.sources.kafkaSource.channels = memoryChannel # 指定sink需要使用的channel的名字,注意这里是channel