flume单channel多sink的测试

说明:

该结果是亲自测试,只提供简单的数据分析,很简陋,结果可能不准确。

先说一下结果,多sink可以直接按常规配置,这样的话每个sink会启动一个sinkrunner,相当于每个线程一个sink,互不干扰,负载均衡是通过channel实现的,效率会提高为n倍,如果在此基础上加入

sinkgroup,则sinkgroup会启动一个sinkrunner,就是单线程,sinkgroup从channel中读取数据,然后分发到下面挂载的sink中,效率和单sink一样,没有提高,但是可以实现两个sink的负载均衡或者热备模式。

上面的分析即参考了源码也参考了网上的文章,不保证绝对正确,见谅。

个人认为实际使用中还是直接配置多sink,可以提高效率,达到负载均衡,至于热备,可以通过其他负载均衡软件或者硬件提供虚IP实现。

贴一下测试的配置

配置是一样的,用的时候打开或者关闭sinkgroup的注释即可。

这是采集结点的配置

#flume配置文件

agent1.sources=execSource

agent1.sinks= avrosink1 avrosink2

agent1.channels=filechannel

#sink groups    非常影响性能

#agent1.sinkgroups=avroGroup

#agent1.sinkgroups.avroGroup.sinks = avrosink1 avrosink2

#sink调度模式 load_balance  failover

#agent1.sinkgroups.avroGroup.processor.type=load_balance

#负载均衡模式  轮询  random  round_robin

#agent1.sinkgroups.avroGroup.processor.selector=round_robin

#失效降级

#agent1.sinkgroups.avroGroup.processor.backoff=true

#降级时间30秒

#agent1.sinkgroups.avroGroup.processor.maxTimeOut=30000

#配置execSource

#channel

agent1.sources.execSource.channels=filechannel

#souorce 类型

agent1.sources.execSource.type=exec

#监控正在写入的日志文件

agent1.sources.execSource.command=tail -F /home/flume/log/test.log

#如果命令死掉是否重新启动

agent1.sources.execSource.restart=true

#重新启动命令的间隔时间

agent1.sources.execSource.restartThrottle=2000

#记录命令的错误日志

agent1.sources.execSource.logStdErr=true

#批量提交的大小

agent1.sources.execSource.batchSize=1000

#批量提交的超时 单位毫秒

agent1.sources.execSource.batchTimeout=1000

#配置filechannel

#channel类型  file  memory

agent1.channels.filechannel.type=memory

#agent1.channels.filechannel.checkpointDir=/home/flume/channel/log/ckpdir

#agent1.channels.filechannel.dataDirs=/home/flume/channel/log/data

#单个文件大小  100M

#agent1.channels.filechannel.maxFileSize=204800000

#channel的event个数

agent1.channels.filechannel.capacity=20000000

#事务event个数

agent1.channels.filechannel.transactionCapacity=10000

#内存channel占用内存大小 默认是jvm内存的0.8

agent1.channels.filechannel.byteCapacity=1024000000

#配置avrosink1

#sink的channel

agent1.sinks.avrosink1.channel=filechannel

#sink类型  avro  thrift

agent1.sinks.avrosink1.type=avro

#ip地址

agent1.sinks.avrosink1.hostname=10.8.6.161

#端口

agent1.sinks.avrosink1.port=1463

#批量提交的个数

agent1.sinks.avrosink1.batch-size=1000

#连接超时 毫秒

agent1.sinks.avrosink1.connect-timeout=3000

#请求超时 毫秒

agent1.sinks.avrosink1.request-timeout=20000

#重新连接source的时间 单位秒 用于后端负载均衡的轮询

agent1.sinks.avrosink1.reset-connection-interval=300

#最大连接数 默认5

agent1.sinks.avrosink1.maxConnections=5

#配置avrosink2

#sink的channel

agent1.sinks.avrosink2.channel=filechannel

#sink类型  avro  thrift

agent1.sinks.avrosink2.type=avro

#ip地址

agent1.sinks.avrosink2.hostname=10.8.6.160

#端口

agent1.sinks.avrosink2.port=1463

#批量提交的个数

agent1.sinks.avrosink2.batch-size=1000

#连接超时 毫秒

agent1.sinks.avrosink2.connect-timeout=3000

#请求超时 毫秒

agent1.sinks.avrosink2.request-timeout=20000

#重新连接source的时间 单位秒 用于后端负载均衡的轮询

agent1.sinks.avrosink2.reset-connection-interval=300

#最大连接数 默认5

agent1.sinks.avrosink2.maxConnections=5

这是汇聚结点的配置

#flume配置文件

agent1.sources=avrosource

agent1.sinks=hdfssink1 hdfssink2

agent1.channels=filechannel

#sink groups 可以用空格分开配置多个    非常影响性能关闭

#agent1.sinkgroups=hdfsGroup

#agent1.sinkgroups.hdfsGroup.sinks = hdfssink1 hdfssink2

#sink调度模式 load_balance  failover

#agent1.sinkgroups.hdfsGroup.processor.type=load_balance

#负载均衡模式  轮询  random  round_robin

#agent1.sinkgroups.hdfsGroup.processor.selector=round_robin

