Flume 1.7 Installation and Usage

Installing Flume

System requirements:
JDK 1.7 or later must be installed.

1. Download the binary package
Download page: http://flume.apache.org/download.html
1.7.0 download link: http://www.apache.org/dyn/closer.lua/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
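
If you prefer to download from the command line, older releases are kept in the Apache release archive; assuming 1.7.0 is still hosted there:

$ wget http://archive.apache.org/dist/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz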

2. Extract the archive

$ cp ~/Downloads/apache-flume-1.7.0-bin.tar.gz ~
$ cd
$ tar -zxvf apache-flume-1.7.0-bin.tar.gz
$ cd apache-flume-1.7.0-bin

3. Create the flume-env.sh file

$ cp conf/flume-env.sh.template conf/flume-env.sh
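
Flume will start without editing this file, but it is common to set JAVA_HOME here. A minimal sketch; the JDK path below is hypothetical, so point it at your own installation:

$ vi conf/flume-env.sh

# Path to the JDK (hypothetical path; adjust to your machine)
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64
# Optional JVM heap settings for the agent
export JAVA_OPTS="-Xms100m -Xmx512m"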

Simple Example: Transferring a Specified File

Scenario: two machines, one acting as the client and one as the agent; the client sends a specified file to the agent machine.

1. Create the configuration file

Create the flume.conf configuration file from the template that ships with Flume.

$ cp conf/flume-conf.properties.template conf/flume.conf

Edit flume.conf:

$ vi conf/flume.conf

Append the following configuration at the end of the file:

# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory

# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414

# Define a logger sink that simply logs all events it receives
# and connect it to the other end of the same channel.
agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.type = logger

# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1
agent1.sources = avro-source1
agent1.sinks = log-sink1

Save and exit.

2. Start the Flume server
On the machine acting as the agent, run the following (note that the -n/--name argument must match the agent name used in the configuration file, agent1 here):

bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n agent1

3. Start the client in a new window
On the machine acting as the client, run the following.
(Since this walkthrough simulates two machines on a single host, just enter the command in a new terminal.)

$ bin/flume-ng avro-client --conf conf -H localhost -p 41414 -F /etc/passwd -Dflume.root.logger=DEBUG,console

4. Results
At this point, you should see messages like these on the client side:

2012-03-16 16:39:17,124 (main) [DEBUG - org.apache.flume.client.avro.AvroCLIClient.run(AvroCLIClient.java:175)] Finished
2012-03-16 16:39:17,127 (main) [DEBUG - org.apache.flume.client.avro.AvroCLIClient.run(AvroCLIClient.java:178)] Closing reader
2012-03-16 16:39:17,127 (main) [DEBUG - org.apache.flume.client.avro.AvroCLIClient.run(AvroCLIClient.java:183)] Closing transceiver
2012-03-16 16:39:17,129 (main) [DEBUG - org.apache.flume.client.avro.AvroCLIClient.main(AvroCLIClient.java:73)] Exiting

In the window where the Flume server was started, you should see messages like the following (each Event body is a raw byte array, which the logger sink prints in Java's default [B@... form):

2012-03-16 16:39:16,738 (New I/O server boss #1 ([id: 0x49e808ca, /0:0:0:0:0:0:0:0:41414])) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:123)] [id: 0x0b92a848, /127.0.0.1:39577 => /127.0.0.1:41414] OPEN
2012-03-16 16:39:16,742 (New I/O server worker #1-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:123)] [id: 0x0b92a848, /127.0.0.1:39577 => /127.0.0.1:41414] BOUND: /127.0.0.1:41414
2012-03-16 16:39:16,742 (New I/O server worker #1-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:123)] [id: 0x0b92a848, /127.0.0.1:39577 => /127.0.0.1:41414] CONNECTED: /127.0.0.1:39577
2012-03-16 16:39:17,129 (New I/O server worker #1-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:123)] [id: 0x0b92a848, /127.0.0.1:39577 :> /127.0.0.1:41414] DISCONNECTED
2012-03-16 16:39:17,129 (New I/O server worker #1-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:123)] [id: 0x0b92a848, /127.0.0.1:39577 :> /127.0.0.1:41414] UNBOUND
2012-03-16 16:39:17,129 (New I/O server worker #1-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:123)] [id: 0x0b92a848, /127.0.0.1:39577 :> /127.0.0.1:41414] CLOSED
2012-03-16 16:39:17,302 (Thread-1) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:68)] Event: { headers:{} body:[B@5c1ae90c }
2012-03-16 16:39:17,302 (Thread-1) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:68)] Event: { headers:{} body:[B@6aba4211 }
2012-03-16 16:39:17,302 (Thread-1) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:68)] Event: { headers:{} body:[B@6a47a0d4 }
2012-03-16 16:39:17,302 (Thread-1) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:68)] Event: { headers:{} body:[B@48ff4cf }
...

Simple Example: Uploading Files from a Directory to HDFS

Scenario: upload the files under a directory on the local machine to HDFS.

1. Configure conf/flume.conf

# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory

# Define a spooling-directory source called spooldir-source1 on agent1
# and point it at the directory whose files should be ingested.
# Connect it to channel ch1.
agent1.sources.spooldir-source1.channels = ch1
agent1.sources.spooldir-source1.type = spooldir
agent1.sources.spooldir-source1.spoolDir = /home/hadoop/flume-1.7.0/tmpData

# Define an HDFS sink that writes all events it receives to HDFS
# and connect it to the other end of the same channel.
agent1.sinks.hdfs-sink1.channel = ch1
agent1.sinks.hdfs-sink1.type = hdfs
agent1.sinks.hdfs-sink1.hdfs.path = hdfs://master:9000/test
agent1.sinks.hdfs-sink1.hdfs.filePrefix = events-
agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true
agent1.sinks.hdfs-sink1.hdfs.round = true
agent1.sinks.hdfs-sink1.hdfs.roundValue = 10

# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1
agent1.sources = spooldir-source1
agent1.sinks = hdfs-sink1

Here, /home/hadoop/flume-1.7.0/tmpData is the directory containing the files to upload; that is, every file placed in this directory will be uploaded to the hdfs://master:9000/test directory on HDFS.
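
Before starting the agent, make sure both directories exist. A quick sketch, assuming the Hadoop client tools are on your PATH:

$ mkdir -p /home/hadoop/flume-1.7.0/tmpData
$ hdfs dfs -mkdir -p hdfs://master:9000/test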

Notes

  • This configuration produces many small files, because by default one file stores only 10 events. That limit is controlled by rollCount (default 10); there is also a rollSize parameter that caps the file size in bytes, and once a file exceeds that value a new file is started. (One way to raise these thresholds is sketched after this list.)
  • The resulting file names all start with "events-". To keep the original file names, add the configuration below (basenameHeader is set on the source and filePrefix on the sink; with both in place, the file name on HDFS becomes "original-file-name.timestamp"):
agent1.sources.spooldir-source1.basenameHeader = true
agent1.sinks.hdfs-sink1.hdfs.filePrefix = %{basename}
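
To mitigate the small-file problem mentioned above, the roll thresholds can be raised. A sketch with illustrative values (rollInterval, rollSize, and rollCount are the standard HDFS-sink roll triggers; setting one of them to 0 disables that trigger, and the numbers should be tuned to your workload):

# Do not roll on a timer
agent1.sinks.hdfs-sink1.hdfs.rollInterval = 0
# Roll when a file reaches roughly 128 MB
agent1.sinks.hdfs-sink1.hdfs.rollSize = 134217728
# Do not roll on event count
agent1.sinks.hdfs-sink1.hdfs.rollCount = 0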

2. Start the agent
Start the agent with the following command:

bin/flume-ng agent --conf ./conf/ -f ./conf/flume.conf --name agent1 -Dflume.root.logger=DEBUG,console

3. Check the results
In the web UI provided by Hadoop, you can check whether the files were uploaded successfully.
The UI address is: http://master:50070/explorer.html#/test
where master is the host name of the machine running the Hadoop NameNode.
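
Alternatively, list the target directory from the command line:

$ hdfs dfs -ls /test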

4. Summary
In this scenario, uploading files to HDFS requires several Hadoop jars, namely the four below (a one-shot copy command follows the list):

${HADOOP_HOME}/share/hadoop/common/hadoop-common-2.4.0.jar
${HADOOP_HOME}/share/hadoop/common/lib/commons-configuration-1.6.jar
${HADOOP_HOME}/share/hadoop/common/lib/hadoop-auth-2.4.0.jar
${HADOOP_HOME}/share/hadoop/hdfs/hadoop-hdfs-2.4.0.jar
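
To avoid hitting the classpath errors in the next section one at a time, all four jars can be copied into Flume's lib directory up front. A sketch, assuming HADOOP_HOME and FLUME_HOME are set and a Hadoop 2.4.0 directory layout:

$ cp ${HADOOP_HOME}/share/hadoop/common/hadoop-common-2.4.0.jar \
     ${HADOOP_HOME}/share/hadoop/common/lib/commons-configuration-1.6.jar \
     ${HADOOP_HOME}/share/hadoop/common/lib/hadoop-auth-2.4.0.jar \
     ${HADOOP_HOME}/share/hadoop/hdfs/hadoop-hdfs-2.4.0.jar \
     ${FLUME_HOME}/lib/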

Exceptions

java.lang.NoClassDefFoundError: org/apache/hadoop/io/SequenceFile$CompressionType

2016-11-03 14:49:35,278 (conf-file-poller-0) [ERROR - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:146)] Failed to start agent because dependencies were not found in classpath. Error follows.
java.lang.NoClassDefFoundError: org/apache/hadoop/io/SequenceFile$CompressionType

Cause: a dependency jar is missing, namely the following file:

${HADOOP_HOME}/share/hadoop/common/hadoop-common-2.4.0.jar

Solution: locate this jar file and copy it into the lib directory under the Flume installation directory, as shown below.
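
In the same style as the fixes for the later errors, assuming HADOOP_HOME and FLUME_HOME are set:

cp ${HADOOP_HOME}/share/hadoop/common/hadoop-common-2.4.0.jar ${FLUME_HOME}/lib/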

java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null

2016-11-03 16:32:06,741 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:447)] process failed
java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
    at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
    at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:256)
    at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:465)
    at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:368)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
    at java.lang.Thread.run(Thread.java:745)

Solution:
Edit conf/flume.conf, replacing agent1 and sink1 with your own agent and sink names:

agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

java.lang.NoClassDefFoundError: org/apache/commons/configuration/Configuration

2016-11-03 16:32:55,594 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:447)] process failed
java.lang.NoClassDefFoundError: org/apache/commons/configuration/Configuration
    at org.apache.hadoop.metrics2.lib.DefaultMetricsSystem.<init>(DefaultMetricsSystem.java:38)
    at org.apache.hadoop.metrics2.lib.DefaultMetricsSystem.<clinit>(DefaultMetricsSystem.java:36)
    at org.apache.hadoop.security.UserGroupInformation$UgiMetrics.create(UserGroupInformation.java:106)
    at org.apache.hadoop.security.UserGroupInformation.<clinit>(UserGroupInformation.java:208)
    at org.apache.hadoop.fs.FileSystem$Cache$Key.<init>(FileSystem.java:2554)
    at org.apache.hadoop.fs.FileSystem$Cache$Key.<init>(FileSystem.java:2546)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2412)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
    at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:240)
    at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:232)
    at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:668)
    at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
    at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:665)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.commons.configuration.Configuration
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 18 more

Solution:
The missing class is in commons-configuration-1.6.jar, which lives under ${HADOOP_HOME}/share/hadoop/common/lib/; copy it into Flume's lib directory.

cp ${HADOOP_HOME}/share/hadoop/common/lib/commons-configuration-1.6.jar ${FLUME_HOME}/lib/

java.lang.NoClassDefFoundError: org/apache/hadoop/util/PlatformName

2016-11-03 16:41:54,629 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:447)] process failed
java.lang.NoClassDefFoundError: org/apache/hadoop/util/PlatformName

Solution:
The hadoop-auth-2.4.0.jar dependency is missing; copy it into Flume's lib directory as well:

cp ${HADOOP_HOME}/share/hadoop/common/lib/hadoop-auth-2.4.0.jar ${FLUME_HOME}/lib/

java.io.IOException: No FileSystem for scheme: hdfs

2016-11-03 16:49:26,638 (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:443)] HDFS IO error
java.io.IOException: No FileSystem for scheme: hdfs

Missing dependency: hadoop-hdfs-2.4.0.jar. Copy it into Flume's lib directory:

cp ${HADOOP_HOME}/share/hadoop/hdfs/hadoop-hdfs-2.4.0.jar ${FLUME_HOME}/lib/
