flume-ng 中 selector multiplexing 的使用

flume-ng 中 selector的使用



在最近的项目中,需要用到flume。使用的是非常常见的结构:netcat source开启监听端口,接收发送来的报文消息,通过memory channel与sink(重写的roll file sink)写到本地磁盘。特别的是,这里需要根据报文的类型来发往不同的sink(暂且命名为sink1与sink2)。根据该需求,考虑有两种解决方案。

方案一

在一个flume的agent中,启用2个source,2个channel以及2个sink。组成两条独立的flow。一条flow接收一种报文类型,互不干扰。这种方案无需重写任何flume的组件,仅需修改flume的配置文件。发送方根据报文类型的不同(这里要求发送方自己必须了解报文类型)发往不同的flume监听端口(即不同flow的netcat source)。

方案二

采用selector multiplexing的方式进行选择。对收到的报文进行分类,发往不同的channel,最终送给相应的sink。

官网对于selector multiplexing的介绍大致是:selector会根据event中某个header对应的value来将event发往不同的channel(header与value就是KV结构)。刚看到这里的时候我就有个疑惑,这个header在哪里进行设置的呢?

后来查看源码后,我猜测是source在收到报文后,封装event时,打入的header。这也就意味着如果是这样的话,需要改写项目中的netcat source。netcat source需要能够区分报文的类型,或者能够得到报文发送方提供的报文类型信息,并将报文类型设置到event的header中。完成以上功能,将flume提供的NetcatSource中原来生成event的地方修改为:

bytes.get(body);
String line = new String(body);
String[] records = line.split("\t", 2);
String header = records[0];
String strBody = records[1];
Map<String, String> headers = new HashMap<String, String>();
headers.put("LOG_FILE", header);

这个headers就是一个KV结构的map。

改写好之后,只需修改配置文件即可实现

# with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called ‘agent‘

agent1.sources = seqGenSrc
agent1.channels = memoryChannel1 memoryChannel2
agent1.sinks = msgRollingSink1 msgRollingSink2

# For each one of the sources, the type is defined
agent1.sources.seqGenSrc.type = com.flume.source.NetcatSource
agent1.sources.seqGenSrc.bind = 192.168.19.107
agent1.sources.seqGenSrc.port = 44444
agent1.sources.seqGenSrc.header = LOG_TYPE
agent1.sources.seqGenSrc.selector.type = multiplexing
agent1.sources.seqGenSrc.selector.header = LOG_TYPE
agent1.sources.seqGenSrc.selector.mapping.CREDIT = memoryChannel1
agent1.sources.seqGenSrc.selector.mapping.OTHER = memoryChannel2
agent1.sources.seqGenSrc.selector.default = memoryChannel2

# The channel can be defined as follows.
agent1.sources.seqGenSrc.channels = memoryChannel1 memoryChannel2

# Each sink‘s type must be defined
#agent1.sinks.msgRollingSink.type = logger
agent1.sinks.msgRollingSink1.type = com.flume.sink.RollingFileSink
agent1.sinks.msgRollingSink1.sink.directory = /home/disk1/somebody/multiplexing/credit_log
#agent1.sinks.msgRollingSink.sink.directory = /home/somebody/realtime-charge-stat/input_test
agent1.sinks.msgRollingSink1.sink.rollInterval = 60

#Specify the channel the sink should use
agent1.sinks.msgRollingSink1.channel = memoryChannel1

根据如上配置文件。客户端在发送报文到flume服务器的时候,仅需在报文正文前加上CREDITOTHER的报文头,与报文正文用"\t"分隔开来。这样改写的netcat
source即可将报文头打入event的header,而后selector再根据header发往不同的channel/sink。

flume-ng 中 selector multiplexing 的使用

时间: 2024-10-12 19:02:34

flume-ng 中 selector multiplexing 的使用的相关文章

【Flume】flume ng中HDFS sink设置按天滚动,0点滚动文件,修改源码实现

HDFS sink里有个属性hdfs.rollInterval=86400,这个属性你设置了24小时滚动一次,它的确就到了24小时才滚动,但是我们的需求往往是到了0点就滚动文件了,因为离线的job因为都会放在夜里执行. 如果flume是早上9点启动的,那么要到明天早上9点,hdfs的文件才会关闭,难道job要等到9点后才执行,这显然不合适,所以通过修改源码使其能够在0点滚动文件. 首先添加一个属性,可配置为day,hour,min private String timeRollerFlag; t

