基于Flume做FTP文件实时同步到本地磁盘的windows服务。

需求:做一个windows服务,实现从ftp服务器实时下载或者更新文件到本地磁盘。

功能挺简单的。直接写个ftp工具类用定时器跑就能搞定,那我为什么不用呢?

别问,问就是我无聊啊,然后研究一下Flume打发时间。哈哈~

一、Flume部分

Source组件和Sink组件用的都是第三方。

source组件:https://github.com/keedio/flume-ftp-source

Sink组件用的谁的目前已经找不到了,网上搜到了一个升级版的。

sink组件:https://github.com/huyanping/flume-sinks-safe-roll-file-sink

因为一些个性化的需求,所以我对他们源代码做了些变动。

具体代码参考:https://gitee.com/syher/spring-boot-project/tree/master/spring-boot-flume

Ftp-Source组件的关键技术是Apache FtpClient,而TailDir-sink则用的RandomAccessFile。

Junit测试类我已经写好了,如果不想安装服务又有兴趣了解的朋友,可以自己改下配置跑一下看看。

package com.syher.flume;

import com.google.common.collect.Lists;
import com.urey.flume.sink.taildir.SafeRollingFileSink;
import org.apache.flume.*;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.channel.ReplicatingChannelSelector;
import org.apache.flume.conf.Configurables;
import org.apache.flume.sink.DefaultSinkProcessor;
import org.apache.flume.sink.RollingFileSink;
import org.apache.flume.source.PollableSourceRunner;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.keedio.flume.source.ftp.source.Source;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

//@RunWith(SpringRunner.class)
//@SpringBootTest
public class SpringBootFlumeApplicationTests {
    Context defaultContext = new Context();

    @Before
    public void init() throws Exception {
        Map<String, String> prop = new HashMap<>();
        prop.put("channel.capacity", "1000");
        prop.put("channel.transactionCapacity", "1000");

        prop.put("source.client.source", "ftp");
        prop.put("source.name.server", "192.168.1.150");
        prop.put("source.port", "21");
        prop.put("source.user", "username");
        prop.put("source.password", "secret");
        prop.put("source.working.directory", "/ftp/source");
        prop.put("source.filter.pattern", ".+\\.pdf");
       // prop.put("source.folder", "/ftp");
        prop.put("source.flushlines", "false");

        prop.put("sink.sink.directory", "G:/ftp/target/rolling");
        prop.put("sink.sink.moveFile", "false");
        prop.put("sink.sink.targetDirectory", "G:/ftp/target/PDFfiles");
        prop.put("sink.sink.useCopy", "true");
        prop.put("sink.sink.copyDirectory", "G:/ftp/target/copy");
        prop.put("sink.sink.useFileSuffix", "false");
        prop.put("sink.sink.fileSuffix", ".log");
        defaultContext.putAll(prop);
    }

    public MemoryChannel getChannel() {
        MemoryChannel channel = new MemoryChannel();
        channel.setName("channel");

        configure(channel, "channel.");
        return channel;
    }

    public Source getSource(Channel channel) {
        Source source = new Source();
        source.setName("source");

        ChannelSelector selector = new ReplicatingChannelSelector();
        selector.setChannels(Lists.newArrayList(channel));

        ChannelProcessor processor = new ChannelProcessor(selector);
        source.setChannelProcessor(processor);

        configure(source, "source.");
        return source;
    }

    public Sink getSink(Channel channel) {
        SafeRollingFileSink sink = new SafeRollingFileSink();
        sink.setName("sink");

        sink.setChannel(channel);

        configure(sink, "sink.");
        return sink;
    }

    public void configure(Object target, String prefixProperty) {
        Context context = new Context();
        context.putAll(defaultContext.getSubProperties(prefixProperty));
        Configurables.configure(target, context);
    }

    @Test
    public void contextLoads() throws Exception {
        MemoryChannel channel = getChannel();
        Source source = getSource(channel);
        Sink sink = getSink(channel);

        PollableSourceRunner sourceRunner = new PollableSourceRunner();
        sourceRunner.setSource(source);
        channel.start();
        sourceRunner.start();

        SinkProcessor sinkProcessor = new DefaultSinkProcessor();
        sinkProcessor.setSinks(Arrays.<Sink>asList(sink));
        SinkRunner sinkRunner = new SinkRunner(sinkProcessor);

        channel.start();
        sourceRunner.start();
        sinkRunner.start();

        while (!Thread.interrupted()) {
            Thread.sleep(200);
        }
    }

}

二、JSW服务部分

用的java service wrapper把java程序做成了windows服务。

工具包已经上传在我上面提到的gitee码云项目上。flume-wrapper.zip。

解压后在conf目录可以看到两个配置文件。一个是flume的,一个是jsw的。

bin目录里面是一些装卸启停的批命令。

lib目录里面有项目运行依赖的jar包。

lib.d目录没啥用,是我备份了从flume拷出来的一些无用的jar包。可删。

具体的配置和用法可以看压缩包里的使用说明文档。

注意,jsw的logfile的日志级别最好指定ERROR级别的,不然听说、可能会造成内存不足。

三、采集结果

可以看到,采集效率还是很稳的。一分钟不到就搞定了。

原文地址:https://www.cnblogs.com/braska/p/10327247.html

时间: 2024-10-29 05:43:41

基于Flume做FTP文件实时同步到本地磁盘的windows服务。的相关文章

基于rsync+inotify实现文件实时同步