#失效降级

#agent1.sinkgroups.hdfsGroup.processor.backoff=true

#降级时间30秒

#agent1.sinkgroups.hdfsGroup.processor.maxTimeOut=30000

#配置avrosource

#channel

agent1.sources.avrosource.channels=filechannel

#source  类型  thrift  avro

agent1.sources.avrosource.type=avro

#监控正在写入的日志文件

agent1.sources.avrosource.bind=0.0.0.0

#端口

agent1.sources.avrosource.port=1463

#线程数

agent1.sources.avrosource.threads=24

#增加拦截器 可以用空格分开配置多个

agent1.sources.avrosource.interceptors=i1

#拦截器类型  必须配置Builder  由Builder来创建Interceptor

agent1.sources.avrosource.interceptors.i1.type=com.cfto.flume.interceptor.TimeStampInterceptor$Builder

#配置filechannel

#channel类型  file  memory

agent1.channels.filechannel.type=memory

agent1.channels.filechannel.checkpointDir=/tmp/flume1/channel/log/ckpdir

agent1.channels.filechannel.dataDirs=/tmp/flume1/channel/log/data

#单个文件大小  100M

#agent1.channels.filechannel.maxFileSize=204800000

#channel的event个数

agent1.channels.filechannel.capacity=200000000

#事务event个数

agent1.channels.filechannel.transactionCapacity=10000

#内存channel占用内存大小 默认是jvm内存的0.8

agent1.channels.filechannel.byteCapacity=1024000000

#配置hdfssink1

#连接的channel

agent1.sinks.hdfssink1.channel=filechannel

#sink的类型

agent1.sinks.hdfssink1.type=hdfs

#写入hdfs的路径 %{}是从header里取属性 %是自己解析属性 %Y/%m/%d

#最后不要有/

agent1.sinks.hdfssink1.hdfs.path = hdfs://nameservice1/flumelog/%{dateDir}

#文件名前缀

agent1.sinks.hdfssink1.hdfs.filePrefix=hostxx_1

#是否是用本地时间戳 header里没有timestamp属性且需要获取时间是必须设置为true

agent1.sinks.hdfssink1.hdfs.useLocalTimeStamp = true

#文件类型 SequenceFile(默认) DataStream(不压缩)  CompressedStream(压缩)

agent1.sinks.hdfssink1.hdfs.fileType=CompressedStream

#压缩编码

agent1.sinks.hdfssink1.hdfs.codeC=lzop

#文件写入格式 Text  Writable

agent1.sinks.hdfssink1.hdfs.writeFormat=Text

#按时间滚动文件 单位秒  默认30秒 0不滚动

agent1.sinks.hdfssink1.hdfs.rollInterval=0

#按文件大小滚动文件 单位字节  1G

agent1.sinks.hdfssink1.hdfs.rollSize=1024000000

#按event是个数滚动文件 默认10 0不滚动

agent1.sinks.hdfssink1.hdfs.rollCount=0

##批量提交大小

agent1.sinks.hdfssink1.hdfs.batchSize=1000

#HDFS IO操作的线程池大小

agent1.sinks.hdfssink1.hdfs.threadsPoolSize=10

#hdfs文件访问超时时间  默认 100000 单位毫秒

agent1.sinks.hdfssink1.hdfs.callTimeout=30000

#文件关闭前空闲时间 默认0 不关闭 单位秒

agent1.sinks.hdfssink1.hdfs.idleTimeout=300

#写入hdfs文件的用户

agent1.sinks.hdfssink1.hdfs.proxyUser=hadoop

#hdfs文件操作失败后的重试时间 单位秒 默认180

agent1.sinks.hdfssink1.hdfs.retryInterval = 3

#配置hdfssink2

#连接的channel

agent1.sinks.hdfssink2.channel=filechannel

#sink的类型

agent1.sinks.hdfssink2.type=hdfs

#写入hdfs的路径 %{}是从header里取属性 %是自己解析属性 %Y/%m/%d

#最后不要有/

agent1.sinks.hdfssink2.hdfs.path = hdfs://nameservice1/flumelog/%{dateDir}

#文件名前缀

agent1.sinks.hdfssink2.hdfs.filePrefix=hostxx_2

#是否是用本地时间戳 header里没有timestamp属性且需要获取时间是必须设置为true

agent1.sinks.hdfssink2.hdfs.useLocalTimeStamp = true

#文件类型 SequenceFile(默认) DataStream(不压缩)  CompressedStream(压缩)

agent1.sinks.hdfssink2.hdfs.fileType=CompressedStream

#压缩编码

agent1.sinks.hdfssink2.hdfs.codeC=lzop

#文件写入格式 Text  Writable

agent1.sinks.hdfssink2.hdfs.writeFormat=Text

#按时间滚动文件 单位秒  默认30秒 0不滚动

agent1.sinks.hdfssink2.hdfs.rollInterval=0

#按文件大小滚动文件 单位字节  1G

agent1.sinks.hdfssink2.hdfs.rollSize=1024000000

#按event是个数滚动文件 默认10 0不滚动

agent1.sinks.hdfssink2.hdfs.rollCount=0

