flume 使用 spool source的时候字符集出错

1. 错误所在

2016-04-21 02:23:05,508 (pool-3-thread-1) [ERROR - org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:256)] FATAL: Spool Directory source source1: { spoolDir: /home/hadoop_admin/movielog/ }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.nio.charset.MalformedInputException: Input length = 1
        at java.nio.charset.CoderResult.throwException(CoderResult.java:277)
        at org.apache.flume.serialization.ResettableFileInputStream.readChar(ResettableFileInputStream.java:195)
        at org.apache.flume.serialization.LineDeserializer.readLine(LineDeserializer.java:134)
        at org.apache.flume.serialization.LineDeserializer.readEvent(LineDeserializer.java:72)
        at org.apache.flume.serialization.LineDeserializer.readEvents(LineDeserializer.java:91)
        at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:238)
        at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

2. 解决方法

  原因的inputCharset属性的默认值UTF-8,但是所读取的日志文件的字符集却是GBK,所以更改一下这个属性值就可以了

agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1

# For each one of the sources, the type is defined
agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir =/home/hadoop_admin/movielog/
agent1.sources.source1.inputCharset = GBK
agent1.sources.source1.fileHeader = true
agent1.sources.source1.deletePolicy = immediate
agent1.sources.source1.batchSize = 1000
agent1.sources.source1.channels = channel1

# Each sink‘s type must be defined
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://master:9000/flumeTest
agent1.sinks.sink1.hdfs.filePrefix = master-
agent1.sinks.sink1.hdfs.writeFormat = Text
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.rollInterval = 0
agent1.sinks.sink1.hdfs.rollSize = 10240
agent1.sinks.sink1.hdfs.batchSize = 100
agent1.sinks.sink1.hdfs.callTimeout = 30000
agent1.sinks.sink1.channel = channel1

# Each channel‘s type is defined.
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 100000
agent1.channels.channel1.transactionCapacity = 100000
agent1.channels.channel1.keep-alive = 30

  

时间: 2024-07-29 16:45:47

flume 使用 spool source的时候字符集出错的相关文章

flume组件汇总 source、sink、channel

Flume Source Source类型 说明 Avro Source 支持Avro协议(实际上是Avro RPC),内置支持 Thrift Source 支持Thrift协议,内置支持 Exec Source 基于Unix的command在标准输出上生产数据 JMS Source 从JMS系统(消息.主题)中读取数据,ActiveMQ已经测试过 Spooling Directory Source 监控指定目录内数据变更 Twitter 1% firehose Source 通过API持续下载

Flume Spool Source 源码过程分析(未运行)

主要涉及到的类: SpoolDirectorySource 读取用户配置,并按照batchSize去读取这么多量的Event从用户指定的Spooling Dir中.SpoolDirectorySource 不会去读取某一个具体的文件,而是通过内部的reader去读取.文件切换等操作,都是reader去实现 内部类:SpoolDirectoryRunnable是一个线程,其中的run方法,完成从Spooling Dir读取Event(使用reader去读取) 1 @Override 2 publi

Hadoop实战-Flume之自定义Source(十八)

import java.nio.charset.Charset; import java.util.HashMap; import java.util.Random; import org.apache.flume.Context; import org.apache.flume.EventDeliveryException; import org.apache.flume.PollableSource; import org.apache.flume.conf.Configurable; im

【Flume】flume中Avro Source到Avro Sink之间通过SSL传输数据的实现分析及使用

首先你需要了解JAVA KEYSTORE 该SSL用于Avro Sink到Avro Source之间的数据传输 该场景主要用于分布式Flume之间的数据传输,从分散的各个flume agent到中心汇集节点的flume agent 下面来看下如何实现的? Avro Sink SSL 在这个传输过程中,sink其实就相当于socket的client端了 flume源码中有个类NettyAvroRpcClient,该类中还有个内部类SSLCompressionChannelFactory 其中定义了

日志收集框架 Flume 组件之Source使用

上一篇简单介绍了Flume几个组件,今天介绍下组件其一的source,整理这些,也是二次学习的过程,也是梳理知识的过程. Source 中文译为来源,源作用:采集数据,然后把数据传输到channel上.例如:监控某个文件或者某个端口或某个目录,新增数据,新增文件的变化,然后传输到channel. 常用的的source类型,也是平常用的比较多的几种类型,如下: source类型 说明 Avro Source 支持avro协议,内置支持 Thrift Source 支持Thirft rpc ,内置支

Flume NG 学习笔记(四)Source配置

首先.这节水的东西就比较少了,大部分是例子. 一.Avro Source与Thrift Source Avro端口监听并接收来自外部的Avro客户流的事件.当内置Avro 去Sinks另一个配对Flume代理,它就可以创建分层采集的拓扑结构.官网说的比较绕,当然我的翻译也很弱,其实就是flume可以多级代理,然后代理与代理之间用Avro去连接 下面是官网给出的source的配置,加粗的参数是必选,描述就不解释了. Property Name Default Description channel

Flume学习之路 (二)Flume的Source类型

一.概述 官方文档介绍:http://flume.apache.org/FlumeUserGuide.html#flume-sources 二.Flume Sources 描述 2.1 Avro Source 2.1.1 介绍 Avro端口监听并接收来自外部的Avro客户流的事件.当内置Avro去Sinks另一个配对Flume代理,它就可以创建分层采集的拓扑结构.官网说的比较绕,当然我的翻译也很弱,其实就是flume可以多级代理,然后代理与代理之间用Avro去连接.==字体加粗的属性必须进行设置

Flume简介与使用(二)——Thrift Source采集数据

Flume简介与使用(二)——Thrift Source采集数据 继上一篇安装Flume后,本篇将介绍如何使用Thrift Source采集数据. Thrift是Google开发的用于跨语言RPC通信,它拥有功能强大的软件堆栈和代码生成引擎,允许定义一个简单的IDL文件来生成不同语言的代码,服务器端和客户端通过共享这个IDL文件来构建来完成通信. Flume的Thrift Source是其实现的众多Source中的一个,Flume已经实现了服务器端,因此我们可以用任意自己熟悉的语言编写自己的Th

Flume自定义Source

大家好. 公司有个需求.要求Flumne 从MQ 取消息存储到DFS ,写了Flume自定义的source .,由于我也是刚接触Flume . 所以有啥不对的请谅解. 查看了Flume-ng的源码.  一般都是根据不同的场景  extends AbstractSource implements EventDrivenSource, Configurable MQSource 代码如下: 1 public class MQSource extends AbstractSource implemen