Flume NG 学习笔记(二)单机与集群Flume 配置

下面的内容基本来自官网:http://flume.apache.org/FlumeUserGuide.html

本文使用的是最新版本的apache flume 1.5,安装完Flume然后测试下Flume是否可以用,在Flume目录下用以下语句测试:

bin/flume-ng agent -n$agent_name -c conf -f conf/flume-conf.properties.template

结果如图显示:

Ok,我们接下去看下面常用架构、功能配置示例

一、最简单的单一代理Flume 配置

下面是配置文件:

[html] view plain copy

  1. #文件名:single_case1.conf.conf
  2. #配置内容:
  3. #single_case1.conf.conf: A single-node Flume configuration
  4. #Name the components on this agent
  5. a1.sources= r1
  6. a1.sinks= k1
  7. a1.channels= c1
  8. #Describe/configure the source
  9. a1.sources.r1.type= netcat
  10. a1.sources.r1.bind= localhost
  11. a1.sources.r1.port= 44444
  12. #Describe the sink
  13. a1.sinks.k1.type= logger
  14. #Use a channel which buffers events in memory
  15. a1.channels.c1.type= memory
  16. a1.channels.c1.capacity= 1000
  17. a1.channels.c1.transactionCapacity= 100
  18. #Bind the source and sink to the channel
  19. a1.sources.r1.channels= c1
  20. a1.sinks.k1.channel= c1

说明下,这里所有的例子都是将配置文件放到 $FLUME_HOME/conf 目录下,后面就不赘述了。

 

#敲命令

flume-ng agent -cconf -f conf/single_case1.conf -n a1 -Dflume.root.logger=INFO,console

#参数命令

-c conf 指定配置目录为conf

-f conf/single_case1.conf指定配置文件为conf/single_case1.conf

-n a1 指定agent名字为a1,需要与case1_example.conf中的一致

-Dflume.root.logger=INFO,console指定DEBUF模式在console输出INFO信息

具体参数命令请通过flume-nghelp查看

#然后在另一个终端进行测试

telnet 127.0.0.1 44444

然后会看在之前启动的终端查看console输出到如下:

这里会发现消息hello world! 输出了,而hello world! hello world!hello world!则被拦截了。因为在配置文件中,我们选择的输出方式为:a1.sinks.k1.type= logger

,即console输出,flume-ng针对logger是只显示16个字节的,剩下的都被sink截了。下面是源码
在LoggerSink.Java中:

[java] view plain copy

  1. if(event != null) {
  2. if (logger.isInfoEnabled()) {
  3. logger.info("Event: " + EventHelper.dumpEvent(event));
  4. }
  5. }

我们去看EventHelper.java的dumpEvent方法:

[java] view plain copy

  1. privatestatic final int DEFAULT_MAX_BYTES = 16;
  2. publicstatic String dumpEvent(Event event) {
  3. return dumpEvent(event, DEFAULT_MAX_BYTES);
  4. }
  5. publicstatic String dumpEvent(Event event, int maxBytes) {
  6. StringBuilder buffer = new StringBuilder();
  7. if (event == null || event.getBody() == null) {
  8. buffer.append("null");
  9. } else if (event.getBody().length == 0) {
  10. // do nothing... in this case, HexDump.dump() will throw anexception
  11. } else {
  12. byte[] body = event.getBody();
  13. byte[] data = Arrays.copyOf(body, Math.min(body.length,maxBytes));
  14. ByteArrayOutputStream out = new ByteArrayOutputStream();
  15. try {
  16. HexDump.dump(data, 0, out, 0);
  17. String hexDump = new String(out.toByteArray());
  18. // remove offset since it‘s not relevant for such a smalldataset
  19. if(hexDump.startsWith(HEXDUMP_OFFSET)) {
  20. hexDump =hexDump.substring(HEXDUMP_OFFSET.length());
  21. }
  22. buffer.append(hexDump);
  23. } catch (Exception e) {
  24. if(LOGGER.isInfoEnabled()) {
  25. LOGGER.info("Exception while dumpingevent", e);
  26. }
  27. buffer.append("...Exception while dumping:").append(e.getMessage());
  28. }
  29. String result = buffer.toString();
  30. if(result.endsWith(EOL) && buffer.length() >EOL.length()) {
  31. buffer.delete(buffer.length() - EOL.length(),buffer.length()).toString();
  32. }
  33. }
  34. return "{ headers:" + event.getHeaders() + " body:"+ buffer + " }";
  35. }

