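1. Introduction to flume-ng
For background, see the official documentation: http://flume.apache.org/FlumeUserGuide.html
2. Example
Requirement: monitor a directory, automatically upload its files to a server, and encrypt the data in transit.
Overall design: n client-agents --> one server-agent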
client-agent:
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# source
a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
a1.sources.r1.basenameHeader = true
#a1.sources.r1.ignorePattern = .+\.log$
a1.sources.r1.bufferMaxLineLength = 1048576
a1.sources.r1.batchSize = 5000

# interceptors
a1.sources.r1.interceptors = i1 i2 i3
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = id
a1.sources.r1.interceptors.i2.type = static
a1.sources.r1.interceptors.i2.key = key
a1.sources.r1.interceptors.i3.type = com.landray.behavior.interceptor.BehaviorClientSerurityHDFSInterceptor$Builder

# file channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = ./checkpoint
a1.channels.c1.dataDirs = ./data

# sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.compression-type = deflate
a1.sinks.k1.compression-level = 9
a1.sinks.k1.batch-size = 5000
# hostname takes a bare host name or IP address, without a URL scheme
a1.sinks.k1.hostname = test.com.cn
a1.sinks.k1.port = 5281
a1.sinks.k1.request-timeout = 20000

# user-defined settings
# directory whose log files are uploaded
a1.sources.r1.spoolDir = D:/flume_tes/source
# unique client ID
a1.sources.r1.interceptors.i1.value = 123456
# secret key
a1.sources.r1.interceptors.i2.value = <secret key>
server-agent:
a2.sources = r2
a2.channels = c2
a2.sinks = k2

# source
a2.sources.r2.type = avro
a2.sources.r2.channels = c2
a2.sources.r2.compression-type = deflate
# localhost accepts local connections only; bind to 0.0.0.0 or the server's address to accept remote client-agents
a2.sources.r2.bind = localhost
a2.sources.r2.port = 5281
a2.sources.r2.interceptors = i1
a2.sources.r2.interceptors.i1.type = com.landray.behavior.interceptor.BehaviorServerSerurityInterceptor$Builder

# file channel
a2.channels.c2.type = file
a2.channels.c2.checkpointDir = ./checkpoint
a2.channels.c2.dataDirs = ./data
a2.channels.c2.transactionCapacity = 20000

# memory channel alternative (disabled)
#a2.channels.c2.type = memory
#a2.channels.c2.capacity = 1000000
#a2.channels.c2.transactionCapacity = 20000
#a2.channels.c2.byteCapacityBufferPercentage = 20
#default 80%
#a2.channels.c2.byteCapacity = 800000

# sink
a2.sinks.k2.type = com.landray.behavior.sink.BehaviorRollingFileSink
a2.sinks.k2.channel = c2
#no check
a2.sinks.k2.sink.rollInterval = 0
a2.sinks.k2.sink.batchSize = 20000

# user-defined settings
# windows
a2.sinks.k2.sink.directory = D:/behavior/logs
# linux
#a2.sinks.k2.sink.directory = /home/nemo/behavior
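The two configurations only work together if a few values agree: the client's avro sink must point at the port the server's avro source listens on, and the Flume documentation requires the sink's compression-type to match that of the source. The following is a minimal sketch of a check for this, not part of Flume. The helper names get_prop and check_pair are hypothetical, and the property keys assume the agent and component names used above (a1/k1 and a2/r2).

```shell
#!/bin/sh
# get_prop FILE KEY: print the value of KEY from a Flume properties file.
# Hypothetical helper for illustration; lines starting with # are skipped.
get_prop() {
    awk -F= -v key="$2" '
        /^[ \t]*#/ { next }                        # skip comment lines
        {
            k = $1
            gsub(/^[ \t]+|[ \t]+$/, "", k)         # trim the key
            if (k == key) {
                v = substr($0, index($0, "=") + 1) # text after the first "="
                gsub(/^[ \t]+|[ \t]+$/, "", v)     # trim the value
                print v
                exit
            }
        }
    ' "$1"
}

# check_pair CLIENT_CONF SERVER_CONF: compare the settings that must match.
# Assumes the agent/component names from the example (a1.sinks.k1, a2.sources.r2).
check_pair() {
    client_port=$(get_prop "$1" a1.sinks.k1.port)
    server_port=$(get_prop "$2" a2.sources.r2.port)
    client_comp=$(get_prop "$1" a1.sinks.k1.compression-type)
    server_comp=$(get_prop "$2" a2.sources.r2.compression-type)
    if [ "$client_port" != "$server_port" ]; then
        echo "port mismatch: client=$client_port server=$server_port"
        return 1
    fi
    if [ "$client_comp" != "$server_comp" ]; then
        echo "compression-type mismatch: client=$client_comp server=$server_comp"
        return 1
    fi
    echo "ok"
}
```

For example, check_pair ./source.conf ./target.conf could be run before deployment, assuming those file names hold the client and server configurations respectively.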
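3. Startup scripts
[Screenshot of the directory layout]
Directory contents:
checkpoint: the file channel's checkpoint directory (checkpointDir)
conf: holds log4j.properties
data: the file channel's data directory (dataDirs), where events are persisted on disk
lib: the jar files flume needs to start
source.conf: the agent configuration file
The official site only describes starting an agent through its command-line launcher, which is not very flexible. Below are startup scripts for both Linux and Windows.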
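The layout described above can be prepared with a few lines of shell. This is only a sketch: the function name init_layout is hypothetical, the jar directory is named lib here on the assumption that it must match the -cp "./lib/*" argument used by the startup scripts, and the jars and log4j.properties still have to be copied in by hand.

```shell
#!/bin/sh
# init_layout ROOT: create the agent directory layout under ROOT.
# Hypothetical helper; "lib" is assumed to match the classpath in the startup scripts.
init_layout() {
    root=$1
    for d in checkpoint conf data lib; do
        mkdir -p "$root/$d" || return 1
    done
    # Create an empty placeholder config only if none exists yet.
    [ -f "$root/source.conf" ] || : > "$root/source.conf"
}
```

Calling init_layout /opt/flume-client (an example path) leaves existing files untouched, so it is safe to run again on an existing installation.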
linux:
#!/bin/sh
JAVA_OPTS=-Xmx1024m
$JAVA_HOME/bin/java $JAVA_OPTS -Dlog4j.configuration=file:./conf/log4j.properties -cp "./lib/*" org.apache.flume.node.Application --conf-file ./target.conf --name a2
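The Linux script hard-codes the configuration file and the agent name, so each agent needs its own copy. One possible variation, sketched below, takes both as arguments. The function name start_agent and the JAVA_BIN override are assumptions introduced for illustration; the java command line itself is the one from the script above.

```shell
#!/bin/sh
# Keep the heap setting from the original script unless JAVA_OPTS is already set.
JAVA_OPTS=${JAVA_OPTS:--Xmx1024m}

# start_agent CONF_FILE AGENT_NAME: launch one flume agent in the foreground.
# JAVA_BIN is a hypothetical override; by default $JAVA_HOME/bin/java is used.
start_agent() {
    conf_file=$1
    agent_name=$2
    if [ -z "$conf_file" ] || [ -z "$agent_name" ]; then
        echo "usage: start_agent <conf-file> <agent-name>" >&2
        return 2
    fi
    # $JAVA_OPTS is left unquoted on purpose so that several options split into words.
    "${JAVA_BIN:-$JAVA_HOME/bin/java}" $JAVA_OPTS \
        -Dlog4j.configuration=file:./conf/log4j.properties \
        -cp "./lib/*" \
        org.apache.flume.node.Application \
        --conf-file "$conf_file" --name "$agent_name"
}
```

On the server this would be called as start_agent ./target.conf a2. On a client, start_agent ./source.conf a1 would apply, assuming source.conf holds the client-agent configuration.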
windows:
@echo on
set FLUME_HOME=%~dp0
echo %~dp0
rem Set up the Java environment variables
set JAVA_HOME=%FLUME_HOME%..\jdk1.6
set PATH=%JAVA_HOME%\bin;%PATH%
echo %JAVA_HOME%
java -version
java %JAVA_OPTS% -Dlog4j.configuration=file:./conf/log4j.properties -cp .;./lib/* org.apache.flume.node.Application --conf-file ./target_hdfs.conf --name a2
PAUSE
Time: 2024-10-14 19:40:45