Flume NG源码分析(五)使用ThriftSource通过RPC方式收集日志

上一篇说了利用ExecSource从本地日志文件异步的收集日志,这篇说说采用RPC方式同步收集日志的方式.笔者对Thrift比较熟悉,所以用ThriftSource来介绍RPC的日志收集方式. 整体的结构图如下: 1. ThriftSource包含了一个Thrift Server,以及一个Thrift Service服务的实现.这里的Thrift Service是由ThriftSourceProtocol定义 2. 应用程序调用Thrift Service的客户端,以RPC的方式将日志发送到Th

【Flume NG用户指南】(2)构造

作者:周邦涛(Timen) Email:[email protected] 转载请注明出处:  http://blog.csdn.net/zhoubangtao/article/details/28277575 上一篇请參考[Flume NG用户指南](1)设置 3. 配置 前边的文章已经介绍过了,Flume Agent配置是从一个具有分层属性的Java属性文件格式的文件里读取的. 3.1 定义数据流 要在一个Flume Agent中定义数据流,你须要通过一个Channel将Source和Sin

【Flume NG用户指南】(2)配置

作者:周邦涛(Timen) Email:[email protected] 转载请注明出处:  http://blog.csdn.net/zhoubangtao/article/details/28277575 上一篇请参考[Flume NG用户指南](1)设置 3. 配置 前边的文章已经介绍过了,Flume Agent配置是从一个具有分层属性的Java属性文件格式的文件中读取的. 3.1 定义数据流 要在一个Flume Agent中定义数据流,你需要通过一个Channel将Source和Sin

【转】Flume(NG)架构设计要点及配置实践

Flume(NG)架构设计要点及配置实践 Flume NG是一个分布式.可靠.可用的系统,它能够将不同数据源的海量日志数据进行高效收集.聚合.移动,最后存储到一个中心化数据存储系统中.由原来的Flume OG到现在的Flume NG,进行了架构重构,并且现在NG版本完全不兼容原来的OG版本.经过架构重构后,Flume NG更像是一个轻量的小工具,非常简单,容易适应各种方式日志收集,并支持failover和负载均衡. 架构设计要点 Flume的架构主要有一下几个核心概念: Event:一个数据单元

Flume NG 学习笔记(五)Sinks和Channel配置

一.HDFS Sink Flume Sink是将事件写入到Hadoop分布式文件系统(HDFS)中.主要是Flume在Hadoop环境中的应用,即Flume采集数据输出到HDFS,适用大数据日志场景. 目前,它支持HDFS的文本和序列文件格式,以及支持两个文件类型的压缩.支持将所用的时间.数据大小.事件的数量为操作参数,对HDFS文件进行关闭(关闭当前文件,并创建一个新的).它还可以对事源的机器名(hostname)及时间属性分离数据,即通过时间戳将数据分布到对应的文件路径. HDFS目录路径可

Flume NG 简介及配置实战

Flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用.Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera.但随着 FLume 功能的扩展,Flume OG 代码工程臃肿.核心组件设计不合理.核心配置不标准等缺点暴露出来,尤其是在 Flume OG 的最后一个发行版本 0.94.0 中,日志传输不稳定的现象尤为严重,为了解决这些问题,2011 年 10 月 22 号,cloudera 完成了

Flume NG 学习笔记(一)简介

一.简介 Flume是一个分布式.可靠.高可用的海量日志聚合系统,支持在系统中定制各类数据发送方,用于收集数据:同时,Flume提供对数据的简单处理,并写到各种数据接收方的能力. Flume在0.9.x and 1.x之间有较大的架构调整,1.x版本之后的改称Flume NG(next generation),0.9.x的称为Flume OG(originalgeneration). 对于OG版本, Flume NG (1.x.x)的主要变化如下: 1.sources和sinks 使用chann

分布式实时日志系统(二) 环境搭建之 flume 集群搭建/flume ng资料

最近公司业务数据量越来越大,以前的基于消息队列的日志系统越来越难以满足目前的业务量,表现为消息积压,日志延迟,日志存储日期过短,所以,我们开始着手要重新设计这块,业界已经有了比较成熟的流程,即基于流式处理,采用 flume 收集日志,发送到 kafka 队列做缓冲,storm 分布式实时框架进行消费处理,短期数据落地到 hbase.mongo中,长期数据进入 hadoop 中存储. 接下来打算将这其间所遇到的问题.学习到的知识记录整理下,作为备忘,作为分享,带给需要的人. 学习flume ng的