Flume MemoryChannel ChannelFullException

先贴上异常信息

 1 08 May 2015 17:55:23,751 WARN  [New I/O  worker #1] (org.apache.flume.source.AvroSource.append:350)  - Avro source r1: Unable to process event. Exception follows.
 2 org.apache.flume.ChannelException: Unable to put event on required channel: org.apache.flume.channel.MemoryChannel{name: c2}
 3     at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:275)
 4     at org.apache.flume.source.AvroSource.append(AvroSource.java:348)
 5     at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
 6     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 7     at java.lang.reflect.Method.invoke(Method.java:606)
 8     at org.apache.avro.ipc.specific.SpecificResponder.respond(SpecificResponder.java:88)
 9     at org.apache.avro.ipc.Responder.respond(Responder.java:149)
10     at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.messageReceived(NettyServer.java:188)
11     at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
12     at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
13     at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:558)
14     at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:786)
15     at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
16     at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:458)
17     at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:439)
18     at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:311)
19     at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
20     at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:558)
21     at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:553)
22     at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
23     at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
24     at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:84)
25     at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:471)
26     at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:332)
27     at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:35)
28     at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:102)
29     at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
30     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
31     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
32     at java.lang.Thread.run(Thread.java:745)
33 Caused by: org.apache.flume.ChannelFullException: Space for commit to queue couldn‘t be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight
34     at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:130)
35     at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
36     at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:267)
37     ... 29 more

问题分析

从标红部分大致可以看出,MemoryChannel满了,source写不进去。

从程序运行表现来看,可能导致flume堵死,不能传输日志了(我遇到的现象)

解决办法

参考 http://blog.csdn.net/hijk139/article/details/8465094

增大MemoryChannel的容量,增加处理时间

修改配置文件

# channel c2 的配置
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000000
a1.channels.c2.transactionCapacity = 80000
a1.channels.c2.keep-alive = 30
时间: 2024-10-10 19:58:37

Flume MemoryChannel ChannelFullException的相关文章

<Flume><Source Code><Flume源码阅读笔记>

Overview source采集的日志首先会传入ChannelProcessor, 在其内首先会通过Interceptors进行过滤加工,然后通过ChannelSelector选择channel. Source和Sink之间是异步的,sink只需要监听自己关系的Channel的变化即可. sink存在写失败的情况,flume提供了如下策略: 默认是一个sink,若写入失败,则该事务失败,稍后重试. 故障转移策略:给多个sink定义优先级,失败时会路由到下一个优先级的sink.sink只要抛出一

自定义Flume Sink:ElasticSearch Sink

Flume Sink的目的是从Flume Channel中获取数据然后输出到存储或者其他Flume Source中.Flume Agent启动的时候,它会为每一个Sink都启动一个SinkRunner的对象,SinkRunner.start()方法会启动一个新的线程去管理每一个Sink的生命周期.每一个Sink需要实现start().Stop()和process()方法.你可以在start方法中去初始化Sink的参数和状态,在stop方法中清理Sink的资源.最关键的是process方法,它将处

那些年踏过的Apache Flume之路

Flume作为日志采集系统,有着独特的应用和优势,那么Flume在实际的应用和实践中到底是怎样的呢?让我们一起踏上Flume之路. 1.  什么是Apache Flume (1)Apache Flume简单来讲是高性能.分布式的日志采集系统,和sqoop同属于数据采集系统组件,但是sqoop用来采集关系型数据库数据,而Flume用来采集流动型数据. (2)Flume名字来源于原始的近乎实时的日志数据采集工具,现在被广泛用于任何流事件数据的采集,它支持从很多数据源聚合数据到HDFS. (3)Flu

利用flume+kafka+storm+mysql构建大数据实时系统

架构图 数据流向图 1.Flume 的一些核心概念: 2.数据流模型 Flume以agent为最小的独立运行单位.一个agent就是一个JVM.单agent由Source.Sink和Channel三大组件构成,如下图: Flume的数据流由事件(Event)贯穿始终.事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source,比如上图中的Web Server生成.当Source捕获事件后会进行特定的格式化,然后Source会把事件

第二章 flume 的数据流模型

1. flume 的基本概念 本文中所有与 flume 相关术语都采用斜体英文表示,这些术语的含义如下所示. flume             一个可靠的,分布式的,用于采集,聚合,传输海量日志数据的工具. Web Server   一个产生 Events/数据 的客户端. Agent            flume 系统中的一个节点,它主要包含三个部件:Source, Channel, Sink. Event            事件,在 flume-agent 内部传输的数据结构.一个

flume原理及代码实现

转载标明出处:http://www.cnblogs.com/adealjason/p/6240122.html 最近想玩一下流计算,先看了flume的实现原理及源码 源码可以去apache 官网下载 下面整理下flume的原理及代码实现: flume是一个实时数据收集工具,hadoop的生态圈之一,主要用来在分布式环境下各服务器节点做数据收集,然后汇总到统一的数据存储平台,flume支持多种部署架构模式,单点agent部署,分层架构模式部署,如通过一个负载均衡agent将收集的数据分发到各个子a

Flume 开发者指南V1.5.2

介绍 概述 Apache Flume是一个用来从很多不同的源有效地收集,聚集和移动大量的日志数据到一个中心数据仓库的分布式的,可靠的和可用的系统. Apache Flume是Apache软件基金会的顶级项目.目前有两个可获得的发布代码路线,0.9.x版本和1.x版本.本文档适用于1.x代码线.对于0.9.x代码线,请看Flume 0.9.x开发指南. 结构 数据流模型 一个Event是在Flume代理之间流动的数据单元.Event从Source流动到Channel再到Sink,并由一个Event

Flume与Logstash比较

Flume与Logstash相比,个人的体会如下: Logstash比较偏重于字段的预处理:而Flume偏重数据的传输: Logstash有几十个插件,配置灵活:FLume则是强调用户的自定义开发(source和sink的种类也有一二十个吧,channel就比较少了). Logstash的input和filter还有output之间都存在buffer,进行缓冲:Flume直接使用channel做持久化(可以理解为没有filter) 一.Logstash浅谈: input负责数据的输入(产生或者说

Flume NG 学习笔记(一)简介

一.简介 Flume是一个分布式.可靠.高可用的海量日志聚合系统,支持在系统中定制各类数据发送方,用于收集数据:同时,Flume提供对数据的简单处理,并写到各种数据接收方的能力. Flume在0.9.x and 1.x之间有较大的架构调整,1.x版本之后的改称Flume NG(next generation),0.9.x的称为Flume OG(originalgeneration). 对于OG版本, Flume NG (1.x.x)的主要变化如下: 1.sources和sinks 使用chann