java实时监听日志写入kafka(转)

原文链接:http://www.sjsjw.com/kf_cloud/article/020376ABA013802.asp

目的

实时监听某目录下的日志文件,如有新文件切换到新文件,并同步写入kafka,同时记录日志文件的行位置,以应对进程异常退出,能从上次的文件位置开始读取(考虑到效率,这里是每100条记一次,可调整)

源码:

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.LineNumberReader;
import java.io.PrintWriter;
import java.io.RandomAccessFile;
import java.net.NoRouteToHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;  

/*
 * 自己在源服务器写生产者往kafka插入数据,注意文件"producer.properties放在linux下该jar文件同一目录
 * 监听某个目录下的文件数据然后写入kafka
 * nohup java -jar portallog_producer.jar portallog /var/apache/logs portallog.position  >/home/sre/portalhandler/handler.log 2>&1 &
 *
 *
 */
public class PortalLogTail_Line {  

    private Producer<String,String> inner;
    java.util.Random ran = new Random();
    public PortalLogTail_Line() throws FileNotFoundException, IOException {
        Properties properties = new Properties();
     //   properties.load(ClassLoader.getSystemResourceAsStream("producer.properties"));  

        properties.load(new FileInputStream("producer.properties"));  

        ProducerConfig config = new ProducerConfig(properties); 

        inner = new Producer<String, String>(config);  

    }  

    public void send(String topicName,String message) {
        if(topicName == null || message == null){
            return;
        }
     //   KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message);
        //随机作为key,hash分散到各个分区
      KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,String.valueOf(ran.nextInt(9)),message);
     //   KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message,message);
        inner.send(km);  

    }  

    public void send(String topicName,Collection<String> messages) {
        if(topicName == null || messages == null){
            return;
        }
        if(messages.isEmpty()){
            return;
        }
        List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
        for(String entry : messages){
            KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,entry);
            kms.add(km);
        }
        inner.send(kms);
    }  

    public void close(){
        inner.close();
    }  

    public String getNewFile(File file)
    {
        File[] fs=file.listFiles();
        long maxtime=0;
        String newfilename="";
        for (int i=0;i<fs.length;i++)
        {
            if (fs[i].lastModified()>maxtime && fs[i].getName().contains("access"))
            {
                maxtime=fs[i].lastModified();
                newfilename=fs[i].getAbsolutePath();

            }
        }
        return newfilename;
    }
      //写入文件名及行号
    public void writePosition(String path,int rn,String positionpath)
    {
        try {
               BufferedWriter out = new BufferedWriter(new FileWriter(positionpath));
               out.write(path+","+rn);
               out.close();
        } catch (IOException e) {
        }
    }
    LineNumberReader randomFile=null;
     String newfile=null;
     String thisfile=null;
     String prefile=null;
     int ln=0;
     int beginln=0;
    public void realtimeShowLog(final File file,final String topicname, final String positionpath) throws IOException{     

        //启动一个线程每1秒钟读取新增的日志信息
       new Thread(new Runnable(){
            public void run() {
                   thisfile=getNewFile(file);
                 prefile=thisfile;
                 //访问position文件,如果记录了文件路径,及行号,则定位,否则使用最新的文件
                 try {
                     BufferedReader br=new BufferedReader(new FileReader(positionpath));
                     String line=br.readLine();
                     if (line!=null &&line.contains(","))
                     {
                         thisfile=line.split(",")[0];
                          prefile=thisfile;
                          beginln=Integer.parseInt(line.split(",")[1]);
                     }

                 } catch (FileNotFoundException e2) {
                     // TODO Auto-generated catch block
                     e2.printStackTrace();
                 }
                  catch (IOException e2) {
                         // TODO Auto-generated catch block
                         e2.printStackTrace();
                     }

                 //指定文件可读可写
                     try {
                         randomFile = new LineNumberReader(new FileReader(thisfile));
                     } catch (FileNotFoundException e) {
                         // TODO Auto-generated catch block
                         e.printStackTrace();
                     }
              while (true)
              {
                  try {
                     Thread.sleep(100);

                 } catch (InterruptedException e1) {
                     // TODO Auto-generated catch block
                     e1.printStackTrace();
                 }
                  try {
                      //获得变化部分的
                    //  randomFile.seek(lastTimeFileSize);
                      String tmp = "";
                      while( (tmp = randomFile.readLine())!= null) {
                          int currln=randomFile.getLineNumber();
                          //beginln默认为0
                          if (currln>beginln)
                              send(topicname,new String(tmp.getBytes("utf8")));

                          ln++;

                          //每发生一条写一次影响效率,连续发100次后再记录位置
                          if (ln>100)
                              {
                              writePosition(thisfile,currln,positionpath);
                              ln=0;
                              }

                      }
                     thisfile=getNewFile(file);
                     if(!thisfile.equals(prefile))

                     {
                         randomFile.close();
                         randomFile = new LineNumberReader(new FileReader(thisfile));
                        prefile=thisfile;
                       beginln=0;
                     }

                  } catch (IOException e) {
                      throw new RuntimeException(e);
                  }
              }
        }}).start();
    }     

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        PortalLogTail_Line producer = new PortalLogTail_Line();
        if (args.length!=3)
        {
            System.out.println("usage:topicname pathname positionpath");
            System.exit(1);
        }
        String topicname=args[0];
        String pathname=args[1];
        String positionpath=args[2];
        final File tmpLogFile = new File(pathname);
        producer.realtimeShowLog(tmpLogFile,topicname,positionpath); 

    }  

} 
producer.properties文件放在同级目录下

