flume 日志搬家上半场

开一个监听持续间断的获取某个日志的续写的信息,并传入sink中,在flume默认的组建中并没用这样的功能,只能自己根据业务就行开发,下面flume获得source信息

概要:首先 我们在获得持续输出的日志并创建一个文件中记录我们获取这个日志的信息变化的位置,根据这个位置文件来完成,我们需要的断点续传功能.

所谓日志搬家我们必须要知道这个日志 是哪里来要搬到哪里去 这里是source我只做在哪里来,

首先上三段代码

package org.jueshizhanhun.flume.source.filemonitor;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.AbstractSource;
import org.apache.log4j.Logger;

import com.google.common.base.Preconditions;
import com.ule.flume.util.Constants;

public class FileMonitorSource extends AbstractSource implements Configurable, EventDrivenSource {

    private static final Logger log =  Logger.getLogger("sourcelog");
    private String path = "";//设置本机ip
    private long rollInterval = 10;//启动监听获取信息时间间隔
    private boolean record = true;//是否开启获得IP和日志文件名 开关
    
    private ChannelProcessor channelProcessor;
    private RandomAccessFile monitorFile = null;
    private File coreFile = null;
    private long lastMod = 0L;
    private String monitorFilePath = null;
    private String monitorFileName = null;
    private String positionFile = null;
    private FileChannel monitorFileChannel = null;
    private ByteBuffer buffer = ByteBuffer.allocate(1 << 20);// 1MB
    private long positionValue = 0L;
    private ScheduledExecutorService executor;
    private FileMonitorThread runner;
    private PositionLog positionLog = null;
    private Charset charset = null;
    private CharsetDecoder decoder = null;
    private CharBuffer charBuffer = null;
    private long counter = 0L;
    private Map<String, String> headers = new HashMap<String, String>();// event
                                                                        // header
    private long lastFileSize = 0L;
    private long nowFileSize = 0L;

    private SourceCounter sourceCounter;

    @Override
    public synchronized void start() {
        channelProcessor = getChannelProcessor();
        executor = Executors.newSingleThreadScheduledExecutor();
        runner = new FileMonitorThread();
        executor.scheduleWithFixedDelay(runner, 0, rollInterval, TimeUnit.SECONDS);
        sourceCounter.start();
        super.start();
        log.info("FileMonitorSource source started");
    }

