Flume 学习笔记之 Flume NG+Kafka整合

Flume NG集群+Kafka集群整合:

修改Flume配置文件(flume-kafka-server.conf),让Sink连上Kafka

hadoop1:

#set Agent name
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# other node,nna to nns
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop1
a1.sources.r1.port = 52020
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = Collector
a1.sources.r1.interceptors.i1.value = hadoop1
a1.sources.r1.channels = c1
#set sink to hdfs
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = ScalaTopic
a1.sinks.k1.brokerList = hadoop1:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel=c1

hadoop2:

#set Agent name
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# other node,nna to nns
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop2
a1.sources.r1.port = 52020
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = Collector
a1.sources.r1.interceptors.i1.value = hadoop2
a1.sources.r1.channels = c1
#set sink to hdfs
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = ScalaTopic
a1.sinks.k1.brokerList = hadoop2:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel=c1

集群测试:

  1. 启动zookeeper(hadoop1,hadoop2,hadoop3)
  2. 启动kafka server和consumer(hadoop1,hadoop2)
  3. 启动Flume server(hadoop1,hadoop2):flume-ng agent --conf conf --conf-file /usr/local/flume/conf/flume-kafka-server.conf --name a1 -Dflume.root.logger=INFO,console
  4. 启动Flume client(hadoop3):flume-ng agent --conf conf --conf-file /usr/local/flume/conf/flume-client.conf --name agent1 -Dflume.root.logger=INFO,console
  5. 在hadoop3上追加一条日志记录
  6. kafka consumer收到记录,从则测试完毕。

hadoop3:

hadoop1:

测试完毕,这样Flume+kafka就整合起来了,即Flume+Kafka+Spark Streaming的实时日志分析系统就孕育而生了。

时间: 2024-08-10 17:09:03

Flume 学习笔记之 Flume NG+Kafka整合的相关文章

Flume 学习笔记之 Flume NG高可用集群搭建

Flume NG高可用集群搭建: 架构总图: 架构分配: 角色 Host 端口 agent1 hadoop3 52020 collector1 hadoop1 52020 collector2 hadoop2 52020 agent1配置(flume-client.conf): #agent1 name agent1.channels = c1 agent1.sources = r1 agent1.sinks = k1 k2 #set gruop agent1.sinkgroups = g1 #

Flume 学习笔记之 Flume NG概述及单节点安装

Flume NG概述: Flume NG是一个分布式,高可用,可靠的系统,它能将不同的海量数据收集,移动并存储到一个数据存储系统中.轻量,配置简单,适用于各种日志收集,并支持 Failover和负载均衡.其中Agent包含Source,Channel和 Sink,三者组建了一个Agent.三者的职责如下所示: Source:用来消费(收集)数据源到Channel组件中 Channel:中转临时存储,保存所有Source组件信息 Sink:从Channel中读取,读取成功后会删除Channel中的

mybatis学习笔记(14)-spring和mybatis整合

mybatis学习笔记(14)-spring和mybatis整合 mybatis学习笔记14-spring和mybatis整合 整合思路 整合环境 sqlSessionFactory 原始dao开发和spring整合后 mapper代理开发 遇到的问题 本文主要将如何将spring和mybatis整合,只是作简单的示例,没有使用Maven构建.并展示mybatis与spring整合后如何进行原始dao开发和mapper代理开发. 整合思路 需要spring通过单例方式管理SqlSessionFa

Flume学习笔记(二)问题整理

本文环境如下: 操作系统:CentOS 7.2.1511 64位 Flume版本:1.6.0 1. 当Flume与Hadoop不在同一服务器上 当Flume与Hadoop不在同一服务器上时,又配置了写HDFS,则Flume启动时会报找不到类的错误. 需要添加Hadoop相关的包到flume的classpath配置中(或者直接拷贝到flume的lib文件夹中). 具体需要的包,我是在maven项目中配置: <dependency> <groupId>org.apache.hadoop

flume学习一:flume基础知识

一.Flume使用的前提: Flume使用 java编写,其需要运行在 Java1.6或更高版本之上. 二.Flume的定义: Flume是一个分布式.可靠.高效可用的海量日志采集.聚合和传输系统,支持在系统中定制各类数据发送方,用于搜集数据:同时,flume提供对数据进行加单处理,并写到各种数据接受方(可定制)的能力. 核心一句话:将数据从数据源收集过来,再送到目的地.为了保证输送一定成功,在送到目的地之前,会先缓冲数据,待数据真正到的目的地后,删除自己缓冲的数据. 三.组件及作用: 1.so

Flume学习笔记

在HDFS中,文件只作为目录项存在,在文件关闭前,其长度一直显示为0.如果在一段时间内将数据写到文件中,但却没有将其关闭,那么一旦客户端出现网络中断,什么都得不到,只有一个空白的文件. Flume的agent由三个部件构成:source.channel.sink. 其结构图如下: 三者之间的关系如下: source将event写到一个或多个channel中. channel作为event从source到sink传输的保留区. sink只从一个channel接收event. agent可能会有多个

Hadoop学习笔记—19.Flume框架学习

START:Flume是Cloudera提供的一个高可用的.高可靠的开源分布式海量日志收集系统,日志数据可以经过Flume流向需要存储终端目的地.这里的日志是一个统称,泛指文件.操作记录等许多数据. 一.Flume基础理论 1.1 常见的分布式日志收集系统 Scribe是facebook开源的日志收集系统,在facebook内部已经得到大量的应用. Chukwa 是一个开源的用于监控大型分布式系统的数据收集系统.这是构建在 hadoop 的 hdfs 和 map/reduce 框架之上的,继承了

学习笔记——Spring+SpringMVC+MyBatis框架整合

一.Maven创建项目 1. 在Eclipse中选择New -> Project -> Maven -> Maven Project 2. 选择默认workspace之后建立maven-webapp 3. 填写Group Id和Artifact Id(项目名称) 4. 建立工程后发现目录结构报错 5. 为了避免乱码,右键点击工程选择Properties -> Resource,选择编码方式为UTF-8 6. 在Properties中选择Java Build Path -> J

PMP项目管理学习笔记(4)——项目整合管理

六个整合管理过程. 1.制定项目章程 一个新项目要完成的第一件事,就是项目章程的制定.这是授权你开展工作的文档.不过并不总是需要你介入,通常情况下会由赞助人交给你.如果没有项目章程,你就没有权利告诉你的团队要做什么以及什么时候做. 2.制定项目管理计划 项目管理计划是整个,项目中最重要的文档,因为它将指导项目中进行的所有工作,项目管理计划涵盖了所有知识领域.项目管理计划的很大一部分就是告诉你在出现问题时如何处理变更. 3.指导和管理项目执行 完成计划之后,接下来就是具体的工作了.你的任务就是确保