Flume-ng安装与使用

一、 Flume NG核心概念

二、 Flumen NG数据流模型

Flume以agent为最小的独立运行单位。一个agent就是一个JVM。单agent由Source、Sink和Channel三大组件构成。

Flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source,比如上图中的Web Server生成。当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source。

很直白的设计,其中值得注意的是,Flume提供了大量内置的Source、Channel和Sink类型。不同类型的Source,Channel和Sink可以自由组合。组合方式基于用户设置的配置文件,非常灵活。比如:Channel可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink可以把日志写入HDFS, HBase,甚至是另外一个Source等等。

如果你以为Flume就这些能耐那就大错特错了。Flume支持用户建立多级流,也就是说,多个agent可以协同工作,并且支持Fan-in扇入、Fan-out、Contextual Routing上下文路由、Backup Routes。如下图所示。

三、开发第三方组件

flume可支持avro、log4j、syslog、HTTP Post(带有JSON参数)的source数据来源。

在当前的数据来源选项不方便使用时,也可以构建第三方客户端client机制向FLUME发送日志。构建第三方客户端client的方法有两个:

(1)构建客户端client与flume已有的某个source,比如avroSource或syslogTcpSource,进行通信,客户端client需要将它的数据转换成这些flume source可理解的message对象。

(2)另一种方法就是写一个第三方的Flumen Source,直接与使用了IPC或RPC协议的客户端应用程序进行通话,再将数据客户端数据转换成flume的Event。

1. flume SDK

lume Client SDK 是一个类库,可使得应用程序连接到Flumen,并通过RPC协议直接向Flume发送数据到flume的数据流中。

2. RPC client接口

对Flume RPCclient接口的实现,封装在了RPC机制里面,用户应用程序可以简单调用flume client SDK中的方法append(Event)或appendBatch(List<Event>),发送数据,而不需要考虑底层信息交换细节。如何提供Event对象参数呢?(1)可以直接使用Event接口,使用一个简单的实现方法simpleEvent类;(2)可以使用EventBuilder的重载方法withBody()静态帮助方法;

3. RPCclients——Avro和 Thrift

The client needs tocreate this object with the host and port of the target Flume agent, and canthen use the RpcClient to send data into the agent. The following example showshow to use the Flume Client SDK API within a user’s data-generatingapplication:

对于1.4.0版本中,Avro是默认的RPC协议。NettyAvroRpcClient 和 ThriftRpcClient实现了RpcClient接口。client客户端需要创建包含Flume agent日志服务器hostname(即IP)和端口(即Port)的对象,才能使用RPCclient发送数据到agent中。下面的例子展示了如何在用户的应用程序中使用flumeclient SDK:

importorg.apache.flume.Event;

importorg.apache.flume.EventDeliveryException;

importorg.apache.flume.api.RpcClient;

importorg.apache.flume.api.RpcClientFactory;

importorg.apache.flume.event.EventBuilder;

importjava.nio.charset.Charset;

public class MyApp {

public static voidmain(String[]args) {

MyRpcClientFacade client = newMyRpcClientFacade();

// Initializeclient with the remote Flume agent‘s host and port

client.init("host.example.org",41414);

// Send 10 eventsto the remote Flume agent. That agent should be

// configured tolisten with an AvroSource.

String sampleData = "HelloFlume!";

for (int i = 0; i < 10; i++) {

client.sendDataToFlume(sampleData);

}

client.cleanUp();

}

}

classMyRpcClientFacade {

private RpcClient client;

private String hostname;

private int port;

public voidinit(String hostname,int port) {

// Setup the RPCconnection

this.hostname = hostname;

this.port = port;

this.client =RpcClientFactory.getDefaultInstance(hostname,port);

// Use thefollowing method to create a thrift client (instead of the above line):

// this.client =RpcClientFactory.getThriftInstance(hostname, port);

}

public voidsendDataToFlume(String data){

// Create a FlumeEvent object that encapsulates the sample data

Event event = EventBuilder.withBody(data,Charset.forName("UTF-8"));

// Send the event

try {

client.append(event);

} catch (EventDeliveryExceptione) {

// clean up andrecreate the client

client.close();

client = null;

client = RpcClientFactory.getDefaultInstance(hostname,port);

// Use thefollowing method to create a thrift client (instead of the above line):

// this.client =RpcClientFactory.getThriftInstance(hostname, port);

}

}

public voidcleanUp() {

// Close the RPCconnection

client.close();

}

}