    @Override
    public synchronized void stop() {
        positionLog.setPosition(positionValue);
        log.debug("Set the positionValue {} when stopped : " +  positionValue);
        if (this.monitorFileChannel != null) {
            try {
                this.monitorFileChannel.close();
            } catch (IOException e) {
                log.error(this.monitorFilePath + " filechannel close Exception", e);
            }
        }
        if (this.monitorFile != null) {
            try {
                this.monitorFile.close();
            } catch (IOException e) {
                log.error(this.monitorFilePath + " file close Exception", e);
            }
        }
        executor.shutdown();
        try {
            executor.awaitTermination(10L, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            log.error("Interrupted while awaiting termination", ex);
        }
        executor.shutdownNow();
        sourceCounter.stop();
        super.stop();
        log.info("FileMonitorSource source stopped");
    }

    @Override
    public void configure(Context context) {
        charset = Charset.forName("UTF-8");
        decoder = charset.newDecoder();
        //需要监测的文件
        this.monitorFilePath = context.getString("file");
        this.rollInterval = context.getInteger("rollInterval",10);
        this.path = context.getString("path", catchLocalIP().toString());
        this.record = context.getBoolean("record",true);
        //需要监测的文件的所在路径
        this.positionFile =  monitorFilePath + ".position";
        Preconditions.checkArgument(monitorFilePath != null, "The file can not be null !");
        try {
            for (String name : monitorFilePath.split("/")) {
                monitorFileName = name;
            }
        } catch (Exception e) {
            log.error("获得监控文件名称失败!",e);
        }
        try {
            //获得这个文件
            coreFile = new File(monitorFilePath);
            //获得文件的最后修改时间
            lastMod = coreFile.lastModified();
        } catch (Exception e) {
            log.error("Initialize the File/FileChannel Error", e);
            return;
        }
        
        try {
            positionLog = new PositionLog(positionFile);
            //获得当前位置文件中的数值
            positionValue = positionLog.initPosition();
        } catch (Exception e) {
            log.error("Initialize the positionValue in File positionLog", e);
            return;
        }
        lastFileSize = positionValue;
        if (sourceCounter == null) {
            sourceCounter = new SourceCounter(getName());
        }
    }

    class FileMonitorThread implements Runnable {

        /**
         * a thread to check whether the file is modified
         */
        @Override
        public void run() {
              {
                log.info("FileMonitorThread running ...");
                // coreFile = new File(monitorFilePath);
                long nowModified = coreFile.lastModified();
                // the file has been changed
                if (lastMod != nowModified) {
                    log.debug(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>File modified ...");
                    // you must record the last modified and now file size as
                    // soon
                    // as possible
                    lastMod = nowModified;
                    nowFileSize = coreFile.length();
                    int readDataBytesLen = 0;
                    try {
                        log.debug("The LastlastFileSize:" +lastFileSize+" nowFileSize :"+nowFileSize);
                        // it indicated the file is rolled by log4j
                        if (nowFileSize <= lastFileSize) {
                            log.debug("The file size is changed to be lower,it indicated that the file is rolled by log4j.");
                            positionValue = 0L;
                        }
                        lastFileSize = nowFileSize;
                        monitorFile = new RandomAccessFile(coreFile, "r");
                        // you must be instantiate the file channel Object when
                        // the
                        // file
                        // changed
                        monitorFileChannel = monitorFile.getChannel();
                        monitorFileChannel.position(positionValue);
                        // read file content into buffer
                        int bytesRead = monitorFileChannel.read(buffer);
                        // this while for it can not read all the data when the
                        // file
                        // modified
                        while (bytesRead != -1) {
                            log.debug("How many bytes read in this loop ? -->  {} = "+ bytesRead);
                            String contents = buffer2String(buffer);
                            // every read,the last byte is \n,this can make sure
                            // the
                            // integrity of read data
                            // include the \n
                            int lastLineBreak = contents.lastIndexOf("\n") + 1;
                            String readData = contents.substring(0, lastLineBreak);
                            byte[] readDataBytes = readData.getBytes();
                            readDataBytesLen = readDataBytes.length;
                            positionValue += readDataBytesLen;
                            // change the position value for next read
                            String infoString ="";
                            int infoLength = 0;
                            if (record) {
                                infoString = Constants.SPLIT+path+Constants.SPLIT+monitorFileName;
                                infoLength = infoString.getBytes().length; 
                                log.info("infoString:" + infoString);
                                log.debug("data: "+(new String(readDataBytes) + infoString)+" end");
                            }

                            monitorFileChannel.position(positionValue+infoLength);
//                            headers.put(Constants.KEY_DATA_SIZE, String.valueOf(readDataBytesLen));
//                            headers.put(Constants.KEY_DATA_LINE, String.valueOf(readData.split("\n")));
                            sourceCounter.incrementEventReceivedCount();
//                            channelProcessor.processEvent(EventBuilder.withBody(readDataBytes,headers));
                            if (record) {
                                channelProcessor.processEvent(EventBuilder.withBody((new String(readDataBytes) + infoString).getBytes()));
                            }else {
                                channelProcessor.processEvent(EventBuilder.withBody(readDataBytes));
                            }
                            sourceCounter.addToEventAcceptedCount(1);
                            // channelProcessor.processEventBatch(getEventByReadData(readData));
                            log.debug("Change the next read position {} = "+ positionValue);
                            buffer.clear();
                            bytesRead = monitorFileChannel.read(buffer);
                        }
                    } catch (Exception e) {
                        log.error("Read data into Channel Error", e);
                        log.debug("Save the last positionValue {} into Disk File = positionValue- readDataBytesLen :"+ (positionValue- readDataBytesLen));
                        positionLog.setPosition(positionValue - readDataBytesLen);
                    }
                    counter++;
                    if (counter % Constants.POSITION_SAVE_COUNTER == 0) {
                        log.debug(
                                Constants.POSITION_SAVE_COUNTER
                                        + " times file modified checked,save the position Value {} into Disk file = "+"positionValue:"+
                                positionValue);
                        positionLog.setPosition(positionValue);
                    }
                }
            }
        }

    }

    public List<Event> getEventByReadData(String readData) {
        String str[] = readData.split("\n");
        int len = str.length;
        List<Event> events = new ArrayList<Event>();
        for (int i = 0; i < len; i++) {
            Event event = EventBuilder.withBody((str[i]).getBytes());
            events.add(event);
        }
        return events;
    }

    public String buffer2String(ByteBuffer buffer) {
        buffer.flip();
        try {
            charBuffer = decoder.decode(buffer);
            return charBuffer.toString();
        } catch (Exception ex) {
            ex.printStackTrace();
            return "";
        }
    }
    //取得本机IP地址  
    public static String catchLocalIP()  
    {  
        InetAddress LocalIP =null;
        try   
        {  
            LocalIP = InetAddress.getLocalHost(); 
        } catch (UnknownHostException e)   
        {  
              
        }  
        return LocalIP.getHostAddress();  
          
    }  
}
package org.jueshizhanhun.flume.source.filemonitor;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PositionLog {

    private static final Logger log = LoggerFactory
            .getLogger(PositionLog.class);
    
    private FileChannel positionFileChannel;
    private String postionFile;
    private RandomAccessFile raf = null;
    private String filePath = null;
    public FileChannel getPositionFileChannel() {
        return positionFileChannel;
    }
    public void setPositionFileChannel(FileChannel positionFileChannel) {
        this.positionFileChannel = positionFileChannel;
    }
    public String getPostionFilePath() {
        return postionFile;
    }
    public void setPostionFilePath(String postionFilePath) {
        this.postionFile = postionFilePath;
    }
    
    public PositionLog() {
    }
    public PositionLog(String postionFilePath) {
        this.postionFile = postionFilePath;
    }
    public long initPosition() throws Exception {

        filePath = postionFile;
        File file = new File(filePath);
        if (!file.exists()) {
            try {
                file.createNewFile();
                log.debug("Create the postionFile file");
            } catch (IOException e) {
                log.error("Create the postionFile error", e);
                throw e;
            }
        }
        
        try {
            //使用找个文件的权限   rw  读写权限
            raf = new RandomAccessFile(filePath, "rw");
            //获得文件的通道
            this.positionFileChannel =raf.getChannel();
            long fileSize = positionFileChannel.size();
            if(fileSize==0) {
                log.debug("The file content is null,init the value 0");
                ByteBuffer buffer = ByteBuffer.allocate(1);
                buffer.put("0".getBytes());
                buffer.flip();
                positionFileChannel.write(buffer);
                raf.close();
                return 0L;
            }else {
                return getPosition();
            }
        } catch (Exception e) {
            log.error("Init the position file error",e);
            throw e;
        } 
    }
    
    public long getPosition() {
        try {
            raf = new RandomAccessFile(filePath, "rw");
            this.positionFileChannel =raf.getChannel();
            long fileSize = positionFileChannel.size();
            ByteBuffer buffer = ByteBuffer.allocate((int) fileSize);
            int bytesRead = positionFileChannel.read(buffer);
            StringBuffer sb = new StringBuffer();
            while(bytesRead!=-1) {
                buffer.flip();
                while(buffer.hasRemaining()) {
                    sb.append((char)buffer.get());
                }
                buffer.clear();
                bytesRead = positionFileChannel.read(buffer);
            }
            raf.close();
            return Long.parseLong(sb.toString());
        } catch (Exception e) {
            log.error("Get Position Value Error",e);
            return -1;
        }
    }
    
    public long setPosition(Long position) {
        try {
            raf = new RandomAccessFile(filePath, "rw");
            this.positionFileChannel =raf.getChannel();
            String positionStr = String.valueOf(position);
            int bufferSize = positionStr.length();
            ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
            buffer.clear();
            buffer.put(positionStr.getBytes());
            buffer.flip();
            while(buffer.hasRemaining()) {
                this.positionFileChannel.write(buffer);
            }
            raf.close();
            log.debug("Set Position Value Successfully {}",position);
            return position;
        } catch (Exception e) {
            log.error("Set Position Value Error",e);
            return -1;
        }
    }
    
}
package org.jueshizhanhun.flume.util;

public class Constants {

    public static String SPLIT = ":jbgsn:";
    
    public static long POSITION_INIT_VALUE = 0L;
    
    public static String KEY_DATA_SIZE = "readDataSize";
    
    public static String KEY_DATA_LINE = "readDataLine";
    
    public static int POSITION_SAVE_COUNTER = 10;
    
}

获取文件的核心代码每间隔一段时间扫描一次日志文件将变动的信息获得,并添加标记信息,在传输到sink中

#这里要写自己的类
home.sources.s1.type = org.jueshizhanhun.flume.source.filemonitor.FileMonitorSource
home.sources.s1.channels = c1
#自定义 监控文件日志的全路径
home.sources.s1.file = /tmp/flume/hostnamelog.log
#自定义 搬家后需要创建的文件路径
home.sources.s1.path = pub/message/172.24.138.40
#home.sources.s1.rollInterval = 30      //间隔多少秒 获取一次日志信息 默认10秒
#home.sources.s1.record = true       //获取日志信息后 是否添加ip与日志名称信息 默认开启
时间: 2024-12-10 21:41:59

flume 日志搬家上半场的相关文章

flume 日志搬家下半场

续上面 获得资源后我们要将转换为相应的日志,落在统一的服务器中. 在flume中的对file操作的sink只有RollingFileSink但这个对我们来一点用都没有, package com.ule.flume.sink.file; import java.io.BufferedOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.i

【转】Flume日志收集

from:http://www.cnblogs.com/oubo/archive/2012/05/25/2517751.html Flume日志收集 一.Flume介绍 Flume是一个分布式.可靠.和高可用的海量日志聚合的系统,支持在系统中定制各类数据发送方,用于收集数据:同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力. 设计目标: (1) 可靠性 当节点出现故障时,日志能够被传送到其他节点上而不会丢失.Flume提供了三种级别的可靠性保障,从强到弱依次分别为:e

scribe、chukwa、kafka、flume日志系统对比

scribe.chukwa.kafka.flume日志系统对比 1. 背景介绍许多公司的平台每天会产生大量的日志(一般为流式数据,如,搜索引擎的pv,查询等),处理 这些日志需要特定的日志系统,一般而言,这些系统需要具有以下特征:(1) 构建应用系统和分析系统的桥梁,并将它们之间的关联解耦:(2) 支持近实时的在线分析系统和类似于Hadoop之类的离线分析系统:(3) 具有高可扩展性.即:当数据量增加时,可以通过增加节点进行水平扩展. 本文从设计架构,负载均衡,可扩展性和容错性等方面对比了当今开

flume日志采集框架使用

flume日志采集框架使用 本次学习使用的全部过程均不在集群上,均在本机环境,供学习参考 先决条件: flume-ng-1.6.0-cdh5.8.3.tar  去cloudrea下载flume框架,笔者是用cdh5.8.3的套餐 flume的使用环境: 采集特定目录到hdfs环境以供分析离线数据 监听特定端口的socket流数据 本次将以上两种情况的使用加以记录供以后参考 解压 flume-ng-1.6.0-cdh5.8.3.tar mv flume-ng-1.6.0-cdh5.8.3 flum

Flume日志收集系统架构详解--转

2017-09-06 朱洁 大数据和云计算技术 任何一个生产系统在运行过程中都会产生大量的日志,日志往往隐藏了很多有价值的信息.在没有分析方法之前,这些日志存储一段时间后就会被清理.随着技术的发展和分析能力的提高,日志的价值被重新重视起来.在分析这些日志之前,需要将分散在各个生产系统中的日志收集起来.本节介绍广泛应用的Flume日志收集系统. 一.概述 Flume是Cloudera公司的一款高性能.高可用的分布式日志收集系统,现在已经是Apache的顶级项目.同Flume相似的日志收集系统还有F

Flume日志收集(转载)

Flume日志收集 一.Flume介绍 Flume是一个分布式.可靠.和高可用的海量日志聚合的系统,支持在系统中定制各类数据发送方,用于收集数据:同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力. 设计目标: (1) 可靠性 当节点出现故障时,日志能够被传送到其他节点上而不会丢失.Flume提供了三种级别的可靠性保障,从强到弱依次分别为:end-to-end(收到数据agent首先将event写到磁盘上,当数据传送成功后,再删除:如果数据发送失败,可以重新发送.),S

大数据flume日志采集系统详解

一.flume介绍 flume 是一个cloudera提供的 高可用高可靠,分布式的海量日志收集聚合传输系统.Flume支持日志系统中定制各类数据发送方,用于收集数据.同时flume提供对数据进行简单处理,并写到各种数据接收方(可定制)的能力. 二.功能介绍   日志收集 Flume最早是Cloudera提供的日志收集系统,目前是Apache下的一个孵化项目,Flume支持在日志系统中定制各类数据发送方,用于收集数据. 流程:恒生数据接收中间件---file.txt  哪个端口进行监控 ---

FLUME日志收集

一.FLUME介绍 Flume是一个分布式.可靠.和高可用的海量日志聚合的系统,支持在系统中定制各类数据发送方,用于收集数据:同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力. 设计目标: (1) 可靠性 当节点出现故障时,日志能够被传送到其他节点上而不会丢失.Flume提供了三种级别的可靠性保障,从强到弱依次分别为:end-to-end(收到数据agent首先将event写到磁盘上,当数据传送成功后,再删除:如果数据发送失败,可以重新发送.),Store on fa

Flume日志收集系统介绍

转自:http://blog.csdn.net/a2011480169/article/details/51544664 在具体介绍本文内容之前,先给大家看一下Hadoop业务的整体开发流程: 从Hadoop的业务开发流程图中可以看出,在大数据的业务处理过程中,对于数据的采集是十分重要的一步,也是不可避免的一步,从而引出我们本文的主角-Flume.本文将围绕Flume的架构.Flume的应用(日志采集)进行详细的介绍. (一)Flume架构介绍 1.Flume的概念 flume是分布式的日志收集