说明:
该结果是亲自测试,只提供简单的数据分析,很简陋,结果可能不准确。
先说一下结果,多sink可以直接按常规配置,这样的话每个sink会启动一个sinkrunner,相当于每个线程一个sink,互不干扰,负载均衡是通过channel实现的,效率会提高为n倍,如果在此基础上加入
sinkgroup,则sinkgroup会启动一个sinkrunner,就是单线程,sinkgroup从channel中读取数据,然后分发到下面挂载的sink中,效率和单sink一样,没有提高,但是可以实现两个sink的负载均衡或者热备模式。
上面的分析即参考了源码也参考了网上的文章,不保证绝对正确,见谅。
个人认为实际使用中还是直接配置多sink,可以提高效率,达到负载均衡,至于热备,可以通过其他负载均衡软件或者硬件提供虚IP实现。
贴一下测试的配置
配置是一样的,用的时候打开或者关闭sinkgroup的注释即可。
这是采集结点的配置
#flume配置文件
agent1.sources=execSource
agent1.sinks= avrosink1 avrosink2
agent1.channels=filechannel
#sink groups 非常影响性能
#agent1.sinkgroups=avroGroup
#agent1.sinkgroups.avroGroup.sinks = avrosink1 avrosink2
#sink调度模式 load_balance failover
#agent1.sinkgroups.avroGroup.processor.type=load_balance
#负载均衡模式 轮询 random round_robin
#agent1.sinkgroups.avroGroup.processor.selector=round_robin
#失效降级
#agent1.sinkgroups.avroGroup.processor.backoff=true
#降级时间30秒
#agent1.sinkgroups.avroGroup.processor.maxTimeOut=30000
#配置execSource
#channel
agent1.sources.execSource.channels=filechannel
#souorce 类型
agent1.sources.execSource.type=exec
#监控正在写入的日志文件
agent1.sources.execSource.command=tail -F /home/flume/log/test.log
#如果命令死掉是否重新启动
agent1.sources.execSource.restart=true
#重新启动命令的间隔时间
agent1.sources.execSource.restartThrottle=2000
#记录命令的错误日志
agent1.sources.execSource.logStdErr=true
#批量提交的大小
agent1.sources.execSource.batchSize=1000
#批量提交的超时 单位毫秒
agent1.sources.execSource.batchTimeout=1000
#配置filechannel
#channel类型 file memory
agent1.channels.filechannel.type=memory
#agent1.channels.filechannel.checkpointDir=/home/flume/channel/log/ckpdir
#agent1.channels.filechannel.dataDirs=/home/flume/channel/log/data
#单个文件大小 100M
#agent1.channels.filechannel.maxFileSize=204800000
#channel的event个数
agent1.channels.filechannel.capacity=20000000
#事务event个数
agent1.channels.filechannel.transactionCapacity=10000
#内存channel占用内存大小 默认是jvm内存的0.8
agent1.channels.filechannel.byteCapacity=1024000000
#配置avrosink1
#sink的channel
agent1.sinks.avrosink1.channel=filechannel
#sink类型 avro thrift
agent1.sinks.avrosink1.type=avro
#ip地址
agent1.sinks.avrosink1.hostname=10.8.6.161
#端口
agent1.sinks.avrosink1.port=1463
#批量提交的个数
agent1.sinks.avrosink1.batch-size=1000
#连接超时 毫秒
agent1.sinks.avrosink1.connect-timeout=3000
#请求超时 毫秒
agent1.sinks.avrosink1.request-timeout=20000
#重新连接source的时间 单位秒 用于后端负载均衡的轮询
agent1.sinks.avrosink1.reset-connection-interval=300
#最大连接数 默认5
agent1.sinks.avrosink1.maxConnections=5
#配置avrosink2
#sink的channel
agent1.sinks.avrosink2.channel=filechannel
#sink类型 avro thrift
agent1.sinks.avrosink2.type=avro
#ip地址
agent1.sinks.avrosink2.hostname=10.8.6.160
#端口
agent1.sinks.avrosink2.port=1463
#批量提交的个数
agent1.sinks.avrosink2.batch-size=1000
#连接超时 毫秒
agent1.sinks.avrosink2.connect-timeout=3000
#请求超时 毫秒
agent1.sinks.avrosink2.request-timeout=20000
#重新连接source的时间 单位秒 用于后端负载均衡的轮询
agent1.sinks.avrosink2.reset-connection-interval=300
#最大连接数 默认5
agent1.sinks.avrosink2.maxConnections=5
这是汇聚结点的配置
#flume配置文件
agent1.sources=avrosource
agent1.sinks=hdfssink1 hdfssink2
agent1.channels=filechannel
#sink groups 可以用空格分开配置多个 非常影响性能关闭
#agent1.sinkgroups=hdfsGroup
#agent1.sinkgroups.hdfsGroup.sinks = hdfssink1 hdfssink2
#sink调度模式 load_balance failover
#agent1.sinkgroups.hdfsGroup.processor.type=load_balance
#负载均衡模式 轮询 random round_robin
#agent1.sinkgroups.hdfsGroup.processor.selector=round_robin
#失效降级
#agent1.sinkgroups.hdfsGroup.processor.backoff=true
#降级时间30秒
#agent1.sinkgroups.hdfsGroup.processor.maxTimeOut=30000
#配置avrosource
#channel
agent1.sources.avrosource.channels=filechannel
#source 类型 thrift avro
agent1.sources.avrosource.type=avro
#监控正在写入的日志文件
agent1.sources.avrosource.bind=0.0.0.0
#端口
agent1.sources.avrosource.port=1463
#线程数
agent1.sources.avrosource.threads=24
#增加拦截器 可以用空格分开配置多个
agent1.sources.avrosource.interceptors=i1
#拦截器类型 必须配置Builder 由Builder来创建Interceptor
agent1.sources.avrosource.interceptors.i1.type=com.cfto.flume.interceptor.TimeStampInterceptor$Builder
#配置filechannel
#channel类型 file memory
agent1.channels.filechannel.type=memory
agent1.channels.filechannel.checkpointDir=/tmp/flume1/channel/log/ckpdir
agent1.channels.filechannel.dataDirs=/tmp/flume1/channel/log/data
#单个文件大小 100M
#agent1.channels.filechannel.maxFileSize=204800000
#channel的event个数
agent1.channels.filechannel.capacity=200000000
#事务event个数
agent1.channels.filechannel.transactionCapacity=10000
#内存channel占用内存大小 默认是jvm内存的0.8
agent1.channels.filechannel.byteCapacity=1024000000
#配置hdfssink1
#连接的channel
agent1.sinks.hdfssink1.channel=filechannel
#sink的类型
agent1.sinks.hdfssink1.type=hdfs
#写入hdfs的路径 %{}是从header里取属性 %是自己解析属性 %Y/%m/%d
#最后不要有/
agent1.sinks.hdfssink1.hdfs.path = hdfs://nameservice1/flumelog/%{dateDir}
#文件名前缀
agent1.sinks.hdfssink1.hdfs.filePrefix=hostxx_1
#是否是用本地时间戳 header里没有timestamp属性且需要获取时间是必须设置为true
agent1.sinks.hdfssink1.hdfs.useLocalTimeStamp = true
#文件类型 SequenceFile(默认) DataStream(不压缩) CompressedStream(压缩)
agent1.sinks.hdfssink1.hdfs.fileType=CompressedStream
#压缩编码
agent1.sinks.hdfssink1.hdfs.codeC=lzop
#文件写入格式 Text Writable
agent1.sinks.hdfssink1.hdfs.writeFormat=Text
#按时间滚动文件 单位秒 默认30秒 0不滚动
agent1.sinks.hdfssink1.hdfs.rollInterval=0
#按文件大小滚动文件 单位字节 1G
agent1.sinks.hdfssink1.hdfs.rollSize=1024000000
#按event是个数滚动文件 默认10 0不滚动
agent1.sinks.hdfssink1.hdfs.rollCount=0
##批量提交大小
agent1.sinks.hdfssink1.hdfs.batchSize=1000
#HDFS IO操作的线程池大小
agent1.sinks.hdfssink1.hdfs.threadsPoolSize=10
#hdfs文件访问超时时间 默认 100000 单位毫秒
agent1.sinks.hdfssink1.hdfs.callTimeout=30000
#文件关闭前空闲时间 默认0 不关闭 单位秒
agent1.sinks.hdfssink1.hdfs.idleTimeout=300
#写入hdfs文件的用户
agent1.sinks.hdfssink1.hdfs.proxyUser=hadoop
#hdfs文件操作失败后的重试时间 单位秒 默认180
agent1.sinks.hdfssink1.hdfs.retryInterval = 3
#配置hdfssink2
#连接的channel
agent1.sinks.hdfssink2.channel=filechannel
#sink的类型
agent1.sinks.hdfssink2.type=hdfs
#写入hdfs的路径 %{}是从header里取属性 %是自己解析属性 %Y/%m/%d
#最后不要有/
agent1.sinks.hdfssink2.hdfs.path = hdfs://nameservice1/flumelog/%{dateDir}
#文件名前缀
agent1.sinks.hdfssink2.hdfs.filePrefix=hostxx_2
#是否是用本地时间戳 header里没有timestamp属性且需要获取时间是必须设置为true
agent1.sinks.hdfssink2.hdfs.useLocalTimeStamp = true
#文件类型 SequenceFile(默认) DataStream(不压缩) CompressedStream(压缩)
agent1.sinks.hdfssink2.hdfs.fileType=CompressedStream
#压缩编码
agent1.sinks.hdfssink2.hdfs.codeC=lzop
#文件写入格式 Text Writable
agent1.sinks.hdfssink2.hdfs.writeFormat=Text
#按时间滚动文件 单位秒 默认30秒 0不滚动
agent1.sinks.hdfssink2.hdfs.rollInterval=0
#按文件大小滚动文件 单位字节 1G
agent1.sinks.hdfssink2.hdfs.rollSize=1024000000
#按event是个数滚动文件 默认10 0不滚动
agent1.sinks.hdfssink2.hdfs.rollCount=0
##批量提交大小
agent1.sinks.hdfssink2.hdfs.batchSize=1000
#HDFS IO操作的线程池大小
agent1.sinks.hdfssink2.hdfs.threadsPoolSize=10
#hdfs文件访问超时时间 默认 100000 单位毫秒
agent1.sinks.hdfssink2.hdfs.callTimeout=30000
#文件关闭前空闲时间 默认0 不关闭 单位秒
agent1.sinks.hdfssink2.hdfs.idleTimeout=300
#写入hdfs文件的用户
agent1.sinks.hdfssink2.hdfs.proxyUser=hadoop
#hdfs文件操作失败后的重试时间 单位秒 默认180
agent1.sinks.hdfssink2.hdfs.retryInterval = 3