此时远程flume agent需要有一个AvroSource(或ThriftSource,如果使用Thrift client的话)在监听数据传输接口。下面就是一个flume agent的配置文件,该agent在等待用户应用程序的连接:

a1.channels=c1
a1.sources=r1
a1.sinks=k1
 
a1.channels.c1.type=memory
 
a1.sources.r1.channels=c1
a1.sources.r1.type=avro
# For using a thrift source set the following instead of the above line.
# a1.source.r1.type = thrift
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=41414
 
a1.sinks.k1.channel=c1
a1.sinks.k1.type=logger

为了增加扩展性,上述配置文件中关于默认的Flume client的实现可以进行如下配置:

client.type=default (for avro) or thrift (for thrift)
 
hosts=h1                           # default client accepts only 1 host
                                     # (additional hosts will be ignored)
 
hosts.h1=host1.example.org:41414   # host and port must both be specified
                                     # (neither has a default)
 
batch-size=100                     # Must be >=1 (default: 100)
 
connect-timeout=20000              # Must be >=1000 (default: 20000)
 
request-timeout=20000              # Must be >=1000 (default: 20000)

4. failoverclient 容错client

这个类中包括了默认的Avro RpcClient 来提供client客户端的容错处理能力。通过使用间隔的<host>:<port>列表来提供多个flume agents,组成一个容错组(即用一组agent来提供备用的agent,如:<host1>:<port1><host2>:<port2> <host3>:<port3> ... 如果第一个agent服务器挂了,那么自动启用第二个agent服务器)。目前容错RPCclient还不支持thrift。client类中可定义如下:

// Setup properties for the failover
Propertiesprops=newProperties();
props.put("client.type","default_failover");
// List of hosts (space-separated list of user-chosen host aliases)
props.put("hosts","h1 h2 h3");
// host/port pair for each host alias
Stringhost1="host1.example.org:41414";
Stringhost2="host2.example.org:41414";
Stringhost3="host3.example.org:41414";
props.put("hosts.h1",host1);
props.put("hosts.h2",host2);
props.put("hosts.h3",host3);
// create the client with failover properties
RpcClientclient=RpcClientFactory.getInstance(props);

为了提高扩展性,failoverclient的实现中可以进行如下的配置:

client.type=default_failover
 
hosts=h1 h2 h3                     # at least one is required, but 2 or
                                     # more makes better sense
 
hosts.h1=host1.example.org:41414
 
hosts.h2=host2.example.org:41414
 
hosts.h3=host3.example.org:41414
 
max-attempts=3                     # Must be >=0 (default: number of hosts
                                     # specified, 3 in this case). A ‘0‘
                                     # value doesn‘t make much sense because
                                     # it will just cause an append call to
                                     # immmediately fail. A ‘1‘ value means
                                     # that the failover client will try only
                                     # once to send the Event, and if it
                                     # fails then there will be no failover
                                     # to a second client, so this value
                                     # causes the failover client to
                                     # degenerate into just a default client.
                                     # It makes sense to set this value to at
                                     # least the number of hosts that you
                                     # specified.
 
batch-size=100                     # Must be >=1 (default: 100)
 
connect-timeout=20000              # Must be >=1000 (default: 20000)
 
request-timeout=20000              # Must be >=1000 (default: 20000)

5. loadBalanceRPC client 负载均衡rpc client