不难看出,在event处理过程中,发生了数据截取操作。

Ok,进入下一个环节。

二、“集群”代理Flume 配置

这里集群的概念是多台机器的管理,最简单的就是两台机器一台代理主机从数据源获取数据,然后将数据在传送到另一台主机上,进行输出。这样做的意义是,一个业务多数据源的时候,我们可以对每个数据源设置代理,然后将它们汇总到一台代理主机上进行输出。

下面实现最简单的集群配置,即两个代理,一台接受数据源数据的代理将数据推送到汇总的代理,而汇总的代理再将数据输出。因此这两台主机分别是push,pull

根据上图需要用AVRO RPC通信,因此推数据sinks类型与拉数据的sources的类型都是avro 。而拉数据代理的数据源,我们用前文讲的Spool Source 形式来处理,这里我们预先建好目录与文件,test.log

下面设置推代理主机的flume配置文件:

[html] view plain copy

  1. #推数据代理的配置文件push.conf
  2. #Name the components on this agent
  3. a2.sources= r1
  4. a2.sinks= k1
  5. a2.channels= c1
  6. #Describe/configure the source
  7. a2.sources.r1.type= spooldir
  8. a2.sources.r1.spoolDir= /tmp/logs
  9. a2.sources.r1.channels= c1
  10. #Use a channel which buffers events in memory
  11. a2.channels.c1.type= memory
  12. a2.channels.c1.keep-alive= 10
  13. a2.channels.c1.capacity= 100000
  14. a2.channels.c1.transactionCapacity= 100000
  15. #Describe/configure the source
  16. a2.sinks.k1.type= avro
  17. a2.sinks.k1.channel= c1
  18. a2.sinks.k1.hostname= pull
  19. a2.sinks.k1.port= 4444

下面设置汇总代理主机的flume配置文件:

[html] view plain copy

  1. #汇总数据代理的配置文件pull.conf
  2. #Name the components on this agent
  3. a1.sources= r1
  4. a1.sinks= k1
  5. a1.channels= c1
  6. #Describe/configure the source
  7. a1.sources.r1.type= avro
  8. a1.sources.r1.channels= c1
  9. a1.sources.r1.bind= pull
  10. a1.sources.r1.port= 44444
  11. #Describe the sink
  12. a1.sinks.k1.type= logger
  13. a1.sinks.k1.channel = c1
  14. #Use a channel which buffers events in memory
  15. a1.channels.c1.type= memory
  16. a1.channels.c1.keep-alive= 10
  17. a1.channels.c1.capacity= 100000
  18. a1.channels.c1.transactionCapacity= 100000

虽然Spool Source是非实时的,但由于数据量少,处理还是很快的,因此我们只能先启动pull代理。

#敲命令

flume-ng agent -c conf -f conf/pull.conf -n a1 -Dflume.root.logger=INFO,console

上图显示成功。

先后去启动push主机的flume

#敲命令

flume-ng agent -n a2 -c conf -f conf/push.conf -Dflume.root.logger=INFO,console

查看pull主机的状态,发现数据已经传过来了。

然后会过去看push主机的文件

已经加上后缀名.COMPLETED。这与前文说的是一致的。

下面只要将新数据存入到目录/tmp/logs,push主机就会将数据发送到pull主机输出,并修改新数据文件的文件名。

时间: 2024-07-30 10:19:28

Flume NG 学习笔记(二)单机与集群Flume 配置的相关文章

Flume NG 学习笔记(五)Sinks和Channel配置

一.HDFS Sink Flume Sink是将事件写入到Hadoop分布式文件系统(HDFS)中.主要是Flume在Hadoop环境中的应用,即Flume采集数据输出到HDFS,适用大数据日志场景. 目前,它支持HDFS的文本和序列文件格式,以及支持两个文件类型的压缩.支持将所用的时间.数据大小.事件的数量为操作参数,对HDFS文件进行关闭(关闭当前文件,并创建一个新的).它还可以对事源的机器名(hostname)及时间属性分离数据,即通过时间戳将数据分布到对应的文件路径. HDFS目录路径可

Hadoop学习笔记_4_实施Hadoop集群 --伪分布式安装

