Flume: Writing to a Remote HDFS

The requirement: a Flume collector machine that has no Hadoop installed needs to write to HDFS on a remote Hadoop cluster.

The Flume version here is 1.6.0 and the Hadoop version is 2.7.1.

Copy the cluster's hdfs-site.xml and core-site.xml configuration files into the conf directory of the Flume installation, and copy hadoop-hdfs-2.7.1.jar into Flume's lib directory.
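
A minimal sketch of those copy steps, assuming the files are pulled over SSH from a cluster node named namenode1 with Hadoop installed under /usr/local/hadoop-2.7.1 (both names are assumptions; the local Flume path matches the install directory that appears in the errors below):

# copy the cluster configuration files into Flume's conf directory
scp namenode1:/usr/local/hadoop-2.7.1/etc/hadoop/hdfs-site.xml /data/apache-flume-1.6.0-bin/conf/
scp namenode1:/usr/local/hadoop-2.7.1/etc/hadoop/core-site.xml /data/apache-flume-1.6.0-bin/conf/
# copy the HDFS client jar into Flume's lib directory
scp namenode1:/usr/local/hadoop-2.7.1/share/hadoop/hdfs/hadoop-hdfs-2.7.1.jar /data/apache-flume-1.6.0-bin/lib/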

I. Flume configuration file (conf/flume-tcp-memory-hdfs.conf):

a1.sources = r1
a1.channels = c1
a1.sinks = k1

# r1 is a syslog TCP server bound to this machine's IP
a1.sources.r1.type = syslogtcp
a1.sources.r1.bind = 192.168.110.160
a1.sources.r1.port = 23003
a1.sources.r1.workerThreads = 10

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 100000
a1.channels.c1.keep-alive = 6
a1.channels.c1.byteCapacityBufferPercentage = 20

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://clusterpc/test/flume/%y-%m-%d
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

  Start the agent: bin/flume-ng agent --conf conf --conf-file conf/flume-tcp-memory-hdfs.conf --name a1 -Dflume.root.logger=INFO,console
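
  To verify the whole pipeline, a syslog-formatted test line can be pushed to the source with nc (a sketch; the priority value and message body are arbitrary):

echo '<13>Sep 19 16:20:00 testhost flume hdfs sink test' | nc 192.168.110.160 23003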

II. Errors encountered:

1. UnknownHostException: the NameService host name cannot be resolved

2016-09-19 16:15:48,518 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:459)] process failed
java.lang.IllegalArgumentException: java.net.UnknownHostException: cluster
        at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:378)
        at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:310)
        at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
        at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:678)
        at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:619)
        at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2653)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:92)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2687)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2669)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:371)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:170)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:355)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
        at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:243)
        at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:235)
        at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:679)
        at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
        at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:676)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
Caused by: java.net.UnknownHostException: cluster

  cluster is the name of the Hadoop cluster's HDFS NameService. The error occurs because the client on the Flume machine cannot resolve that NameService, so hdfs-site.xml, which defines the NameService and its NameNodes, must be copied into flume/conf.
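
  For reference, a minimal sketch of the HA-related entries that hdfs-site.xml needs to contain for the client to resolve the NameService (the NameService name cluster comes from the error above; the NameNode IDs nn1/nn2 and hosts namenode1/namenode2 are assumptions -- copy the real file from the cluster rather than writing it by hand):

<property>
  <name>dfs.nameservices</name>
  <value>cluster</value>
</property>
<property>
  <name>dfs.ha.namenodes.cluster</name>
  <value>nn1,nn2</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.cluster.nn1</name>
  <value>namenode1:8020</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.cluster.nn2</name>
  <value>namenode2:8020</value>
</property>
<property>
  <name>dfs.client.failover.proxy.provider.cluster</name>
  <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>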

2. Mkdirs failed: the sink writes to the local filesystem instead of HDFS

java.io.IOException: Mkdirs failed to create /test/flume/16-09-19 (exists=false, cwd=file:/data/apache-flume-1.6.0-bin)
        at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:450)
        at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:435)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:909)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:890)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:787)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:776)
        at org.apache.flume.sink.hdfs.HDFSSequenceFile.open(HDFSSequenceFile.java:96)
        at org.apache.flume.sink.hdfs.HDFSSequenceFile.open(HDFSSequenceFile.java:78)
        at org.apache.flume.sink.hdfs.HDFSSequenceFile.open(HDFSSequenceFile.java:69)
        at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:246)
        at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:235)
        at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:679)
        at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
        at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:676)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)

  The cwd=file:/data/apache-flume-1.6.0-bin in the message shows that the write went to the local filesystem rather than HDFS. Copy core-site.xml into the flume/conf directory.
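
  For reference, the property in core-site.xml that points clients at the cluster's default filesystem looks like the sketch below (the value is taken from the hdfs.path in the configuration above; use the cluster's actual core-site.xml rather than a hand-written one):

<property>
  <name>fs.defaultFS</name>
  <value>hdfs://clusterpc</value>
</property>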

3. No FileSystem for scheme: hdfs

java.io.IOException: No FileSystem for scheme: hdfs
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2644)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2651)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:92)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2687)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2669)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:371)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:170)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:355)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
        at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:243)
        at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:235)
        at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:679)
        at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
        at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:676)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)

  Copy hadoop-hdfs-2.7.1.jar into the flume/lib directory; it contains the DistributedFileSystem implementation that handles the hdfs:// scheme, and Flume does not ship it.
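
  A quick check that the jar is in place and actually contains that class (a sketch; the install path matches the one in the error above):

unzip -l /data/apache-flume-1.6.0-bin/lib/hadoop-hdfs-2.7.1.jar | grep DistributedFileSystem.class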

4. Insufficient HDFS permissions; the user writing to HDFS is the OS user that Flume runs as on the collector machine (here, kafka).

org.apache.hadoop.security.AccessControlException: Permission denied: user=kafka, access=WRITE, inode="/test/flume/16-09-19/events-.1474268726127.tmp":hadoop:supergroup:drwxr-xr-x
        at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:319)
        at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:292)
        at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:213)
        at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190)
        at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1698)
        at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1682)
        at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1665)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2517)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2452)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2335)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:623)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:397)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)

  The HDFS permissions are insufficient, so grant access: hadoop fs -chmod -R 777 /test/
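
  A less blunt alternative is to hand the target directory to the user Flume runs as, or to make the Flume client identify itself as a user that can already write (a sketch; the user names kafka and hadoop come from the error above, and HADOOP_USER_NAME only applies to clusters without Kerberos):

# run on a machine with the Hadoop client, as an HDFS superuser
hadoop fs -chown -R kafka /test/flume
# or, on the Flume machine, before starting the agent
export HADOOP_USER_NAME=hadoop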

5. Missing timestamp in the event headers

java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
        at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
        at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:228)
        at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:432)
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:380)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:744)

  The cause is that the event headers carry no timestamp, which the %y-%m-%d escapes in hdfs.path require. Fix: set a1.sinks.k1.hdfs.useLocalTimeStamp = true so the sink substitutes the path using the local clock.
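
  An alternative is to stamp each event at the source with Flume's built-in timestamp interceptor, so the sink does not need useLocalTimeStamp (a sketch against the configuration above):

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp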