flume client SDK还提供了一个在多个日志接收主机之间进行负载均衡的 RPCclient,通过使用间隔的<host>:<port>列表来提供多个flume agents,组成一个负载均衡组(即用一组agent来提供分担压力的agent,如:<host1>:<port1> <host2>:<port2> <host3>:<port3>...)。这个client可通过配置,来指定负载均衡策略,比如是随机选择一个日志服务器,还是以循环的模式选择一个日志服务器。用户也可以自定义一个class类,通过实现LoadBalancingRpcClient和HostSelector接口,来指定自定义的服务器选择策略,这种情况下这个自定义class的全类名需要指定为host-selector属性的一个值。当前LoadBalancingRPC Client还不支持thrift。

其中的backoff选项如果为true时,会将接收失败的服务器列入临时黑名单,以防止该出错服务器被选为容错主机,在经过了指定时间后才会解除黑名单。当经过了指定时间后,如果这个服务器还是没反应,那么认定它是一个持续性错误,指定的时间段将会增长,避免进入长时间的等待。

The maximum backofftime can be configured by setting maxBackoff (in milliseconds). The maxBackoffdefault is 30 seconds (specified in the OrderSelector class that’s thesuperclass of both load balancing strategies). The backoff timeout willincrease exponentially with each sequential failure up to the maximum possible backofftimeout. The maximum possible backoff is limited to 65536 seconds (about 18.2hours). For example:

最大的backoff时间可通过maxBackoff配置项指定(单位是毫秒,默认是30S,在load balance 策略的父类OrderSelector中指定了),最大的backoff时间是65536S,大概18.2个小时。可参考的client类中可指定如下:

// Setup properties for the load balancing
Propertiesprops=newProperties();
props.put("client.type","default_loadbalance");
 
// List of hosts (space-separated list of user-chosen host aliases)
props.put("hosts","h1 h2 h3");
 
// host/port pair for each host alias
Stringhost1="host1.example.org:41414";
Stringhost2="host2.example.org:41414";
Stringhost3="host3.example.org:41414";
props.put("hosts.h1",host1);
props.put("hosts.h2",host2);
props.put("hosts.h3",host3);
 
props.put("host-selector","random");// For random host selection
// props.put("host-selector", "round_robin"); // For round-robin host
//                                            // selection
props.put("backoff","true");// Disabled by default.
 
props.put("maxBackoff","10000");// Defaults 0, which effectively
                                  // becomes 30000 ms
 
// Create the client with load balancing properties
RpcClientclient=RpcClientFactory.getInstance(props);

为了提高扩展性,load balanceclient的实现(LoadBalancingRpcClient)中可以进行如下的配置:

client.type=default_loadbalance
 
hosts=h1 h2 h3                     # At least 2 hosts are required
 
hosts.h1=host1.example.org:41414
 
hosts.h2=host2.example.org:41414
 
hosts.h3=host3.example.org:41414
 
backoff=false                      # Specifies whether the client should
                                     # back-off from (i.e. temporarily
                                     # blacklist) a failed host
                                     # (default: false).
 
maxBackoff=0                       # Max timeout in millis that a will
                                     # remain inactive due to a previous
                                     # failure with that host (default: 0,
                                     # which effectively becomes 30000)
 
host-selector=round_robin          # The host selection strategy used
                                     # when load-balancing among hosts
                                     # (default: round_robin).
                                     # Other values are include "random"
                                     # or the FQCN of a custom class
                                     # that implements
                                     # LoadBalancingRpcClient$HostSelector
 
batch-size=100                     # Must be >=1 (default: 100)
 
connect-timeout=20000              # Must be >=1000 (default: 20000)
 
request-timeout=20000              # Must be >=1000 (default: 20000)

四、高可靠性

从单agent来看,Flume使用基于事务的数据传递方式来保证事件传递的可靠性。Source和Sink被封装进一个事务。事件被存放在Channel中直到该事件被处理,Channel中的事件才会被移除。这是Flume提供的点到点的可靠机制。

从多级流来看,前一个agent的sink和后一个agent的source同样由它们的事务来保障数据的可靠性。

五、可恢复性

推荐使用FileChannel,事件持久化在本地文件系统里(性能较差)。

六、在现有项目中使用Flume-NG

对现有程序改动最小的使用方式是使用直接读取程序原来记录的日志文件,基本可以实现无缝接入,不需要对现有程序进行任何改动。 
对于直接读取文件Source, 主要有两种方式:

