storm-hdfs RotationActions 接口用法

storm-hdfs 这个插件支持  Rotation Actions 这个功能,官方文档解释是这样的,

### File Rotation Actions

Both the HDFS bolt and Trident State implementation allow you to register any number of `RotationAction`s.

What `RotationAction`s do is provide a hook to allow you to perform some action right after a file is rotated. For

example, moving a file to a different location or renaming it.

大概意思就是,写完hdfs文件之后会调用这个注册的方法,可以做个善后工作,比如移动个文件夹啥的。官方给了个例子,MoveFileAction 也是当文件写完之后移动到另外一个文件夹中,可是坑爹的是这个类不好用。于是没办法只能自己手动写一个了,把这个公布出来大家一起分享。

package lanbo.storm.kafka.examples;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.storm.hdfs.common.rotation.RotationAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
 * Created by lanbo on 15-2-13.
 */
public class CopyFileAction implements RotationAction {
    private static final Logger LOG = LoggerFactory.getLogger(CopyFileAction.class);

    private String destination;

    public CopyFileAction toDestination(String destDir){
        destination = destDir;
        return this;
    }

    @Override
    public void execute(FileSystem fileSystem, Path filePath) throws IOException {
        Path destPath = new Path(destination, filePath.getName());
        LOG.info("Moving file {} to {}", filePath, destPath);
        InputStream ins = fileSystem.open(filePath);
        OutputStream os = fileSystem.create(destPath);
        try{
            IOUtils.copy(ins, os);
            fileSystem.delete(filePath,true);
        }catch(Exception e){
            throw new IOException(e);
        }finally{
            ins.close();
            os.close();

        }

        return;
    }
}

这个类简单的就能实现移动文件夹,以后会陆续更新很多更有用的工具

时间: 2024-10-08 14:15:13

storm-hdfs RotationActions 接口用法的相关文章

(实用篇)php支付宝接口用法分析

本文实例讲述了php支付宝接口用法.分享给大家供大家参考.具体分析如下: 现在流行的网站支持平台,支付宝当仁不让的老大了,现在我们就来告诉你如何使用支付宝api来做第三方支付,把支付宝放到自己网站来, alipay_config.php配置程序如下: <?php */ //alipay_config.php 配置程序 $interfaceurl = "https://www.alipay.com/payto:"; $sitename = "网站名称"; $we

Android中Parcelable与Serializable接口用法

转自: Android中Parcelable接口用法 1. Parcelable接口 Interface for classes whose instances can be written to and restored from a Parcel. Classes implementing the Parcelable interface must also have a static field called CREATOR, which is an object implementing

Kafka+Storm+HDFS 整合示例

消息通过各种方式进入到Kafka消息中间件,比如可以通过使用Flume来收集日志数据,然后在Kafka中路由暂存,然后再由实时计算程序Storm做实时分析,最后将结果保存在HDFS中,这时我们就需要将在Storm的Spout中读取Kafka中的消息,然后交由具体的Spot组件去分析处理.下面开发一个简单WordCount示例程序,从Kafka读取订阅的消息行,通过空格拆分出单个单词,然后再做词频统计计算,最后将结果保存至HDFS. 1. kafka程序 package com.dxss.stor

Java语言Socket接口用法详解

Socket接口用法详解   在Java中,基于TCP协议实现网络通信的类有两个,在客户端的Socket类和在服务器端的ServerSocket类,ServerSocket类的功能是建立一个Server,并通过accept()方法随时监听客户端的连接请求. 扩展: ServerSocket中常用的构造函数及方法 构造函数:ServerSocket(int port) 这是一个构造方法,用于在当前的服务器默认的IP地址上监听一个指定的端口,即在指定的IP和端口创建一个ServerSocket对象

Kafka+Storm+HDFS整合实践

在基于Hadoop平台的很多应用场景中,我们需要对数据进行离线和实时分析,离线分析可以很容易地借助于Hive来实现统计分析,但是对于实时的需求Hive就不合适了.实时应用场景可以使用Storm,它是一个实时处理系统,它为实时处理类应用提供了一个计算模型,可以很容易地进行编程处理.为了统一离线和实时计算,一般情况下,我们都希望将离线和实时计算的数据源的集合统一起来作为输入,然后将数据的流向分别经由实时系统和离线分析系统,分别进行分析处理,这时我们可以考虑将数据源(如使用Flume收集日志)直接连接

[转载] Kafka+Storm+HDFS整合实践

转载自http://www.tuicool.com/articles/NzyqAn 在基于Hadoop平台的很多应用场景中,我们需要对数据进行离线和实时分析,离线分析可以很容易地借助于Hive来实现统计分析,但是对于实时的需求Hive就不合适了.实时应用场景可以使用Storm,它是一个实时处理系统,它为实时处理类应用提供了一个计算模型,可以很容易地进行编程处理.为了统一离线和实时计算,一般情况下,我们都希望将离线和实时计算的数据源的集合统一起来作为输入,然后将数据的流向分别经由实时系统和离线分析

大数据架构:flume-ng+Kafka+Storm+HDFS 实时系统组合

做软件开发的都知道模块化思想,这样设计的原因有两方面: 一方面是可以模块化,功能划分更加清晰,从"数据采集--数据接入--流失计算--数据输出/存储" 1).数据采集 负责从各节点上实时采集数据,选用cloudera的flume来实现 2).数据接入 由于采集数据的速度和数据处理的速度不一定同步,因此添加一个消息中间件来作为缓冲,选用apache的kafka 3).流式计算 对采集到的数据进行实时分析,选用apache的storm 4).数据输出 对分析后的结果持久化,暂定用mysql

转:大数据架构:flume-ng+Kafka+Storm+HDFS 实时系统组合

虽然比较久,但是这套架构已经很成熟了,记录一下 一般数据流向,从“数据采集--数据接入--流失计算--数据输出/存储”<ignore_js_op> 1).数据采集 负责从各节点上实时采集数据,选用cloudera的flume来实现 2).数据接入 由于采集数据的速度和数据处理的速度不一定同步,因此添加一个消息中间件来作为缓冲,选用apache的kafka 3).流式计算 对采集到的数据进行实时分析,选用apache的storm 4).数据输出 对分析后的结果持久化,暂定用mysql 另一方面是

[转]flume-ng+Kafka+Storm+HDFS 实时系统搭建

http://blog.csdn.net/weijonathan/article/details/18301321 一直以来都想接触Storm实时计算这块的东西,最近在群里看到上海一哥们罗宝写的Flume+Kafka+Storm的实时日志流系统的搭建文档,自己也跟着整了一遍,之前罗宝的文章中有一些要注意点没提到的,以后一些写错的点,在这边我会做修正:内容应该说绝大部分引用罗宝的文章的,这里要谢谢罗宝兄弟,还有写这篇文章@晨色星空J2EE也给了我很大帮助,这里也谢谢@晨色星空J2EE 之前在弄这个