flume 多chanel配置

#配置文
a1.sources= r1
a1.sinks= k1 k2
a1.channels= c1 c2  

#Describe/configure the source  

a1.sources.r1.type = avro
a1.sources.r1.bind =  slave3
a1.sources.r1.port = 50001
a1.sources.r1.host=slave3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 

#第一个 hdfs
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /maats5/%{logtype}/logdate=%{date}/
a1.sinks.k1.hdfs.filePrefix = %{logtype}-%{date}
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = hour
a1.sinks.k1.hdfs.rollInterval = 180000
a1.sinks.k1.hdfs.rollSize = 134217700

a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=Text
#第二个到kafka

#a1.channels = c1
#a1.sinks = k1
#a1.sinks.k1.type = org.example.MySink
#a1.sinks.k1.channel = c1

a1.sinks.k2.channel = c2
a1.sinks.k2.type = my.bigdata.KafkaSink2
a1.sinks.k2.kafka.topic = maats1
a1.sinks.k2.kafka.bootstrap.servers = slave3:9092
a1.sinks.k2.kafka.flumeBatchSize = 20
a1.sinks.k2.kafka.producer.acks = 1
a1.sinks.k2.kafka.producer.linger.ms = 1
#a1.sinks.ki.kafka.producer.compression.type = snappy

# Usea channel which buffers events in memory
a1.channels.c1.type= memory
a1.channels.c1.capacity= 1000
a1.channels.c1.transactionCapacity= 100  

a1.channels.c2.type= memory
a1.channels.c2.capacity= 1000
a1.channels.c2.transactionCapacity= 100  
时间: 2024-10-20 00:44:32

flume 多chanel配置的相关文章

flume安装及配置介绍(二)

注: 环境: sklin-linux Flume的下载方式: wget http://www.apache.org/dyn/closer.lua/flume/1.6.0/apache-flume-1.6.0-bin.tar. 下载完成之后,使用tar进行解压 tar -zvxf apache-flume-1.6..0-bin.tar. 进入flume的conf配置包中,使用命令touch flume.conf,然后cp flume-conf.properties.template flume.c

Flume负载均衡配置

flume负载均衡配置 集群DNS配置如下: hadoop-maser 192.168.177.162 machine-0192.168.177.158 machine-1191.168.177.167 配置主Flume,在hadoop-maser机上.配置文件为loadbalance.properties. agent.sources=s1 agent.channels=c1 agent.sinks=k1 k2 agent.sinkgroups = g1 agent.sinkgroups.g1

flume自动reload配置的源码分析

在1.5.0的flume版本中开始提供这个功能,判断配置文件的更新时间戳来reload服务原理:1)在启动中使用EventBus.register注册Application对象,同时Application有一个Subscribe的方法handleConfigurationEvent(参数是MaterializedConfiguration对象)2)定义了一个计划任务线程池,检测到文件更新情况(判断文件的更新时间)3)如果检测到文件有更新会使用EventBus.post方法发送这个event(Ma

Flume的安装配置

flume是一个分布式.可靠.和高可用的海量日志采集.聚合和传输的系统.支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本.HDFS.Hbase等)的能力 . 一.什么是Flume? flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用.Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera.但随着 FLume 功能的扩展,F

flume + kafka 基本配置

apache-flume1.6 sink默认支持kafka [FLUME-2242] - Flume Sink and Source for Apache Kafka 官方给的例子很贴心,可以直接运行=,=,详细配置之后慢慢看. a1.channels = channel1a1.sources = src-1a1.sinks = k1 a1.sources.src-1.type = spooldira1.sources.src-1.channels = channel1a1.sources.sr

Flume伪分布式配置

配置Flume tar -zxvf apache-flume-1.8.0-bin.tar.gz mkdir /opt/flume/ mv apache-flume-1.8.0-bin.tar.gz /opt/flume/flume1.8.0/ 配置环境变量 vim /etc/profile export FLUME_HOME=/opt/flume/flume1.8.0 export FLUME_CONF_DIR=${FLUME_HOME}/conf export PATH=.:${JAVA_HO

关于Flume中Chanel.Selector.header解释

flume内置的ChannelSelector有两种,分别是Replicating和Multiplexing. Replicating类型的ChannelSelector会针对每一个Event,拷贝到所有的Channel中,这是默认的ChannelSelector. replicating类型的ChannelSelector例子如下 1 a1.sources = r1 2 a1.channels = c1 c2 # 如果有100个Event,那么c1和c2中都会有这100个事件 3 4 a1.c

【转】Flume(NG)架构设计要点及配置实践

Flume(NG)架构设计要点及配置实践 Flume NG是一个分布式.可靠.可用的系统,它能够将不同数据源的海量日志数据进行高效收集.聚合.移动,最后存储到一个中心化数据存储系统中.由原来的Flume OG到现在的Flume NG,进行了架构重构,并且现在NG版本完全不兼容原来的OG版本.经过架构重构后,Flume NG更像是一个轻量的小工具,非常简单,容易适应各种方式日志收集,并支持failover和负载均衡. 架构设计要点 Flume的架构主要有一下几个核心概念: Event:一个数据单元

日志收集系统Flume调研笔记第2篇 - Flume配置及使用实例

上篇笔记对Flume的使用场景和系统架构做了介绍,本篇笔记以实例说明Flume的配置方法.下面开始正文. 1. Flume使用实例 1.1 配置 Flume agent的3个组件及其拓扑关系是在配置文件中指定的,总的原则是必须列出source/channel/sink的name/type等重要的配置项,并通过channel将source(s)和sink(s)连接起来,此外,1个source可以指定多个channel,而1个sink只能接收来自1个channel的数据. 这里给出的是部署1套含1个