(1)Exec source方式,最常用的就是tail -F [file] 方式组织数据,每次由source获取文件的增量数据,发送到channel。问题是,如果flume的source挂了,那么等flume的source再次开启的这段时间内,增加的日志内容,就没办法被source读取到了。解决方案:为source添加一个状态监控,如果source挂了,监控会把增量数据保存到其他临时文件中,下次读取。

(2) spooling directory source 方式,将原始日志文件按时间进行分割,放到指定目录下,flume的source读取新增文件。但要注意,这个目录不允许做其他操作,里面的文件也不允许编辑。

七、单节点Flumen NG配置

apache-flume-1.5.2-bin.tar下载地址:http://flume.apache.org/download.html

需事先安装:jdk

配置flume环境变量:/conf/flume-env.sh

JAVA_HOME=/usr/jdk7/jdk1.7.0_67

FLUME_CLASSPATH=/usr/apache-flume-1.5.2-bin

agent配置:/conf/flume-conf.properties

启动agent:

$ bin/flume-ngagent -n agent_name -c conf -f conf/flume-conf.properties

问题:如果提示log4j ERROR,并提示log文件不存在,可能是目录权限不够,不允许生成log日志文件。可以通过修改apache-flume-1.5.2-bin目录的权限:sudo chmod 777 –R  来解决。

  • -n指定agent名称
  • -c指定配置文件目录
  • -f指定配置文件
  • -Dflume.root.logger=DEBUG,console设置日志等级和输出地方(可选)

八、 web server服务器中配置log4j写日志到flume

由远程服务器接收日志,并写入服务器目录路径

我的实验环境是windows 7,在windows7中安装了ubuntu 虚拟机。日志由windows中的web项目生成,通过配置的log4j,发送到ubuntu虚拟机中的flume  agent。

1 数据传输结构图:

2 web Service端的配置:

         a)  log4j.properties文件的配置:

# ---------Debugging log settings-------------------

log4j.rootLogger=info, stdout,logfile,dailylogfile

#----------------flume ng  ,为某个类设置日志发送--------

log4j.logger.com.trace.web.action.user.login = info,flume

#-----或者为 log4j.logger.com.trace. = info,flume ,范围扩大到包

log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender

log4j.appender.flume.layout = org.apache.log4j.PatternLayout

log4j.appender.flume.layout.ConversionPattern=%d{yyyy-MM-ddHH:mm:ss}%p[%c{2}]-%m%n

#--接收数据的远程主机IP,此处为虚拟机的ip地址

log4j.appender.flume.Hostname=192.168.17.128

#--监听数据发送的远程主机端口号,需开通(web服务器端和数据接收服务器端要开通)

log4j.appender.flume.Port=41414

         发送日志的.java类文件中的代码:

private static Logger flumeLog =Logger.getLogger("flume");

#-----或者privatestatic Logger flumeLog = Logger.getLogger(XXX.class);

flumeLog.info("432343234");

         b)  log4j.xml文件的配置:

<appender name="flume"   class="org.apache.flume.clients.log4jappender.Log4jAppender">

<paramname="Port" value="41414" />

<paramname="Hostname" value="21.20.8.6" />

<layoutclass="org.apache.log4j.PatternLayout">

<paramname="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss}   %m%n" />

</layout>

</appender>

<logger name="com.test">

<paramname="level" value="info" />

<appender-refref="flume" />

</logger>

         发送日志的类文件中的代码:

#----这种方法会失效,无法发送日志

#----private static Logger flumeLog =Logger.getLogger("flume");

private static Logger flumeLog = Logger.getLogger(XXX.class);

flumeLog.info("432343234");

支持log4j------>flume数据传输的jar包:

3 远程服务器端的配置:注意远程日志接收服务器和web server都需要开通日志数据端口

1安装flume ng:

第6章节说明了flume ng安装步骤。

2用于“接收web端数据”的flume.conf文件的配置:

# The configuration file needs to define the sources,

# the channels and the sinks.

