Flume: collecting logs and writing them to HDFS

First, install Flume.

It is recommended to install Hadoop and Flume under the same user; here Flume is installed as the hadoop user.

Installation reference: http://douya.blog.51cto.com/6173221/1860390

Now for the configuration:

1. Write the configuration file:

vim flume_hdfs.conf

# Define a memory channel called ch1 on agent1

agent1.channels.ch1.type = memory

agent1.channels.ch1.capacity = 10000

agent1.channels.ch1.transactionCapacity = 100

#agent1.channels.ch1.keep-alive = 30

# Define an exec source that tails a file

agent1.sources.avro-source1.type = exec

agent1.sources.avro-source1.shell = /bin/bash -c

agent1.sources.avro-source1.command = tail -n +0 -F /root/logs/appcenter.log

agent1.sources.avro-source1.channels = ch1

agent1.sources.avro-source1.threads = 5

agent1.sources.avro-source1.interceptors = i1

agent1.sources.avro-source1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
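
# (added note) the timestamp header set by this interceptor is what lets the
# %y%m%d escape in hdfs.path below resolve; without it the sink would need
# hdfs.useLocalTimeStamp = true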

# Define an HDFS sink for all events it receives

# and connect it to the other end of the same channel.

agent1.sinks.log-sink1.channel = ch1

agent1.sinks.log-sink1.type = hdfs

agent1.sinks.log-sink1.hdfs.path = hdfs://ns1/flume/%y%m%d

# note: hdfs.writeFormat only accepts Text or Writable; the original value
# "events-" is invalid and was presumably intended for hdfs.filePrefix
agent1.sinks.log-sink1.hdfs.writeFormat = Text

agent1.sinks.log-sink1.hdfs.fileType = DataStream

agent1.sinks.log-sink1.hdfs.rollInterval = 60

agent1.sinks.log-sink1.hdfs.rollSize = 134217728

agent1.sinks.log-sink1.hdfs.rollCount = 0
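
# the three roll settings above mean: roll a new file every 60 s or once it
# reaches 128 MB (134217728 bytes), whichever comes first; rollCount = 0
# disables event-count-based rolling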

#agent1.sinks.log-sink1.hdfs.batchSize = 100000

#agent1.sinks.log-sink1.hdfs.txnEventMax = 100000

#agent1.sinks.log-sink1.hdfs.callTimeout = 60000

#agent1.sinks.log-sink1.hdfs.appendTimeout = 60000

# Finally, now that we've defined all of our components, tell

# agent1 which ones we want to activate.

agent1.channels = ch1

agent1.sources = avro-source1

agent1.sinks = log-sink1
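
Before starting the agent, it is worth making sure the target HDFS path is writable by the user running Flume (the sink creates the dated subdirectories itself). A minimal sketch, assuming the HDFS client is available and the agent runs as the hadoop user:

hdfs dfs -mkdir -p /flume
hdfs dfs -chown hadoop /flume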

2. Prerequisite: the Hadoop cluster is started normally and usable.

3. Start Flume and begin collecting logs.

Start the agent (-c points at the conf directory, -f at the agent configuration file, and -n must match the agent name defined in that file, agent1):

flume-ng agent -c /usr/local/ELK/apache-flume/conf/  -f /usr/local/ELK/apache-flume/conf/flume_hdfs.conf  -Dflume.root.logger=INFO,console -n agent1

Error 1:

2016-12-06 11:24:49,036 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:234)] Creating hdfs://ns1/flume//FlumeData.1480994688831.tmp

2016-12-06 11:24:49,190 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:459)] process failed

java.lang.NoClassDefFoundError: org/apache/hadoop/util/PlatformName

at org.apache.hadoop.security.UserGroupInformation.getOSLoginModuleName(UserGroupInformation.java:366)

at org.apache.hadoop.security.UserGroupInformation.<clinit>(UserGroupInformation.java:411)

at org.apache.hadoop.fs.FileSystem$Cache$Key.<init>(FileSystem.java:2828)

at org.apache.hadoop.fs.FileSystem$Cache$Key.<init>(FileSystem.java:2818)

at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2684)

at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)

at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)

at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:243)

at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:235)

at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:679)

at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)

at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:676)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.util.PlatformName

at java.net.URLClassLoader.findClass(URLClassLoader.java:381)

at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)

at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

... 16 more

Solution:

1. cd /usr/local/hadoop/hadoop/share/hadoop/common/

and copy hadoop-common-2.7.3.jar into Flume's lib directory.

2. cd /usr/local/hadoop/hadoop/share/hadoop/common/lib

and copy hadoop-auth-2.7.3.jar from this lib directory into the Flume client's lib directory as well.

Copying only these two jars still leaves a string of similar errors, so the lazy approach is to copy every jar under that lib directory into Flume's lib, as in the sketch below.

Reason: Flume writes data into HDFS, so it depends on a number of HDFS client jars.
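
A minimal shell sketch of the fix, assuming Flume is installed at /usr/local/ELK/apache-flume as in the start command below:

cd /usr/local/hadoop/hadoop/share/hadoop/common
cp hadoop-common-2.7.3.jar /usr/local/ELK/apache-flume/lib/
cp lib/hadoop-auth-2.7.3.jar /usr/local/ELK/apache-flume/lib/
# lazy variant: copy every dependency jar at once
cp lib/*.jar /usr/local/ELK/apache-flume/lib/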

Start again:

flume-ng agent -c /usr/local/ELK/apache-flume/conf/  -f /usr/local/ELK/apache-flume/conf/flume_hdfs.conf  -Dflume.root.logger=INFO,console -n agent1

Error 2:

2016-12-06 11:36:52,791 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:234)] Creating hdfs://ns1/flume//FlumeData.1480995177465.tmp

2016-12-06 11:36:52,793 (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:455)] HDFS IO error

java.io.IOException: No FileSystem for scheme: hdfs

at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)

at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)

at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)

at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)

at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)

at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)

at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)

at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:243)

at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:235)

at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:679)

at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)

at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:676)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)

2016-12-06 11:36:57,798 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:234)] Creating hdfs://ns1/flume//FlumeData.1480995177466.tmp

2016-12-06 11:36:57,799 (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:455)] HDFS IO error

java.io.IOException: No FileSystem for scheme: hdfs

Analysis: at this point the Flume side cannot resolve Hadoop's ns1 nameservice.

Solution:

cd /usr/local/hadoop/hadoop/etc/hadoop

Copy core-site.xml and hdfs-site.xml into Flume's conf directory, so the HDFS client embedded in Flume can resolve the ns1 nameservice.
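
A one-line sketch of the copy, assuming Flume's conf directory is /usr/local/ELK/apache-flume/conf/ as in the start command:

cp /usr/local/hadoop/hadoop/etc/hadoop/{core-site.xml,hdfs-site.xml} /usr/local/ELK/apache-flume/conf/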

Also map the Hadoop machines' hostnames on the Flume host:

vim /etc/hosts

172.16.9.250 hadoop1

172.16.9.252 hadoop2

172.16.9.253 hadoop3

One jar is still missing at this point: hadoop-hdfs-2.7.3.jar (it provides the DistributedFileSystem implementation behind the hdfs:// scheme).

Download:

https://www.versioneye.com/java/org.apache.hadoop:hadoop-hdfs/2.7.3

and upload it to Flume's lib directory.
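
A quick sanity check that the needed Hadoop jars are now on the Flume side:

ls /usr/local/ELK/apache-flume/lib/ | grep '^hadoop-'
# expect at least hadoop-auth-2.7.3.jar, hadoop-common-2.7.3.jar, hadoop-hdfs-2.7.3.jar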

Start once more; this time the agent comes up cleanly:

flume-ng agent -c /usr/local/ELK/apache-flume/conf/  -f /usr/local/ELK/apache-flume/conf/flume_hdfs.conf  -Dflume.root.logger=INFO,console -n agent1

Info: Sourcing environment configuration script /usr/local/ELK/apache-flume/conf/flume-env.sh

Info: Including Hive libraries found via () for Hive access

+ exec /soft/jdk1.8.0_101/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp '/usr/local/ELK/apache-flume/conf:/usr/local/ELK/apache-flume/lib/*:/usr/local/ELK/apache-flume:/lib/*' -Djava.library.path= org.apache.flume.node.Application -f /usr/local/ELK/apache-flume/conf/flume_hdfs.conf -n agent1

SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in [jar:file:/usr/local/ELK/apache-flume/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:/usr/local/ELK/apache-flume/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

2016-12-06 13:35:05,379 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:61)] Configuration provider starting

2016-12-06 13:35:05,384 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:133)] Reloading configuration file:/usr/local/ELK/apache-flume/conf/flume_hdfs.conf

2016-12-06 13:35:05,391 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:log-sink1

2016-12-06 13:35:05,391 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:log-sink1

2016-12-06 13:35:05,392 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:log-sink1

2016-12-06 13:35:05,392 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:log-sink1

2016-12-06 13:35:05,392 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:931)] Added sinks: log-sink1 Agent: agent1

2016-12-06 13:35:05,392 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:log-sink1

2016-12-06 13:35:05,392 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:log-sink1

2016-12-06 13:35:05,392 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:log-sink1

2016-12-06 13:35:05,392 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:log-sink1

2016-12-06 13:35:05,403 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:141)] Post-validation flume configuration contains configuration for agents: [agent1]

2016-12-06 13:35:05,403 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:145)] Creating channels

2016-12-06 13:35:05,410 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel ch1 type memory

2016-12-06 13:35:05,417 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:200)] Created channel ch1

2016-12-06 13:35:05,418 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)] Creating instance of source avro-source1, type exec

2016-12-06 13:35:05,456 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: log-sink1, type: hdfs

2016-12-06 13:35:05,465 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:114)] Channel ch1 connected to [avro-source1, log-sink1]

2016-12-06 13:35:05,472 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:138)] Starting new configuration:{ sourceRunners:{avro-source1=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:avro-source1,state:IDLE} }} sinkRunners:{log-sink1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@... counterGroup:{ name:null counters:{} } }} channels:{ch1=org.apache.flume.channel.MemoryChannel{name: ch1}} }

2016-12-06 13:35:05,472 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:145)] Starting Channel ch1

2016-12-06 13:35:05,535 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: CHANNEL, name: ch1: Successfully registered new MBean.

2016-12-06 13:35:05,535 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: CHANNEL, name: ch1 started

2016-12-06 13:35:05,536 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:173)] Starting Sink log-sink1

2016-12-06 13:35:05,540 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: SINK, name: log-sink1: Successfully registered new MBean.

2016-12-06 13:35:05,540 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SINK, name: log-sink1 started

2016-12-06 13:35:05,540 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Starting Source avro-source1

2016-12-06 13:35:05,541 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.ExecSource.start(ExecSource.java:169)] Exec source starting with command:tail -n +0 -F /opt/logs/appcenter.log

2016-12-06 13:35:05,543 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: SOURCE, name: avro-source1: Successfully registered new MBean.

2016-12-06 13:35:05,543 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SOURCE, name: avro-source1 started

2016-12-06 13:35:05,934 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.HDFSDataStream.configure(HDFSDataStream.java:58)] Serializer = TEXT, UseRawLocalFileSystem = false

2016-12-06 13:35:06,279 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:234)] Creating hdfs://ns1/flume/161206/FlumeData.1481002505934.tmp

2016-12-06 13:35:06,586 (hdfs-log-sink1-call-runner-0) [WARN - org.apache.hadoop.util.NativeCodeLoader.<clinit>(NativeCodeLoader.java:62)] Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

2016-12-06 13:36:07,615 (hdfs-log-sink1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:363)] Closing hdfs://ns1/flume/161206/FlumeData.1481002505934.tmp

2016-12-06 13:36:07,710 (hdfs-log-sink1-call-runner-7) [INFO - org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:629)] Renaming hdfs://ns1/flume/161206/FlumeData.1481002505934.tmp to hdfs://ns1/flume/161206/FlumeData.1481002505934

2016-12-06 13:36:07,760 (hdfs-log-sink1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:394)] Writer callback called.
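
The log above shows the full write cycle: the sink creates a .tmp file, closes it once the 60-second rollInterval expires, and renames it to its final name. The result can be verified from any HDFS client, e.g.:

hdfs dfs -ls /flume/161206
# ... /flume/161206/FlumeData.1481002505934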
