今天继续讨论几个agent的配置。
第一个agent是从终端捕获特定命令执行的输出结果,并将文件输出到特定目录。先看一下配置的代码:
agent2.sources = execsource //指定为从命令获取输出的source agent2.sinks = filesink //输出到文件的sink agent2.channels = filechannel //输出到文件的channel agent2.sources.execsource.type = exec //类型 agent2.sources.execsource.command = cat /home/leung/message //指定命令 agent2.sinks.filesink.type = FILE_ROLL agent2.sinks.filesink.sink.directory = /home/leung/flume/files //输出目录 agent2.sinks.filesink.sink.rollInterval = 0 agent2.channels.filechannel.type = file agent2.channels.filechannel.checkpointDir = /home/leung/flume/fc/checkpoint //检查点 agent2.channels.filechannel.dataDirs = /home/leung/flume/fc/data //channel的数据目录 agent2.sources.execsource.channels = filechannel agent2.sinks.filesink.channel = filechannel
OK,启动agent2,然后查看结果。
结果如下图。可以看到,执行 cat /home/leung/message命令之后,输出的结果与files目录中的文件内容是一致的,证明已经成功写入文件。
下一个agent是从网络端口 获取数据然后写到Hadoop集群的HDFS中。先看看配置代码:
agent4.sources = netsource agent4.sinks = hdfssink //HDFS sink agent4.channels = memorychannel agent4.sources.netsource.type = netcat agent4.sources.netsource.bind = localhost agent4.sources.netsource.port = 3000 agent4.sinks.hdfssink.type = hdfs agent4.sinks.hdfssink.hdfs.path = /flume //写出到HDFS上的文件目录,不需要提前创建 agent4.sinks.hdfssink.hdfs.filePrefix = log //指定写出文件的文件名前缀 agent4.sinks.hdfssink.hdfs.rollInterval = 0 agent4.sinks.hdfssink.hdfs.rollCount = 3 agent4.sinks.hdfssink.hdfs.fileType = DataStream agent4.channels.memorychannel.type = memory agent4.channels.memorychannel.capacity = 1000 agent4.channels.memorychannel.transactionCapacity = 100 agent4.sources.netsource.channels = memorychannel agent4.sinks.hdfssink.channel = memorychannel
下面启动agent4以及查看一下结果。
下面查看一下结果。发现在HDFS中已经新建了一个flume文件夹,并且已经写入了指定的内容。
接着我们为文件夹名加一个时间戳。详细看如下配置代码。
agent5.sources = netsource agent5.sinks = hdfssink agent5.channels = memorychannel agent5.sources.netsource.type = netcat agent5.sources.netsource.bind = localhost agent5.sources.netsource.port = 3000 agent5.sources.netsource.interceptors = ts agent5.sources.netsource.interceptors.ts.type = org.apache.flume.interceptor.TimestampInterceptor$Builder //引用这个类方法添加时间戳 agent5.sinks.hdfssink.type = hdfs agent5.sinks.hdfssink.hdfs.path = /flume-%Y-%m-%d //定义文件夹名格式 agent5.sinks.hdfssink.hdfs.filePrefix = log- agent5.sinks.hdfssink.hdfs.rollInterval = 0 agent5.sinks.hdfssink.hdfs.rollCount = 3 agent5.sinks.hdfssink.hdfs.fileType = DataStream agent5.channels.memorychannel.type = memory agent5.channels.memorychannel.capacity = 1000 agent5.channels.memorychannel.transactionCapacity = 100 agent5.sources.netsource.channels = memorychannel agent5.sinks.hdfssink.channel = memorychannel
OK,下面启动agent5。
下面查看一下结果。可以看到文件夹的名字被如期地加上了日期。
OK,先到这里,还有两个稍微复杂一点点的agent下次再讨论。本人水平有限,请各位不吝指正!谢谢!
时间: 2024-10-22 23:36:48