# Sources, channels and sinks are defined per agent,

# in this case called ‘agent‘

#--配置agent

agent1.sources = avro-source1

agent1.channels = ch1

agent1.sinks = sink1

agent1.sources.avro-source1.channels= ch1

#--source的类型avro表示以avro协议接收数据,且数据由avro客户端事件驱动

agent1.sources.avro-source1.type =avro

#--source绑定的数据源主机地址。web端的日志已由log4j以RPC协议发送到数据接收服务器的41414端口,那么数据接收服务器中flume agent的source只需绑定本机的ip,并接收41414端口过来的数据(这里很重要,否则会接收不到数据,web端也可能无法发送)

agent1.sources.avro-source1.bind =192.168.17.128

agent1.sources.avro-source1.port =41414

#agent1.sources.avro-source1.interceptors= i1

#agent1.sources.avro-source1.interceptors.i1.type= timestamp

agent1.channels.ch1.type = memory

#--配置sink,将接收到的数据保存到指定地点。

agent1.sinks.sink1.channel = ch1

#agent1.sinks.sink1.type = logger

#--sink的type=file_roll表示以滚动文件的形式将数据保存到文件中。这里有个问题,如果不做其他设置,flume默认30秒生成一个新文件(即使没有数据)

agent1.sinks.sink1.type = file_roll

#--设置shik组件将文件sink的路径。

agent1.sinks.sink1.sink.directory =/home/hadoop1/apache-flume-1.5.2-bin/logs

#此处配置的,应该是每隔24秒,生成一个flume日志数据文件。此处如果不配置,默认是30秒(数字的单位是秒)

agent1.sinks.sink1.sink.rollInterval=300

9、重大注意事项:

1) 可能出现的问题:

刚配置时,在上述的配置环境下,有一次数据收集端的agent意外挂掉了(服务器宕机),导致web server端的log4j无法创建Appender,从而影响需要输出日志的对象功能,使web无法正常运行。

这个问题是由flume的设计思路导致的。如果agent挂掉了,flume为了节约系统和网络资源,会在请求若干次后仍无无响应的情况下,主动放弃请求,并置发送端RPCclient为null。所以在agent挂掉的时候,只要及时重启agent,就可以了;但如果超过了flume设定的阈值,那么即使重启agent,还是不能再发送数据了(RPCclient已经为null了),此时会报异常:RPCclient 为null。

建议的解决方案:可以设置一个针对flume网络连接的心跳线程,每隔一段时间检测一下网络连接,如果连接中断,那么重新初始化RPCclient。

时间: 2024-08-11 07:59:00

Flume-ng安装与使用的相关文章

Flume NG安装部署及数据采集测试

转载请注明出处:http://www.cnblogs.com/xiaodf/ Flume作为日志收集工具,监控一个文件目录或者一个文件,当有新数据加入时,采集新数据发送给消息队列等. 1 安装部署Flume 若要采集数据节点的本地数据,每个节点都需要安装一个Flume工具,用来做数据采集. 1.1 下载并安装 到官网去下载最新版本的Flume 下载地址为:http://flume.apache.org/,目前最新版本为1.6.0,需要1.7及以上版本的JDK. 1.解压 tar -xzvf ap

Flume 学习笔记之 Flume NG概述及单节点安装

Flume NG概述: Flume NG是一个分布式,高可用,可靠的系统,它能将不同的海量数据收集,移动并存储到一个数据存储系统中.轻量,配置简单,适用于各种日志收集,并支持 Failover和负载均衡.其中Agent包含Source,Channel和 Sink,三者组建了一个Agent.三者的职责如下所示: Source:用来消费(收集)数据源到Channel组件中 Channel:中转临时存储,保存所有Source组件信息 Sink:从Channel中读取,读取成功后会删除Channel中的

分布式实时日志系统(二) 环境搭建之 flume 集群搭建/flume ng资料