rsync英文名remote  synchronization,可使本地和远程两台主机之间的数据快速复制同步镜像 远程备份的功能,rsync在拷贝时候会先比较源数据跟目标数据,不一致才复制,实现增量拷贝 监听于873/tcp rsync有许多选项: -n: 在不确定命令是否能按意愿执行时,务必要事先测试:-n可以完成此功能: -v: --verbose,详细输出模式 -q: --quiet,静默模式 尽可能输出少 -c: --checksum,开启校验功能,强制对文件传输进行校验 -r: --r

Rsync+inotify实现文件实时同步

数据备份.文件备份是运维.DBA等岗位最熟悉不过的话题,这里不介绍数据库的备份,简单介绍一下文件同步工具,这样的工具有很多,Windows环境下有Goodsync.FreeFileSync等,Linux下rsync.unison等,常用的实时同步,是几种工具的组合,经过组合的工具达到文件实时同步的效果. 一.常用实时同步方案 1.NFS网络文件系统 该方案是分布式架构中,解决不同节点对同一资源访问的问题,搭建NFS服务器,将其挂载在不同的节点,每个节点将公用的数据存储在NFS服务器上,实现文件的

基于sersync海量文件实时同步

今天我们主要讲解海量文件实时同步,最近涉及到数百万图片数据迁移,为了使图片数据快速迁移,并保证数据数据的一致性,无缝切换.尝试了多种方案. 方案1:rsync+inotify同步,最先想到的是此方案,以前在多台服务器间的做代码同步就常用此方法,因为代码文件数并不多.现在用在海量文件同步时,其缺陷也暴露出来,分析原理后,我们知道,每次新增数据都会做一次全量同步,在数据量不大的情况下同步挺快,但当文件数达到数百万就相当慢,每次对比整个目录的数量庞大的文件:第二,inotify监听的事件也非常多,创建

rsync+inotify 实现服务器之间目录文件实时同步(转)

软件简介: 1.rsync 与传统的 cp. tar 备份方式相比,rsync 具有安全性高.备份迅速.支持增量备份等优点,通过 rsync 可 以解决对实时性要求不高的数据备份需求,例如定期的备份文件服务器数据到远端服务器,对本地磁盘定 期做数据镜像等. 随着应用系统规模的不断扩大,对数据的安全性和可靠性也提出的更好的要求,rsync 在高端业务系统中 也逐渐暴露出了很多不足,首先,rsync 同步数据时,需要扫描所有文件后进行比对,进行差量传输.如 果文件数量达到了百万甚至千万量级,扫描所有

sersync基于rsync+inotify实现数据实时同步

一.环境描述 需求:服务器A与服务器B为主备服务模式,需要保持文件一致性,现采用sersync基于rsync+inotify实现数据实时同步 主服务器A:192.168.1.23 从服务器B:192.168.1.243 实时同步/var/atlassian目录到从服务器. 二.实施 1.从服务器192.168.1.243 rsync服务搭建 1.1安装软件包 wget http://rsync.samba.org/ftp/rsync/src/rsync-3.1.1.tar.gz tar xf r

Centos 6.5 rsync+inotify 两台服务器文件实时同步

rsync和inotify是什么我这里就不在介绍了,有专门的文章介绍这两个工具. 1.两台服务器IP地址分别为: 源服务器:192.168.1.2 目标服务器:192.168.1.3 @todo:从源服务器(192.168.1.2)的/www/目录下的所有的文件实时同步到目标服务器(192.168.1.3)的/www_bak/目录下 源服务器下需要安装rsync和inotify,源服务器做为server端,实时的向目标服务器client端发送数据 2.安装 rsync 一般centos6.5下都

rsync+ssh+inotify实现服务器文件实时同步

如何实现两台web服务器的文件同步的,答案是rsync,但是,如何做到实时同步呢,cron已经达不到这样的要求了,同步的再快,也会有时间间隔,cron时刻执行,也会浪费系统的资源,下面,我将介绍ssh+rsync+inotify来实现两台web间的文件实时同步. 拓扑如下:  实验的linux系统为CentOS 6.5,实验之前确保开发环境已安装完毕 首先,我们介绍一下inotify,这是linux的一个新特性,在2.6的内核开始加入,它是监控文件系统,并且及时的向我们的rsync发出相关信息,

linux下rsync+inotify实现服务器之间文件实时同步

先介绍一下rsync与inotify. 1.rsync 与传统的cp.tar备份方式相比,rsync具有安全性高.备份迅速.支持增量备份等优点,通过rsync可以解决对实时性要求不高的数据备份需求,例如定期的备份文件服务器数据到远端服务器,对本地磁盘定期做数据镜像等.随着应用系统规模的不断扩大,对数据的安全性和可靠性也提出的更好的要求,rsync在高端业务系统中也逐渐暴露出了很多不足,首先,rsync同步数据时,需要扫描所有文件后进行比对,进行差量传输.如果文件数量达到了百万甚至千万量级,扫描所

rsync + inotify 打造多服务器间文件实时同步

在上篇文章ssh无密码登陆服务器的基础之上,可以利用rsync + Inotify 在多服务器间实现文件自动同步. 如下测试机基于三台服务器做的,内网IP分别如下: 172.16.3.91    (主机) 172.16.3.92 (备份机1) 172.16.3.89 (备份机2) 现在想对主机上的/opt/sites/yutian_project目录下相关文件的任何操作同步到2台备份机上. 1.安装rsync 在三台机器上分别检查是否安装了rsync [[email protected] ~]#