Reading log files with Flume and storing them to HDFS

Set up the Hadoop environment

Set up the Flume environment

Create the Flume configuration file

D:\Soft\apache-flume-1.8.0-bin\conf

Rename flume-conf.properties.template to hdfs.properties
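For example, from a command prompt in that directory (copy rather than rename keeps the template intact):

copy flume-conf.properties.template hdfs.properties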

# Assemble the agent
a1.sources = s1
a1.channels = c1
a1.sinks = k1

# Configure the source: read files from a spooling directory
a1.sources.s1.type = spooldir
a1.sources.s1.channels = c1
a1.sources.s1.spoolDir = E:\log2s
# Include every file in the directory
a1.sources.s1.includePattern=^.*$
# Ignore the file currently being written (log.log; rolled files end in a timestamp suffix)
a1.sources.s1.ignorePattern=^.*log$
a1.sources.s1.deletePolicy=never
a1.sources.s1.fileHeader = true
## Add a timestamp header (required by the %Y-%m-%d escape in the HDFS sink path)
a1.sources.s1.interceptors=i1
a1.sources.s1.interceptors.i1.type=timestamp

# Configure the channel: buffer events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

# Configure the sink: write to HDFS
a1.sinks.k1.channel=c1
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://127.0.0.1:9000/flume/accesslog/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix=logs
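# Roll a new HDFS file every 10 seconds; rollSize = 0 and rollCount = 0
# disable size- and count-based rolling, so only the time interval applies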
a1.sinks.k1.hdfs.rollInterval=10
a1.sinks.k1.hdfs.rollSize=0
a1.sinks.k1.hdfs.rollCount=0
a1.sinks.k1.hdfs.batchSize=100
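# writeFormat = Text serializes event bodies as text, but without
# hdfs.fileType = DataStream the container stays a SequenceFile (the default,
# visible as HDFSSequenceFile in the log output below)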
a1.sinks.k1.hdfs.writeFormat=Text
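# On a single-node HDFS, minBlockReplicas = 1 prevents under-replication
# from forcing files to roll early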
a1.sinks.k1.hdfs.minBlockReplicas=1
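A memory channel is fast but loses any buffered events if the agent dies. As a rough alternative sketch (the directory paths here are placeholders, not from the original setup), a durable file channel would look like:

# Alternative channel: persist buffered events to disk
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = E:/flume/checkpoint
a1.channels.c1.dataDirs = E:/flume/data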
Flume startup command

flume-ng agent --conf conf --conf-file ../conf/hdfs.properties --name a1
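To also mirror the agent's log to the console (matching the run output shown below), the standard flume.root.logger property can be appended:

flume-ng agent --conf conf --conf-file ../conf/hdfs.properties --name a1 -Dflume.root.logger=INFO,console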

Write a Java program to generate logs

import java.util.Date;

import org.apache.log4j.Logger;

public class App {
    protected static final Logger logger = Logger.getLogger(App.class);

    public static void main(String[] args) {
        // Emit one log line every 500 ms so Flume has a steady stream to collect
        while (true) {
            logger.info("hello world:" + new Date().getTime());
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
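Assuming a Maven build (the original post does not show one), the only dependency needed is log4j 1.x:

<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>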

log4j configuration

### set log levels ###
log4j.rootLogger=INFO, stdout, file

### stdout ###
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Threshold=INFO
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n

### file ###
log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
# Log file path
log4j.appender.file.file=E:/log2s/log.log
log4j.appender.file.Threshold=INFO
log4j.appender.file.Append=true
# Roll to a new log file every minute
log4j.appender.file.DatePattern='.'yyyy-MM-dd-HH-mm
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n
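A line produced by this layout looks like (values illustrative):

2018-07-24 16:46:01 App [INFO] hello world:1532419561000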

Run the Java program to generate logs

Flume run output

07/24 17:19:27 INFO node.Application: Starting Channel c1
07/24 17:19:27 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
07/24 17:19:27 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
07/24 17:19:27 INFO node.Application: Starting Sink k1
07/24 17:19:27 INFO node.Application: Starting Source s1
07/24 17:19:27 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
07/24 17:19:27 INFO source.SpoolDirectorySource: SpoolDirectorySource source starting with directory: E:\log2s
07/24 17:19:27 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
07/24 17:19:27 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: s1: Successfully registered new MBean.
07/24 17:19:27 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: s1 started
07/24 17:19:28 INFO avro.ReliableSpoolingFileEventReader: Last read took us just up to a file boundary. Rolling to the next file, if there is one.
07/24 17:19:28 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file E:\log2s\log.log.2018-07-24-16-46 to E:\log2s\log.log.2018-07-24-16-46.COMPLETED
07/24 17:19:28 INFO avro.ReliableSpoolingFileEventReader: Last read took us just up to a file boundary. Rolling to the next file, if there is one.
07/24 17:19:28 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file E:\log2s\log.log.2018-07-24-16-47 to E:\log2s\log.log.2018-07-24-16-47.COMPLETED
07/24 17:19:28 INFO hdfs.HDFSSequenceFile: writeFormat = Text, UseRawLocalFileSystem = false
07/24 17:19:28 INFO hdfs.BucketWriter: Creating hdfs://127.0.0.1:9000/flume/accesslog/2018-07-24/logs.1532423968027.tmp
07/24 17:19:39 INFO hdfs.BucketWriter: Closing hdfs://127.0.0.1:9000/flume/accesslog/2018-07-24/logs.1532423968027.tmp
07/24 17:19:39 INFO hdfs.BucketWriter: Renaming hdfs://127.0.0.1:9000/flume/accesslog/2018-07-24/logs.1532423968027.tmp to hdfs://127.0.0.1:9000/flume/accesslog/2018-07-24/logs.1532423968027
07/24 17:19:39 INFO hdfs.HDFSEventSink: Writer callback called.
07/24 17:19:59 INFO avro.ReliableSpoolingFileEventReader: Last read took us just up to a file boundary. Rolling to the next file, if there is one.
07/24 17:19:59 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file E:\log2s\log.log.2018-07-24-16-48 to E:\log2s\log.log.2018-07-24-16-48.COMPLETED
07/24 17:20:00 INFO avro.ReliableSpoolingFileEventReader: Last read took us just up to a file boundary. Rolling to the next file, if there is one.
07/24 17:20:00 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file E:\log2s\log.log.2018-07-24-17-19 to E:\log2s\log.log.2018-07-24-17-19.COMPLETED
07/24 17:20:02 INFO hdfs.HDFSSequenceFile: writeFormat = Text, UseRawLocalFileSystem = false
07/24 17:20:02 INFO hdfs.BucketWriter: Creating hdfs://127.0.0.1:9000/flume/accesslog/2018-07-24/logs.1532424002903.tmp
07/24 17:20:13 INFO hdfs.BucketWriter: Closing hdfs://127.0.0.1:9000/flume/accesslog/2018-07-24/logs.1532424002903.tmp
07/24 17:20:13 INFO hdfs.BucketWriter: Renaming hdfs://127.0.0.1:9000/flume/accesslog/2018-07-24/logs.1532424002903.tmp to hdfs://127.0.0.1:9000/flume/accesslog/2018-07-24/logs.1532424002903
07/24 17:20:13 INFO hdfs.HDFSEventSink: Writer callback called.
07/24 17:21:00 INFO hdfs.HDFSSequenceFile: writeFormat = Text, UseRawLocalFileSystem = false
07/24 17:21:00 INFO avro.ReliableSpoolingFileEventReader: Last read took us just up to a file boundary. Rolling to the next file, if there is one.
07/24 17:21:00 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file E:\log2s\log.log.2018-07-24-17-20 to E:\log2s\log.log.2018-07-24-17-20.COMPLETED
07/24 17:21:00 INFO hdfs.BucketWriter: Creating hdfs://127.0.0.1:9000/flume/accesslog/2018-07-24/logs.1532424060382.tmp
07/24 17:21:10 INFO hdfs.BucketWriter: Closing hdfs://127.0.0.1:9000/flume/accesslog/2018-07-24/logs.1532424060382.tmp
07/24 17:21:10 INFO hdfs.BucketWriter: Renaming hdfs://127.0.0.1:9000/flume/accesslog/2018-07-24/logs.1532424060382.tmp to hdfs://127.0.0.1:9000/flume/accesslog/2018-07-24/logs.1532424060382
07/24 17:21:10 INFO hdfs.HDFSEventSink: Writer callback called.

HDFS directory listing (screenshot in the original post)
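The same result can be checked from the command line. Because the sink writes SequenceFiles (see HDFSSequenceFile in the log above), use -text rather than -cat to read the contents:

hdfs dfs -ls /flume/accesslog/2018-07-24
hdfs dfs -text /flume/accesslog/2018-07-24/logs.1532423968027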

Original article: https://www.cnblogs.com/Soy-technology/p/11658647.html