metadata.broker.list=xxx:10909,xxx:10909

# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=

# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync
#producer.type=async

# specify the compression codec for all data generated: none , gzip, snappy.
# the old config values work as well: 0, 1, 2 for none, gzip, snappy, respectivally
compression.codec=none
#compression.codec=gzip

# message encoder
serializer.class=kafka.serializer.StringEncoder

测试

最后执行:

 nohup java -jar portallog_producer.jar portallog /var/apache/logs portallog.position  >/home/sre/portalhandler/handler.log 2>&1 &
时间: 2024-10-11 07:55:45

java实时监听日志写入kafka(转)的相关文章

java实时监听日志写入kafka

目的 实时监听某目录下的日志文件,如有新文件切换到新文件,并同步写入kafka,同时记录日志文件的行位置,以应对进程异常退出,能从上次的文件位置开始读取(考虑到效率,这里是每100条记一次,可调整) 源码: [java] view plain copy import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; import java.io.FileInputStream; import j

java实时监听日志写入kafka(多目录)

目的 实时监听多个目录下的日志文件,如有新文件切换到新文件,并同步写入kafka,同时记录日志文件的行位置,以应对进程异常退出,能从上次的文件位置开始读取(考虑到效率,这里是每100条记一次,可调整) 源码 [java] view plain copy import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; import java.io.FileInputStream; import j

利用Maxwell组件实时监听Mysql的binlog日志

一:在linux环境下安装部署好mysql 开启binlog Vim  /etc/my.cnf mysql的binlog格式有3种,为了把binlog解析成json数据格式,要设置binlog的格式为row(binlog有三种格式:Statement.Row以及Mixed) 3重启msyql服务 service mysqld restart 查看是否已经开启binlog 查看是否已经开启binlog 是否改为row模式 /var/lib/mysql下看到生成了相应的binlog监听日志文件,如图

ORACLE清理、截断监听日志文件(listener.log)

在ORACLE数据库中,如果不对监听日志文件(listener.log)进行截断,那么监听日志文件(listener.log)会变得越来越大,想必不少人听说过关于"LISTENER.LOG日志大小不能超过2GB,超过会导致LISTENER监听器无法处理新的连接",当然这个不是真理,不会绝对出现,只是发生在老旧的32bit Linux或Unix系统下面,真实的原因是一些32bit OS自带的文件系统不支持2GB以上的文件,导致监听服务进程(tnslsnr)append write日志文件

sql server 警报管理,实时监听数据库动向,运筹帷幄之中

原文:sql server 警报管理,实时监听数据库动向,运筹帷幄之中 工作这么多年了,无论是身边的同学还是同事,发现只要搞程序员的都有一个通病---懒.懒到谁都不愿意加班,尤其是"义务"加班.即使大家都不愿意加班,但是很多时候项目赶着上线或者上线之后出错啊什么的,总得有人看着,这时候就诞生了一种新的工作制度,叫做7*24.顾名思义就是这种岗位实时都得有人看着,这确实是一件让人头疼的事情.虽然说在项目刚上线不可避免的得有7*24,但是我们可以尽量减少7*24的工作量(ps:因为7*24

Rsync+Inotify实时监听备份

说明,下面的inotify是建立在rsync的配置过程 大前提是rsync daemon 配置成功,rsync配置看上一遍博文,在客户端可以推拉数据,然后才能配置inotify服务----inotify是在客户端安装,监听需要备份的目录,然后推送到服务端 查看当前系统是否支持inotify [[email protected] bier]# uname -r 2.6.32-431.el6.i686 [[email protected] bier]# ls -l /proc/sys/fs/inot

有关监听日志的清理问题

近日,有开发人员向我反馈:“代码时有连不上数据库的情况发生”.在了解了一些基本信息之后,希望能通过查看监听日志获取问题线索.首先是通过如下方式确定监听日志的存放路径: LSNRCTL for Linux: Version 10.2.0.4.0 - Production on 21-JUN-2016 21:19:56 Copyright (c) 1991, 2007, Oracle. All rights reserved. Connecting to (ADDRESS=(PROTOCOL=tcp

移动端用js与jquery实时监听输入框值的改动

背景: 在一次移动端H5开发中,需要监听输入框值的实时变动. onchange事件肯定抛弃,因为只能失去焦点才触发. 而keyPress在Android可以触发,iOS不可以. 又不想用Android和iOS都可以触发的keyDown和keyUp. 于是,百度出了新东西:oninput![需要配合propertychange,兼容 IE9 以下版本] 用法: JS: if(isIE) { document.getElementById("input").onpropertychange

我所不知道的的监听日志问题

今天,有开发人员向我反馈:“代码时有连不上数据库的情况发生”.在了解了一些基本情况之后,希望能通过查看监听日志获取问题线索.首先是通过如下方式确定监听日志的存放路径: LSNRCTL for Linux: Version 10.2.0.4.0 - Production on 21-JUN-2016 21:19:56 Copyright (c) 1991, 2007, Oracle. All rights reserved. Connecting to (ADDRESS=(PROTOCOL=tcp