介绍
概述
Apache Flume是一个用来从很多不同的源有效地收集,聚集和移动大量的日志数据到一个中心数据仓库的分布式的,可靠的和可用的系统。
Apache Flume是Apache软件基金会的顶级项目。目前有两个可获得的发布代码路线,0.9.x版本和1.x版本。本文档适用于1.x代码线。对于0.9.x代码线,请看Flume 0.9.x开发指南。
结构
数据流模型
一个Event是在Flume代理之间流动的数据单元。Event从Source流动到Channel再到Sink,并由一个Event接口的实现表示。一个Event携带着一个有效负载(字节数组)和一个可选的头部(字符串属性)集合。一个Flume代理是一个进程(JVM),它能控制组件允许Events从一个外部源流向一个外部目的地。
一个Source消耗有特殊格式的Events,并且那些Events通过像Web服务器之类的外部源被传送到Source。例如,一个AvroSource可以用来从客户端或从流中的其他的Flume代理接收Avro Events。当一个源收到了一个Event,它将它存入到一个或多个Channel中。Channel采用被动存储的形式,Channel会缓存该Event直到它被一个Sink处理。在Flume中,有一种Channel类型是FileChannel,它使用本地文件系统作为它的备份仓库。一个Sink负责将Event从Channel中移除,并将它放到外部仓库中,例如HDFS(这种情况下使用的是HDFSEventSink),或者将它放置到流中下一跳的Source中。在给定的代理中,Source和Sink是异步运行的,因为Events会缓存在Channel中。
可靠性
一个Event被缓存在Flume代理的Channel中。然后就是Sink的任务来将Event传送到流中的下一个代理或者目标仓库(例如HDFS)。Sink只有在Event存储到下一个代理的Channel或者目标仓库中,才会将Event从Channel中移除。这就是单跳消息传递语义如何在Flume中提供端到端的流的可靠性。Flume使用一个事务处理方法保证Events传输的可靠性。Sources和Sinks在由Channel提供的事务中封装了Events的存储和检索。这保证了Events集合可靠地在流中点到点传输。在多跳流的例子中,前一跳的Sink和后一跳的Source都有各自的事务运行来保证数据被安全地存储在下一跳的Channel中。
构建Flume
获取源代码
使用Git检出代码。获取git仓库根目录点击此处https://git-wip-us.apache.org/repos/asf/flume.git。
Flume 1.x的开发在“trunk”分支之下进行,所以可以使用下面的命令行:
git clone https://git-wip-us.apache.org/repos/asf/flume.git
编译/测试Flume
Flume是以Maven方式构建的。你可以使用标准的Maven命令编译Flume:
1. 只编译:mvn clean compile
2. 编译并运行单元测试:mvn clean test
3. 运行独立测试:mvn clean test –Dtest=<Test1><Test2>,…-DfailIfNoTests=false
4. 创建tarball包:mvn clean install
5. 创建tarball包(跳过单元测试):mvn clean install –DskipTests
请注意,Flume的构建需要GoogleProtocol Buffers编译器在路径中。你可以通过下面的介绍下载并安装它https://developers.google.com/protocol-buffers/,。
开发自定义组件
客户端
Client在Event的起始点进行操作,并将他们传送到一个Flume代理上。Client通常在它们处理的数据来自于的程序的进程空间内操作。Flume目前支持Avro,log4j,syslog和Http POST(使用一个JSON)等方式从一个外部源传输数据。除此之外,有一个ExecSource可以处理本地进程的输出作为给Flume的输入。
很有可能有一个用例使得当前存在的选项都没有效。在这种情况下,你可以建立一个自定义的机制发送数据给Flume。要实现这个有两个方法。第一个方法是创建一个自定义的Client来跟Flume已经存在的Source像AvroSource或者SyslogTcpSource通信。这里Client应该把它的数据转换成这些Flume Source可以理解的数据。另一个选择是写一个自定义的Flume Source,使用IPC或者RPC协议,直接和你已有的客户端程序通信,并把客户端的数据转换成Flume Events进行发送。注意所有存储在一个Flume代理的Channel中events必须以Flume
Events的形式存在。
客户端SDK
尽管Flume包含了多个内建的机制(例如Sources)来接收数据,但是人们经常想要能够从一个自定义的程序直接与Flume交互。Flume Client SDK就是一个可以让应用程序使用RPC协议连接到Flume并给Flume的数据流发送数据的库。
RPC客户端接口
Flume RpcClient接口的实现封装着Flume支持的RPC机制。用户的程序可以简单地调用Flume Client SDK中的append(Event)或者appendBatch(List<Event>)来发送数据而不必担心底层信息交换的细节。直接实现Event接口,同一个方便的实现SimpleEvent类,或者通过使用EventBuilder的重载的静态辅助方法wintBody(),用户可以提供需要的Event参数。
RPC客户端——Avro和Thrift
在Flume1.4.0中,Avro是默认的RPC协议。NettyAvroRpcClient和ThriftRpcClient实现了RpcClient接口。客户端需要目标Flume代理的主机地址和端口号来创建这个对象,然后就可以使用RpcClient将数据发送给代理。下面的例子展示了在一个用户的数据生成程序中如何使用Flume Client SDK API:
import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.api.RpcClient; import org.apache.flume.api.RpcClientFactory; The remote Flume agent needs to have an AvroSource(or aThriftSourceif you are using a Thrift client) listening on some port. Below is an example Flumeagent configuration that’s import org.apache.flume.event.EventBuilder; import java.nio.charset.Charset; public class MyApp { public static void main(String[] args) { MyRpcClientFacade client = new MyRpcClientFacade(); // Initialize client with the remote Flume agent's host and port client.init("host.example.org", 41414); // Send 10 events to the remote Flume agent. That agent should be // configured to listen with an AvroSource. String sampleData = "Hello Flume!"; for (int i = 0; i < 10; i++) { client.sendDataToFlume(sampleData); } client.cleanUp(); } } class MyRpcClientFacade { private RpcClient client; private String hostname; private int port; public void init(String hostname, int port) { // Setup the RPC connection this.hostname = hostname; this.port = port; this.client = RpcClientFactory.getDefaultInstance(hostname, port); // Use the following method to create a thrift client (instead ofthe above line): // this.client = RpcClientFactory.getThriftInstance(hostname, port); } public void sendDataToFlume(String data) { // Create a Flume Event object that encapsulates the sample data Event event = EventBuilder.withBody(data,Charset.forName("UTF-8")); // Send the event try { client.append(event); } catch (EventDeliveryException e) { // clean up and recreate the client client.close(); client = null; client = RpcClientFactory.getDefaultInstance(hostname, port); // Use the following method to create a thrift client (instead ofthe above line): // this.client = RpcClientFactory.getThriftInstance(hostname, port); } } public void cleanUp() { // Close the RPC connection client.close(); } }
远程Flume代理需要有一个AvroSource(或者如果你用的是Thrift客户端的话那就是ThriftSource)监听某个端口。下面是一个Flume代理的配置在等带来自与MyApp的连接:
a1.channels = c1 a1.sources = r1 a1.sinks = k1 a1.channels.c1.type = memory a1.sources.r1.channels = c1 a1.sources.r1.type = avro # For using a thrift source set the following instead of the aboveline. # a1.source.r1.type = thrift a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 41414 a1.sinks.k1.channel = c1 a1.sinks.k1.type = logger
为了更具灵活性,默认的Flume客户端实现(NettyAvroRpcClient和ThriftRpcClient)可以用下面的属性进行配置:
client.type = default (for avro) or thrift(for thrift) hosts = h1 # default clientaccepts only 1 host # (additional hosts will be ignored) hosts.h1 = host1.example.org:41414 # host and port must both be specified # (neither has a default) batch-size = 100 # Must be >=1 (default:100) connect-timeout = 20000 # Must be >=1000 (default:20000) request-timeout = 20000 # Must be >=1000 (default:20000)
故障转移客户端
这个类封装了默认的RPC客户端来给客户端提供故障转移能力。这个需要由空格分隔的<主机>:<端口>列表表示的Flume代理组成一个故障转移组。故障转移RPC客户端目前不支持Thrift。如果和目前选择的主机(例如代理)代理通信出现错误,那么故障转移客户端就会自动故障转移到列表中的下一个主机中。例如:
// Setup properties for the failover Properties props = new Properties(); props.put("client.type", "default_failover"); // List of hosts (space-separated list of user-chosen host aliases) props.put("hosts", "h1 h2 h3"); // host/port pair for each host alias String host1 = "host1.example.org:41414"; String host2 = "host2.example.org:41414"; String host3 = "host3.example.org:41414"; props.put("hosts.h1", host1); props.put("hosts.h2", host2); props.put("hosts.h3", host3); // create the client with failover properties RpcClient client = RpcClientFactory.getInstance(props);
为了能更灵活,故障转移Flume客户端实现(FailoverRpcClient)可以用下面的属性来配置:
client.type = default_failover hosts = h1 h2 h3 # at least one isrequired, but 2 or # more makes better sense hosts.h1 = host1.example.org:41414 hosts.h2 = host2.example.org:41414 hosts.h3 = host3.example.org:41414 max-attempts = 3 # Must be >=0(default: number of hosts # specified, 3 in this case). A '0' # value doesn't make much sense because # it will just cause an append call to # immmediately fail. A '1' value means # that the failover client will try only # once to send the Event, and if it # fails then there will be no failover # to a second client, so this value # causes the failover client to # degenerate into just a default client. # It makes sense to set this value to at # least the number of hosts that you # specified. batch-size = 100 # Must be >=1(default: 100) connect-timeout = 20000 # Must be >=1000 (default:20000) request-timeout = 20000 # Must be >=1000 (default:20000)
负载均衡RPC客户端
Flume客户端SDK也支持一个RpcClient在多个主机之间负载均衡。这种类型的客户端需要空格分隔的表示Flume代理的<host>:<port>列表,组成一个负载平衡组。这个客户端可以被配置一个负载平衡的策略,可能是随机选择一个配置的主机,或者以循环的方式选择一个主机。你也可以自定义你自己的类来实现LoadBalancingRpcClient$HostSelector接口,来使用一个自定义的选择顺序。在那种情况下,这个自定义类的全类名需要在host-selector中的属性中指定。负载均衡RPC客户端目前不支持Thrift。
如果启用了backoff属性,客户端会将失败的主机临时存放起来,这会导致这些主机在给定的超时时间内被排除在可选的主机列表中。当超过超时时间,如果某个主机仍然无响应,该主机将会被认为是一个连续的失效,从而导致超时时间的设置会成倍增长,以避免由于这些未响应的主机而陷入长时间的等待。
Backoff时间的最大值可以通过maxBackoff(单位毫秒)来设置。maxBackoff的默认值为30秒(在OrderSelector类中指定,它是所有负载均衡策略的超类)。Backoff超时时间会随着每个连续失败增长直到达到最大超时时间。超时时间最大的可能值是65535秒(约18.2个小时)。例如:
// Setup properties for the load balancing Properties props = new Properties(); props.put("client.type", "default_loadbalance"); // List of hosts (space-separated list of user-chosen host aliases) props.put("hosts", "h1 h2 h3"); // host/port pair for each host alias String host1 = "host1.example.org:41414"; String host2 = "host2.example.org:41414"; String host3 = "host3.example.org:41414"; props.put("hosts.h1", host1); props.put("hosts.h2", host2); props.put("hosts.h3",host3); props.put("host-selector", "random"); // Forrandom host selection // props.put("host-selector", "round_robin"); //For round-robin host // // selection props.put("backoff","true"); // Disabled by default. props.put("maxBackoff", "10000"); // Defaults 0,which effectively // becomes 30000 ms // Create the client with load balancing properties RpcClient client = RpcClientFactory.getInstance(props);
为了更具灵活性,负载均衡Flume客户端实现(LoadBalancingRpcClient)可以用如下的属性进行配置:
client.type = default_loadbalance hosts = h1 h2 h3 # At least 2hosts are required hosts.h1 = host1.example.org:41414 hosts.h2 = host2.example.org:41414 hosts.h3 = host3.example.org:41414 backoff = false # Specifieswhether the client should # back-off from (i.e. temporarily # blacklist) a failed host # (default: false). maxBackoff = 0 # Max timeout inmillis that a will # remain inactive due to a previous # failure with that host (default: 0, # which effectively becomes 30000) host-selector = round_robin # The host selectionstrategy used # when load-balancing among hosts # (default: round_robin). # Other values are include "random" # or the FQCN of a custom class # that implements # LoadBalancingRpcClient$HostSelector batch-size = 100 # Must be >=1 (default: 100) connect-timeout = 20000 # Must be >=1000 (default: 20000) request-timeout = 20000 # Must be >=1000 (default: 20000)
嵌入式代理
Flume有一套嵌入式代理API,它允许用户将一个代理嵌入到他们的应用程序中。这个代理是轻量级的,并不支持所有的Sources,Sinks和Channels。Source使用的是一种特殊的Source,Events需要通过EmbeddedAgent对象的put,putAll方法发送到Source。只有文件Channel和内存Channel是支持的Channel,Avro Sink是唯一支持的Sink。
注意:嵌入式代理需要依赖hadoop-core.jar包
嵌入式代理的配置和完全代理的配置是很相似的。下面是一个详细的配置选项的列表:
必要的属性用黑体表示。
Property Name |
Default |
Description |
source.type |
embedded |
唯一可用的Source就是嵌入式Source |
channel.type |
- |
Memory或file分别对应Memory Channel和FileChannel |
channel.* |
- |
对Channel类型的配置选项,查看MemoryChannel或者FileChannel用户指南查找更详尽的列表 |
sinks |
- |
Sink名称的列表 |
sink.type |
- |
属性名称必须和Sinks列表中的一个名称匹配。值必须是avro |
sink.* |
- |
Sink的配置选项。查看AvroSink用户指南获得更详尽的列表,然而要注意AvroSink至少需要主机名和端口号 |
processor.type |
- |
Failover或者load_balance,分别和FailoverSinksProcessor和LoadBalancingSinkProcessor一致 |
processor.* |
- |
对选择的Sink处理器的配置项。查看FailoverSinksProcessor和LoadBalancingSinkProcessor用户指南查看更详尽的列表 |
下面是一个例子展示怎样使用代理:
Map<String, String> properties = newHashMap<String, String>(); properties.put("channel.type","memory"); properties.put("channel.capacity","200"); properties.put("sinks","sink1 sink2"); properties.put("sink1.type","avro"); properties.put("sink2.type","avro"); properties.put("sink1.hostname","collector1.apache.org"); properties.put("sink1.port","5564"); properties.put("sink2.hostname", "collector2.apache.org"); properties.put("sink2.port","5565"); properties.put("processor.type","load_balance"); EmbeddedAgent agent = newEmbeddedAgent("myagent"); agent.configure(properties); agent.start(); List<Event> events =Lists.newArrayList(); events.add(event); events.add(event); events.add(event); events.add(event); agent.putAll(events); ... agent.stop();
Transaction接口
Transaction接口是Flume可靠性的基础。所有主要的组件(例如Sources,Sinks和Channels)必须使用一个Flume Transaction。
一个Transaction是在一个Channel实现中实现的。每个与Channel连接的Source和Sink必须获得一个Transaction对象。Sources实际上使用一个ChannelSelector接口来封装Transaction。存储(把它放到Channel中)和提取(把它从Channel中拿出来)一个Event的操作都是在一个活动的Transaction中完成的。例如:
Channel ch = new MemoryChannel(); Transaction txn = ch.getTransaction(); txn.begin(); try { // This try clause includes whatever Channel operations you want todo Event eventToStage = EventBuilder.withBody("Hello Flume!", Charset.forName("UTF-8")); ch.put(eventToStage); // Event takenEvent = ch.take(); // ... txn.commit(); } catch (Throwable t) { txn.rollback(); // Log exception, handle individual exceptions as needed // re-throw all Errors if (t instanceof Error) { throw (Error)t; } } finally { txn.close(); }
这里我们从一个Channel中获取了一个Transaction。在begin()返回后,Transaction现在是活动/打开的,然后Event被放到Channel里。如果放置成功,Transaction就进行提交并关闭。
Sink
Sink的目的是将Events从Channel中取出并将他们发送到下一个流中的Flume代理或者将他们存储到一个外部仓库中。一个Sink只与一个Channel相关,如在Flume属性中配置的那样。有一个SinkRunner实例,它和每一个配置的Sink都有关系,当Flume框架调用SinkRunner.start(),一个新线程就会被创建来驱动Sink(使用SinkRunner.PollingRunner作为线程的Runnable)。这个线程管理Sink的生命周期。Sink需要实现start()和sttop()方法,这两个方法是LifecycleAware接口的一部分。Sink.start()方法应该初始化Sink并把它带入它可以将Events发送到下一个目的地的状态。Sink.process()方法需要做核心的处理来将Events从Channel中取出并转发它。Sink.stop()方法需要做必要的清理(例如释放资源)。Sink的实现也需要实现Configurable接口来处理它自己的配置设置。例如:
public class MySink extends AbstractSink implements Configurable { private String myProp; @Override public void configure(Context context) { String myProp = context.getString("myProp","defaultValue"); // Process the myProp value (e.g. validation) // Store myProp for later retrieval by process() method this.myProp = myProp; } @Override public void start() { // Initialize the connection to the external repository (e.g. HDFS)that // this Sink will forward Events to .. } @Override public void stop () { // Disconnect from the external respository and do any // additional cleanup (e.g. releasing resources or nulling-out // field values) .. } @Override public Status process() throws EventDeliveryException { Status status = null; // Start transaction Channel ch = getChannel(); Transaction txn = ch.getTransaction(); txn.begin(); try { // This try clause includes whatever Channel operations you want todo Event event = ch.take(); // Send the Event to the external repository. // storeSomeData(e); txn.commit(); status = Status.READY; } catch (Throwable t) { txn.rollback(); // Log exception, handle individual exceptions as needed status = Status.BACKOFF; // re-throw all Errors if (t instanceof Error) { throw (Error)t; } } finally { txn.close(); } return status; } }
Source
Source的目的是从外部客户端获取数据并把它存储到Channel中。一个Source可以获取它自己的ChannelProcessor的一个实例来处理Event。ChannelProcessor也可以获得它自己的ChannelSelector的一个实例来获取和Source相关联的Channel,正如在Flume属性中配置的那样。然后一个Transaction从每一个相关的Channel中取出,这样Source就可以通过一个Transaction把Event可靠地放入Channel中。
类似于SinkRunner.RollingRunner中的 Runnable,当Flume框架调用PollableSourceRunner.start()的时候,在创建的线程上会有一个PollingRunner的Runnable运行。每个经过配置的PollableSource都和它自己的运行着PollingRunner的线程相关。这个线程管理PollableSource的生命周期,例如启动和停止。一个PollableSource实现必须实现在LifecycleAware接口中声明的start()方法和stop()方法。PollableSource的运行要调用Source的process()方法。Process()方法应该检查新数据并把它们以Flume
Events的方式存入到Channel中。
注意实际上有两种类型的Sources。PollableSource已经提过了。另外一个是EventDrivenSource。EventDrivenSource,不像PollableSource,必须有它自己的回调机制来捕获数据并它存入Channel。EventDrivenSource并不是像PollableSource那样每个都由它们自己的线程驱动。下面是一个自定义PollableSource的例子:
public class MySource extends AbstractSource implementsConfigurable, PollableSource private String myProp; Channel @Override public void configure(Context context) { String myProp = context.getString("myProp", "defaultValue"); // Process the myProp value (e.g. validation, convert to anothertype, ...) // Store myProp for later retrieval by process() method this.myProp = myProp; } @Override public void start() { // Initialize the connection to the external client } @Override public void stop () { // Disconnect from external client and do any additional cleanup // (e.g. releasing resources or nulling-out field values) .. } @Override public Status process() throws EventDeliveryException { Status status = null; // Start transaction Channel ch = getChannel(); Transaction txn = ch.getTransaction(); txn.begin(); try { // This try clause includes whatever Channel operations you want todo // Receive new data Event e = getSomeData(); // Store the Event into this Source's associated Channel(s) getChannelProcessor().processEvent(e) txn.commit(); status = Status.READY; } catch (Throwable t) { txn.rollback(); // Log exception, handle individual exceptions as needed status = Status.BACKOFF; // re-throw all Errors if (t instanceof Error) { throw (Error)t; } } finally { txn.close(); } return status; } }
Channel
待讨论