Hadoop in Practice: Custom Flume Sink (Part 19)

import java.io.File;
import java.io.FileOutputStream;
import java.nio.charset.StandardCharsets;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySinks extends AbstractSink implements Configurable {

    private static final Logger logger = LoggerFactory.getLogger(MySinks.class);

    // Configuration key for the output file path (set on the sink as "fileName").
    private static final String PROP_KEY_ROOTPATH = "fileName";

    private String fileName;

    @Override
    public Status process() throws EventDeliveryException {
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        txn.begin();
        try {
            // take() returns null when the channel is empty; back off instead of
            // spinning inside an open transaction.
            Event event = ch.take();
            if (event == null) {
                txn.commit();
                return Status.BACKOFF;
            }
            logger.debug("Get event.");
            // Assumption: event bodies are UTF-8 text.
            String body = new String(event.getBody(), StandardCharsets.UTF_8);
            logger.info("event.getBody()-----{}", body);
            // Append "<body>:<timestamp>" as one CRLF-terminated line.
            String res = body + ":" + System.currentTimeMillis() + "\r\n";
            // try-with-resources closes the stream; an IOException propagates, so the
            // transaction is rolled back rather than committed with the event lost.
            try (FileOutputStream fos = new FileOutputStream(new File(fileName), true)) {
                fos.write(res.getBytes(StandardCharsets.UTF_8));
            }
            txn.commit();
            return Status.READY;
        } catch (Throwable th) {
            txn.rollback();
            if (th instanceof Error) {
                throw (Error) th;
            } else {
                throw new EventDeliveryException(th);
            }
        } finally {
            txn.close();
        }
    }

    @Override
    public void configure(Context context) {
        // Read the output file path from the sink's configuration.
        fileName = context.getString(PROP_KEY_ROOTPATH);
        if (fileName == null || fileName.isEmpty()) {
            // Fail fast instead of hitting a NullPointerException on the first event.
            throw new IllegalArgumentException(
                    "Sink property '" + PROP_KEY_ROOTPATH + "' must be set");
        }
    }

}
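
To try the sink, the agent configuration has to name the class as the sink type and supply the `fileName` property that `configure()` reads. The fragment below is a minimal sketch, assuming an agent called `a1` with a netcat source and a memory channel. The component names `r1`, `k1`, `c1`, the port, and the output path `/tmp/mysinks.out` are illustrative and not taken from the original post.

```properties
# Hypothetical agent layout: one source, one sink, one channel.
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Netcat source listening locally (port chosen for illustration).
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Custom sink: the type is the sink's class name.
a1.sinks.k1.type = MySinks
# Read in configure() via context.getString("fileName"); the path is an assumption.
a1.sinks.k1.fileName = /tmp/mysinks.out

# In-memory channel buffering events between source and sink.
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and the sink to the channel.
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```

Because `MySinks` is declared without a package, the bare class name should work as the sink type; if the class were placed in a package, the fully qualified name would be needed instead. The compiled jar also has to be on the Flume classpath, for example in the installation's `lib` directory.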
Time: 2024-12-28 08:30:50
