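Requirement: build a Windows service that downloads or updates files from an FTP server to the local disk in real time.
The feature itself is simple. An FTP utility class driven by a timer would do the job, so why didn't I go that way?
Don't ask. If you do, the answer is that I was bored and wanted to kill some time playing with Flume. Haha~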
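For comparison, the "utility class plus timer" route could look roughly like the sketch below. It is not the code used in this post. `FtpSyncPlanner`, `filesToDownload`, `schedule`, and the listing suppliers are hypothetical names, and the real FTP listing and download calls are deliberately left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical sketch of a timer-driven FTP sync; not the approach taken in this post. */
final class FtpSyncPlanner {

    /**
     * Returns the remote file names that are missing locally or whose remote
     * timestamp is newer than the local one. Keys are file names, values are
     * last-modified times in epoch milliseconds.
     */
    static List<String> filesToDownload(Map<String, Long> remote, Map<String, Long> local) {
        List<String> result = new ArrayList<>();
        // Copy into a TreeMap so the result order is sorted and stable.
        for (Map.Entry<String, Long> e : new TreeMap<>(remote).entrySet()) {
            Long localTime = local.get(e.getKey());
            if (localTime == null || localTime < e.getValue()) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    /**
     * Polls both listings with a fixed delay and hands every stale file name to the
     * downloader. Note: if the task throws, the executor cancels later runs, so a
     * real downloader should catch its own exceptions.
     */
    static ScheduledExecutorService schedule(Supplier<Map<String, Long>> remoteListing,
                                             Supplier<Map<String, Long>> localListing,
                                             Consumer<String> downloader,
                                             long periodSeconds) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleWithFixedDelay(
                () -> filesToDownload(remoteListing.get(), localListing.get()).forEach(downloader),
                0, periodSeconds, TimeUnit.SECONDS);
        return timer;
    }

    public static void main(String[] args) {
        Map<String, Long> remote = new HashMap<>();
        remote.put("a.pdf", 200L);
        remote.put("b.pdf", 100L);
        remote.put("c.pdf", 50L);
        Map<String, Long> local = new HashMap<>();
        local.put("a.pdf", 100L);
        local.put("b.pdf", 100L);
        // a.pdf is newer on the server and c.pdf is missing locally.
        System.out.println(filesToDownload(remote, local)); // prints [a.pdf, c.pdf]
    }
}
```

The planning step is kept separate from the scheduling so that it can be tested without an FTP server.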
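I. The Flume part
Both the Source and the Sink are third-party components.
Source component: https://github.com/keedio/flume-ftp-source
I can no longer find whose Sink I originally used, but I did find an upgraded version of it online.
Sink component: https://github.com/huyanping/flume-sinks-safe-roll-file-sink
Because of some custom requirements, I made a few changes to their source code.
For the actual code, see: https://gitee.com/syher/spring-boot-project/tree/master/spring-boot-flume
The key technology behind the FTP source is Apache's FTPClient, while the TailDir sink uses RandomAccessFile.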
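To show what RandomAccessFile offers a sink like this, here is a minimal sketch that writes chunks at explicit byte offsets. The names `ChunkWriter` and `writeAt` are made up for this example, and the sink's real code is more involved.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper showing offset-based writes with RandomAccessFile. */
final class ChunkWriter {

    /** Writes data at the given byte offset and returns the offset just past the written bytes. */
    static long writeAt(Path file, long offset, byte[] data) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "rw")) {
            raf.seek(offset);   // jump to the position where this chunk belongs
            raf.write(data);    // overwrite or extend the file from that position
            return raf.getFilePointer();
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("chunk-demo", ".bin");
        long next = writeAt(file, 0, "hello ".getBytes(StandardCharsets.UTF_8));
        next = writeAt(file, next, "world".getBytes(StandardCharsets.UTF_8));
        String content = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
        System.out.println(next + " bytes: " + content); // prints "11 bytes: hello world"
        Files.delete(file);
    }
}
```

Because each write carries its own offset, a transfer that arrives in several events can be resumed or appended without rewriting the whole file.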
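I have already written a JUnit test class. If you don't want to install the service but are still curious, you can adjust the configuration and run it yourself.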
package com.syher.flume;

import com.google.common.collect.Lists;
import com.urey.flume.sink.taildir.SafeRollingFileSink;
import org.apache.flume.*;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.channel.ReplicatingChannelSelector;
import org.apache.flume.conf.Configurables;
import org.apache.flume.sink.DefaultSinkProcessor;
import org.apache.flume.source.PollableSourceRunner;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.keedio.flume.source.ftp.source.Source;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

//@RunWith(SpringRunner.class)
//@SpringBootTest
public class SpringBootFlumeApplicationTests {

    // Holds every setting; each component later receives only the keys under its own prefix.
    Context defaultContext = new Context();

    @Before
    public void init() throws Exception {
        Map<String, String> prop = new HashMap<>();

        // Channel settings
        prop.put("channel.capacity", "1000");
        prop.put("channel.transactionCapacity", "1000");

        // FTP source settings
        prop.put("source.client.source", "ftp");
        prop.put("source.name.server", "192.168.1.150");
        prop.put("source.port", "21");
        prop.put("source.user", "username");
        prop.put("source.password", "secret");
        prop.put("source.working.directory", "/ftp/source");
        prop.put("source.filter.pattern", ".+\\.pdf");
        // prop.put("source.folder", "/ftp");
        prop.put("source.flushlines", "false");

        // Sink settings
        prop.put("sink.sink.directory", "G:/ftp/target/rolling");
        prop.put("sink.sink.moveFile", "false");
        prop.put("sink.sink.targetDirectory", "G:/ftp/target/PDFfiles");
        prop.put("sink.sink.useCopy", "true");
        prop.put("sink.sink.copyDirectory", "G:/ftp/target/copy");
        prop.put("sink.sink.useFileSuffix", "false");
        prop.put("sink.sink.fileSuffix", ".log");

        defaultContext.putAll(prop);
    }

    public MemoryChannel getChannel() {
        MemoryChannel channel = new MemoryChannel();
        channel.setName("channel");
        configure(channel, "channel.");
        return channel;
    }

    public Source getSource(Channel channel) {
        Source source = new Source();
        source.setName("source");

        // Wire the source to the single channel.
        ChannelSelector selector = new ReplicatingChannelSelector();
        selector.setChannels(Lists.newArrayList(channel));
        ChannelProcessor processor = new ChannelProcessor(selector);
        source.setChannelProcessor(processor);

        configure(source, "source.");
        return source;
    }

    public Sink getSink(Channel channel) {
        SafeRollingFileSink sink = new SafeRollingFileSink();
        sink.setName("sink");
        sink.setChannel(channel);
        configure(sink, "sink.");
        return sink;
    }

    // Configures a component with only the properties that start with the given prefix.
    public void configure(Object target, String prefixProperty) {
        Context context = new Context();
        context.putAll(defaultContext.getSubProperties(prefixProperty));
        Configurables.configure(target, context);
    }

    @Test
    public void contextLoads() throws Exception {
        MemoryChannel channel = getChannel();
        Source source = getSource(channel);
        Sink sink = getSink(channel);

        PollableSourceRunner sourceRunner = new PollableSourceRunner();
        sourceRunner.setSource(source);

        SinkProcessor sinkProcessor = new DefaultSinkProcessor();
        sinkProcessor.setSinks(Arrays.<Sink>asList(sink));
        SinkRunner sinkRunner = new SinkRunner(sinkProcessor);

        // Start each component exactly once: channel first, then source, then sink.
        channel.start();
        sourceRunner.start();
        sinkRunner.start();

        // Keep the test thread alive until it is interrupted.
        while (!Thread.interrupted()) {
            Thread.sleep(200);
        }
    }
}
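The `configure` helper in the test relies on `Context.getSubProperties(prefix)` to hand each component only its own keys with the prefix stripped, which is why the sink keys are written as `sink.sink.directory`. The plain-Java stand-in below mimics that behaviour as I understand it. `PrefixDemo.subProperties` is a hypothetical helper, not Flume's implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in that mimics prefix-based property splitting. */
final class PrefixDemo {

    /** Returns the entries whose key starts with prefix, with the prefix removed from each key. */
    static Map<String, String> subProperties(Map<String, String> all, String prefix) {
        Map<String, String> result = new TreeMap<>(); // sorted for predictable output
        for (Map.Entry<String, String> e : all.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                result.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> all = new HashMap<>();
        all.put("channel.capacity", "1000");
        all.put("source.port", "21");
        all.put("sink.sink.directory", "G:/ftp/target/rolling");
        all.put("sink.sink.useCopy", "true");

        // The component named "sink" sees its keys without the leading "sink.".
        System.out.println(subProperties(all, "sink."));
        // prints {sink.directory=G:/ftp/target/rolling, sink.useCopy=true}
    }
}
```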
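II. The JSW service part
I used Java Service Wrapper (JSW) to turn the Java program into a Windows service.
The toolkit has been uploaded to the Gitee project mentioned above, as flume-wrapper.zip.
After unpacking it, you will find two configuration files in the conf directory: one for Flume and one for JSW.
The bin directory holds the batch scripts for installing, uninstalling, starting, and stopping the service.
The lib directory holds the jars the project depends on at runtime.
The lib.d directory is not needed; it is just my backup of unused jars copied out of Flume, and it can be deleted.
For detailed configuration and usage, see the usage document inside the archive.
Note: it is best to set the log level of the JSW logfile to ERROR. Otherwise, from what I have heard, it may cause the process to run out of memory.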
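As a rough illustration of the log-level advice, the relevant part of the JSW configuration file might look like the fragment below. The property names are standard Java Service Wrapper settings, but the size and file-count values are assumptions for this example and are not taken from the packaged configuration.

```properties
# Only write ERROR and more severe messages to the wrapper log file
wrapper.logfile.loglevel=ERROR

# Optional: roll the log file so it cannot grow without bound (values are assumptions)
wrapper.logfile.maxsize=10m
wrapper.logfile.maxfiles=5
```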
III. Collection results
As you can see, the collection runs quite reliably. The whole job finished in under a minute.
Original post: https://www.cnblogs.com/braska/p/10327247.html