最近公司业务数据量越来越大,以前的基于消息队列的日志系统越来越难以满足目前的业务量,表现为消息积压,日志延迟,日志存储日期过短,所以,我们开始着手要重新设计这块,业界已经有了比较成熟的流程,即基于流式处理,采用 flume 收集日志,发送到 kafka 队列做缓冲,storm 分布式实时框架进行消费处理,短期数据落地到 hbase.mongo中,长期数据进入 hadoop 中存储. 接下来打算将这其间所遇到的问题.学习到的知识记录整理下,作为备忘,作为分享,带给需要的人. 学习flume ng的

【转】Flume(NG)架构设计要点及配置实践

Flume(NG)架构设计要点及配置实践 Flume NG是一个分布式.可靠.可用的系统,它能够将不同数据源的海量日志数据进行高效收集.聚合.移动,最后存储到一个中心化数据存储系统中.由原来的Flume OG到现在的Flume NG,进行了架构重构,并且现在NG版本完全不兼容原来的OG版本.经过架构重构后,Flume NG更像是一个轻量的小工具,非常简单,容易适应各种方式日志收集,并支持failover和负载均衡. 架构设计要点 Flume的架构主要有一下几个核心概念: Event:一个数据单元

Flume的安装配置

flume是一个分布式.可靠.和高可用的海量日志采集.聚合和传输的系统.支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本.HDFS.Hbase等)的能力 . 一.什么是Flume? flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用.Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera.但随着 FLume 功能的扩展,F

Flume NG 简介及配置实战

Flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用.Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera.但随着 FLume 功能的扩展,Flume OG 代码工程臃肿.核心组件设计不合理.核心配置不标准等缺点暴露出来,尤其是在 Flume OG 的最后一个发行版本 0.94.0 中,日志传输不稳定的现象尤为严重,为了解决这些问题,2011 年 10 月 22 号,cloudera 完成了

高可用Hadoop平台-Flume NG实战图解篇

1.概述 今天补充一篇关于Flume的博客,前面在讲解高可用的Hadoop平台的时候遗漏了这篇,本篇博客为大家讲述以下内容: Flume NG简述 单点Flume NG搭建.运行 高可用Flume NG搭建 Failover测试 截图预览 下面开始今天的博客介绍. 2.Flume NG简述 Flume NG是一个分布式,高可用,可靠的系统,它能将不同的海量数据收集,移动并存储到一个数据存储系统中.轻量,配置简单,适用于各种日志收集,并支持Failover和负载均衡.并且它拥有非常丰富的组件.Fl

Flume NG源码分析(五)使用ThriftSource通过RPC方式收集日志

上一篇说了利用ExecSource从本地日志文件异步的收集日志,这篇说说采用RPC方式同步收集日志的方式.笔者对Thrift比较熟悉,所以用ThriftSource来介绍RPC的日志收集方式. 整体的结构图如下: 1. ThriftSource包含了一个Thrift Server,以及一个Thrift Service服务的实现.这里的Thrift Service是由ThriftSourceProtocol定义 2. 应用程序调用Thrift Service的客户端,以RPC的方式将日志发送到Th

让你系统认识flume及安装和使用flume1.5传输数据到hadoop2.2

问题导读:1.什么是flume?2.如何安装flume?3.flume的配置文件与其它软件有什么不同?一.认识flume1.flume是什么?这里简单介绍一下,它是Cloudera的一个产品2.flume是干什么的?收集日志的3.flume如何搜集日志?我们把flume比作情报人员(1)搜集信息(2)获取记忆信息(3)传递报告间谍信息flume是怎么完成上面三件事情的,三个组件:source: 搜集信息channel:传递信息sink:存储信息上面有点简练,详细可以参考Flume内置channe

Flume NG 学习笔记(一)简介

一.简介 Flume是一个分布式.可靠.高可用的海量日志聚合系统,支持在系统中定制各类数据发送方,用于收集数据:同时,Flume提供对数据的简单处理,并写到各种数据接收方的能力. Flume在0.9.x and 1.x之间有较大的架构调整,1.x版本之后的改称Flume NG(next generation),0.9.x的称为Flume OG(originalgeneration). 对于OG版本, Flume NG (1.x.x)的主要变化如下: 1.sources和sinks 使用chann