Flume Channel Selector

Flume 基于Channel Selector可以实现扇入、扇出。

同一个数据源分发到不同的目的,如下图。

在source上可以定义channel selector:


1

2

3

4

5

6

7

8

9

a1.sources=r1

...

a1.channels=c1 c2

...

a1.sources.r1.selector.type=multiplexing

a1.sources.r1.selector.header=type

a1.sources.r1.selector.mapping.type1=c1

a1.sources.r1.selector.mapping.type2=c2

...

但是这个type变量从哪里来呢?

解决方法:

1、修改用到的那个source的源码,应用到client端,不同的数据类型添加不同的type

2、在source端配置interceptor,通过interceptor在header上设置变量type

比如:

使用regex_extractor,对传过来的数据进行处理,提取出type值(如果可以的话,可以在client端的数据格式添加type值,方便使用regex_extractor提取出来)。

3、在source端自定义interceptor,在interceptor里对处理变量type

来自为知笔记(Wiz)

时间: 2024-11-04 00:06:29

Flume Channel Selector的相关文章

flume channel monitor实现

对于flume的监控,只需要监控channel的性能数据即可,source和sink的性能一部分可以从channel中表现出来.以MemoryChannel为例,在MemoryTransaction的构造函数中会实例化一个org.apache.flume.instrumentation.ChannelCounter对象     public MemoryTransaction( int transCapacity, ChannelCounter counter) {       putList 

[flume] channel 和 sink

上周把安卓日志手机的客户端sdk完成跑通,这周开始调试日志服务器端. 使用flume进行日志收集,然后转kafka.在测试的时候总是发现漏掉一些event,后来才知道对 channel 和 sink 的使用有误.当多个sink使用同一个channel时,event是会分流共同消耗的,而不是每个sink都复制.最后,改成多个channel,每个channel对应一个sink的模式. 有篇不错的博客,http://shiyanjun.cn/archives/915.html

Flume NG 学习笔记(五)Sinks和Channel配置

一.HDFS Sink Flume Sink是将事件写入到Hadoop分布式文件系统(HDFS)中.主要是Flume在Hadoop环境中的应用,即Flume采集数据输出到HDFS,适用大数据日志场景. 目前,它支持HDFS的文本和序列文件格式,以及支持两个文件类型的压缩.支持将所用的时间.数据大小.事件的数量为操作参数,对HDFS文件进行关闭(关闭当前文件,并创建一个新的).它还可以对事源的机器名(hostname)及时间属性分离数据,即通过时间戳将数据分布到对应的文件路径. HDFS目录路径可

flume单channel多sink的测试

说明: 该结果是亲自测试,只提供简单的数据分析,很简陋,结果可能不准确. 先说一下结果,多sink可以直接按常规配置,这样的话每个sink会启动一个sinkrunner,相当于每个线程一个sink,互不干扰,负载均衡是通过channel实现的,效率会提高为n倍,如果在此基础上加入 sinkgroup,则sinkgroup会启动一个sinkrunner,就是单线程,sinkgroup从channel中读取数据,然后分发到下面挂载的sink中,效率和单sink一样,没有提高,但是可以实现两个sink

flume组件汇总 source、sink、channel

Flume Source Source类型 说明 Avro Source 支持Avro协议(实际上是Avro RPC),内置支持 Thrift Source 支持Thrift协议,内置支持 Exec Source 基于Unix的command在标准输出上生产数据 JMS Source 从JMS系统(消息.主题)中读取数据,ActiveMQ已经测试过 Spooling Directory Source 监控指定目录内数据变更 Twitter 1% firehose Source 通过API持续下载

Flume NG 学习笔记(十) Transaction、Sink、Source和Channel开发

版权声明:本文为博主原创文章,未经博主允许不得转载. 目录(?)[+] 一.Transaction interface Transaction接口是基于flume的稳定性考虑的.所有主要的组件(sources.sinks.channels)都必须使用Flume Transaction.我们也可以理解Transaction接口就是flume的事务,sources和sinks的发送数据与接受数据都是在一个Transaction里完成的. 从上图中可以看出,一个Transaction在Channel实

flume file channel 异常解决

1. 错误提示 2016-04-21 05:40:51,393 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows. java.lang.IllegalStateException: Channel closed [

flume的source, channel, sink 列表

Flume Source Source类型 说明 Avro Source 支持Avro协议(实际上是Avro RPC),内置支持 Thrift Source 支持Thrift协议,内置支持 Exec Source 基于Unix的command在标准输出上生产数据 JMS Source 从JMS系统(消息.主题)中读取数据,ActiveMQ已经测试过 Spooling Directory Source 监控指定目录内数据变更 Twitter 1% firehose Source 通过API持续下载

Flume-ng源码解析之Channel组件

如果还没看过Flume-ng源码解析之启动流程,可以点击Flume-ng源码解析之启动流程 查看 1 接口介绍 组件的分析顺序是按照上一篇中启动顺序来分析的,首先是Channel,然后是Sink,最后是Source,在开始看组件源码之前我们先来看一下两个重要的接口,一个是LifecycleAware ,另一个是NamedComponent 1.1 LifecycleAware @[email protected] interface LifecycleAware {  public void s