实施Hadoop集群 --伪分布式安装 准备与配置安装环境 安装虚拟机和linux,虚拟机推荐使用vmware,PC可以使用workstation,服务器可以使用ESXi,在管理上比较方便.ESXi还可以通过拷贝镜像文件复制虚拟机,复制后自动修改网卡号和ip,非常快捷.如果只是实验用途,硬盘大约预留20-30G空间. 以Centos为例,分区可以选择默认[如果想要手动分区,请参考博客:http://blog.csdn.net/zjf280441589/article/details/175485

Heartbeat学习笔记--HA高可用集群实现

一.部署环境: 服务器版本:CentOS6.5 双主热备模式: VIP:192.168.3.30(MASTER上) VIP:192.168.3.32(BACKUP上) 主机网络参数: 接口 MASTER BACKUP 说明 eth1 192.168.3.23 192.168.3.24 内网管理IP eth2 192.168.5.23 192.168.5.24 心跳线 eth3 192.168.2.23 192.168.2.24 外网(临时下载文件用) 网络拓扑: 二.需求分析: 通过Heartb

Hadoop学习笔记_8_实施Hadoop集群 --分布式安装Hadoop

实施Hadoop集群 --分布式安装Hadoop 说明: 以Ubuntu配置为例,其中与CentOS不同之处会给出详细说明 现有三台服务器:其IP与主机名对应关系为: 192.168.139.129 master #NameNode/JobTrackerr结点 192.168.139.132 slave01 #DataNode/TaskTracker结点 192.168.139.137 slave02 #DataNode/TaskTracker结点 一.配置ssh实现Hadoop节点间用户的无密

Hadoop的学习前奏(二)——Hadoop集群的配置

前言: Hadoop集群的配置即全然分布式Hadoop配置. 笔者的环境: Linux:  CentOS 6.6(Final) x64 JDK:    java version "1.7.0_75" OpenJDK Runtime Environment (rhel-2.5.4.0.el6_6-x86_64 u75-b13) OpenJDK 64-Bit Server VM (build 24.75-b04, mixed mode) SSH:    OpenSSH_5.3p1, Ope

Flume NG 学习笔记(一)简介

一.简介 Flume是一个分布式.可靠.高可用的海量日志聚合系统,支持在系统中定制各类数据发送方,用于收集数据:同时,Flume提供对数据的简单处理,并写到各种数据接收方的能力. Flume在0.9.x and 1.x之间有较大的架构调整,1.x版本之后的改称Flume NG(next generation),0.9.x的称为Flume OG(originalgeneration). 对于OG版本, Flume NG (1.x.x)的主要变化如下: 1.sources和sinks 使用chann

Flume NG 学习笔记(三)流配置

版权声明:本文为博主原创文章,未经博主允许不得转载. 目录(?)[+] 在通过flume采集日志数据的时候,一般都是通过flume 代理从日志源或者日志客户端采集数据到flume代理中,然后再由flume代理送到目标存储.上图中就是每个一级flume代理负责从webserv采集数据,然后再由一个二级flume代理进行日志汇总. Flume支持从一个源发送事件到多个通道中,这被称为事件流的复用.这里需要在配置中定义事件流的复制/复用,选择1个或者多个通道进行数据流向. 下面的内容主要介绍flume

Flume NG 学习笔记(四)Source配置

首先.这节水的东西就比较少了,大部分是例子. 一.Avro Source与Thrift Source Avro端口监听并接收来自外部的Avro客户流的事件.当内置Avro 去Sinks另一个配对Flume代理,它就可以创建分层采集的拓扑结构.官网说的比较绕,当然我的翻译也很弱,其实就是flume可以多级代理,然后代理与代理之间用Avro去连接 下面是官网给出的source的配置,加粗的参数是必选,描述就不解释了. Property Name Default Description channel

学习MongoDB(二) Replica Set集群配置

1.官方网址:https://docs.mongodb.org/manual/tutorial/deploy-replica-set-for-testing/ 2.Replica Set介绍: https://docs.mongodb.org/manual/core/replica-set-members/ 3.简单叙述一下什么是Replica Set: 如果我们使用单一的Mongodb数据库,那么它会存在单点故障等问题,Mongodb允许我们将多个单一的Mongodb数据库组成一个(“集群Re