##批量提交大小

agent1.sinks.hdfssink2.hdfs.batchSize=1000

#HDFS IO操作的线程池大小

agent1.sinks.hdfssink2.hdfs.threadsPoolSize=10

#hdfs文件访问超时时间  默认 100000 单位毫秒

agent1.sinks.hdfssink2.hdfs.callTimeout=30000

#文件关闭前空闲时间 默认0 不关闭 单位秒

agent1.sinks.hdfssink2.hdfs.idleTimeout=300

#写入hdfs文件的用户

agent1.sinks.hdfssink2.hdfs.proxyUser=hadoop

#hdfs文件操作失败后的重试时间 单位秒 默认180

agent1.sinks.hdfssink2.hdfs.retryInterval = 3

时间: 2024-08-06 11:41:09

flume单channel多sink的测试的相关文章

[flume] channel 和 sink

上周把安卓日志手机的客户端sdk完成跑通,这周开始调试日志服务器端. 使用flume进行日志收集,然后转kafka.在测试的时候总是发现漏掉一些event,后来才知道对 channel 和 sink 的使用有误.当多个sink使用同一个channel时,event是会分流共同消耗的,而不是每个sink都复制.最后,改成多个channel,每个channel对应一个sink的模式. 有篇不错的博客,http://shiyanjun.cn/archives/915.html

Flume内置channel,source,sink汇总

由于经常会使用到Flume的一些channel,source,sink,于是为了方便将这些channel,source,sink汇总出来,也共大家访问. Component Interface Type Alias Implementation Class *.Channel memory *.channel.MemoryChannel *.Channel jdbc *.channel.jdbc.JdbcChannel *.Channel file *.channel.file.FileChan

【Flume】【*】深入flume-ng的三大组件——source,channel,sink

概览 flume-ng中最重要的核心三大组件就是source,channel,sink source负责从源端收集数据,产出event channel负责暂存event,以备下游取走消费 sink负责消费通道中的event,写到最终的输出端上 以上是总体的一个简单结构图,下面我们来深入每一个组件的内部看看: 1.Source source接口的定义如下: @InterfaceAudience.Public @InterfaceStability.Stable public interface S

【Flume】从入口Application来分析flume的source和sink是如何与channel交互的

大家在启动flume的时候,输入的命令就可以看出flume的启动入口了 [[email protected] apache-flume-1.5.2-bin]# sh bin/flume-ng agent -c conf -f conf/server.conf -n a1 Info: Sourcing environment configuration script /home/flume/apache-flume-1.5.2-bin/conf/flume-env.sh + exec /home/

Flume NG安装部署及数据采集测试

转载请注明出处:http://www.cnblogs.com/xiaodf/ Flume作为日志收集工具,监控一个文件目录或者一个文件,当有新数据加入时,采集新数据发送给消息队列等. 1 安装部署Flume 若要采集数据节点的本地数据,每个节点都需要安装一个Flume工具,用来做数据采集. 1.1 下载并安装 到官网去下载最新版本的Flume 下载地址为:http://flume.apache.org/,目前最新版本为1.6.0,需要1.7及以上版本的JDK. 1.解压 tar -xzvf ap

Flume实时监控目录sink到hdfs

目标:Flume实时监控目录sink到hdfs,再用sparkStreaming监控hdfs的这个目录,对数据进行计算 1.flume的配置,配置spoolDirSource_hdfsSink.properties,监控本地的一个目录,上传到hdfs一个目录下. agent1.channels = ch1agent1.sources = spoolDir-source1agent1.sinks = hdfs-sink1 # 定义channelagent1.channels.ch1.type =

Flume -- 初识flume、source和sink

Flume – 初识flume.source和sink 目录基本概念常用源 Source常用sink 基本概念 ? 什么叫flume? 分布式,可靠的大量日志收集.聚合和移动工具. ? events 事件,是一行数据的字节数据,是flume发送文件的基本单位. ? flume配置文件 重命名flume-env.sh.template为flume-env.sh,并添加[export JAVA_HOME=/soft/jdk] ? flume的Agent source //从哪儿读数据. 负责监控并收

Flume监听文件目录sink至hdfs配置

一:flume介绍 Flume是一个分布式.可靠.和高可用的海量日志聚合的系统,支持在系统中定制各类数据发送方,用于收集数据:同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力.,Flume架构分为三个部分 源-Source,接收器-Sink,通道-Channel. 二:配置文件 此配置文件source为一个目录,注意,该目录下的文件应为只读,不可写,且文件名不能相同,采用的channels为file,sink为hdfs,此处往hdfs写的策略是当时间达到3600s或者

Flume(4)实用环境搭建:source(spooldir)+channel(file)+sink(hdfs)方式

一.概述: 在实际的生产环境中,一般都会遇到将web服务器比如tomcat.Apache等中产生的日志倒入到HDFS中供分析使用的需求.这里的配置方式就是实现上述需求. 二.配置文件: #agent1 name agent1.sources=source1 agent1.sinks=sink1 agent1.channels=channel1 #Spooling Directory #set source1 agent1.sources.source1.type=spooldir agent1.