flume的memeryChannel中transactionCapacity和sink的batchsize需要注意事项

fluem中出现,transactionCapacity查询一下,得出一下这些:

最近在做flume的实时日志收集,用flume默认的配置后,发现不是完全实时的,于是看了一下,原来是memeryChannel的transactionCapacity在作怪,因为他默认是100,也就是说收集端的sink会在收集到了100条以后再去提交事务(即发送到下一个目的地),于是我修改了transactionCapacity到10,想看看是不是会更加实时一点,结果发现收集日志的agent启动的时候报错了。

16/04/29 09:36:15 ERROR sink.AbstractRpcSink: Rpc Sink avro-sink: Unable to get event from channel memoryChannel. Exception follows.
org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 10 full, consider committing more frequently, increasing capacity, or increasing thread count
at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.Java:96)
at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:354)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)

于是很纳闷,为什么默认值100可以,而设置10就会说小了呢,于是查阅资料,发现原来是sink的batchsize参数在作怪,下面,我就来理一理这个来龙去脉,这个sink的batchsize是什么意思呢,就是sink会一次从channel中取多少个event去发送,而这个发送是要最终以事务的形式去发送的,因此这个batchsize的event会传送到一个事务的缓存队列中(takeList),这是一个双向队列,这个队列可以在事务失败时进行回滚(也就是把取出来的数据吐memeryChannel的queue中),它的初始大小就是transactionCapacity定义的大小,源码中有: takeList = new LinkedBlockingDeque<Event>(transCapacity); 源码来自https://segmentfault.com/a/1190000003586635的分享。

再看这个错误抛出的地方:

if(takeList.remainingCapacity() == 0) {
        throw new ChannelException("Take list for MemoryTransaction, capacity " +
            takeList.size() + " full, consider committing more frequently, " +
            "increasing capacity, or increasing thread count");
    }

在上面的情况中,sink一次取100个events,塞到takelist中,在塞了10个后,就会引发上述异常,因此,这个错误的解决办法就是:在sink中,channel的transactionCapacity参数不能小于sink的batchsize。

时间: 2024-12-13 11:31:42

flume的memeryChannel中transactionCapacity和sink的batchsize需要注意事项的相关文章

带你看懂大数据采集引擎之Flume&amp;采集目录中的日志

欢迎关注大数据和人工智能技术文章发布的微信公众号:清研学堂,在这里你可以学到夜白(作者笔名)精心整理的笔记,让我们每天进步一点点,让优秀成为一种习惯! 一.Flume的介绍: Flume由Cloudera公司开发,是一种提供高可用.高可靠.分布式海量日志采集.聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于采集数据:同时,flume提供对数据进行简单处理,并写到各种数据接收方的能力,如果能用一句话概括Flume,那么Flume是实时采集日志的数据采集引擎. 二.Flume的体

【Apache Flume系列】Flume-ng failover 以及Load balance测试及注意事项

好久没写博客了.最近在研究storm.flume和kafka.今天给大伙写下我测试flume failover以及load balance的场景以及一些结论: 测试环境包含5个配置文件,也就是5个agent. 一个主的配置文件,也就是我们配置failover以及load balance关系的配置文件(flume-sink.properties),这个文件在下面的场景 会变动,所以这里就不列举出来了,会在具体的场景中写明: 其他4个配置文件类似: #Name the compents on thi

关于数据库中varchar/nvarchar类型数据的获取注意事项

当在页面后台获取数据库表中某字段的数据时,需注意该数据的类型.防止因实际数据的字符长度因达不到指定数据类型规定的字符长度而导致空格的占位符. 比如: MSSQL中某一表的结构如下:   表中的数据: 当从表中获取某一个用户名(userName)或密码(userPwd)时,如果取出的数据作为条件进行判断是,需注意获取处的数据最好进行Trim()处理,去除数据两边的空格占位符 比如: 没有对数据进行Trim()处理前,从数据库中获取的数据因为原本的数据长度不够而导致空格占位符 对数据进行Trim()

在Js中得到元素的子元素集合注意事项

http://www.cnblogs.com/phonefans/archive/2008/09/04/1283739.html 在Js中得到元素的子元素集合注意事项 费话少说,直接看例子: 1 <ul>2  <li>this's one</li>3  <li>this's two</li>4  <li>this's three</li>5  <li>this's four</li>6 <ul

Windows 已在 数据结构_顺序表.exe 中触发一个断点——new和delete注意事项

实现数据结构的顺序表的类时,输入,改,删,查都可以,但是最后析构函数时持续出错 错误提示"Windows 已在 数据结构_顺序表.exe 中触发一个断点" int *elem=new int(LIST_INIT_SIZE); if(!elem)cout<<"overflow"<<endl; if(leng>LIST_INIT_SIZE) cout<<"error"; else {    length=le

AppStore中使用IDFA后提交应用的注意事项

在ios7.0出来以前,我们都是通过wifi的mac来当作IOS设备的唯一标识符.如何在ios下获取设备的MAC,你可以参数这篇文章:获取ios的MAC地址   在没有使用IDFA之前,我们在ios7及以上的版本中使用的是CFUUID string createCUID(){ string strRet = ""; CFUUIDRef uuid_ref = CFUUIDCreate(NULL); CFStringRef uuid_string_ref= CFUUIDCreateStri

使用CSS中margin和padding的基础和注意事项

在CSS中,margin和padding是页面布局的主要属性,如何灵活有效使用对于基于DIV+CSS设计网页方法是非常重要的,笔者经常使用且经常误使用,所以根据经验和网上资料整理出切合自己的内容,以备以后使用. 一.首先了解CSS盒模型 Box Model 通过这个CSS盒模型模型就很容易理解Margin.padding和Border.. W3C定义的盒模式如下: width和height定义的是Content部分的宽度和高度,padding border margin的宽度依次加在外面.背景会

【Flume】flume中transactionCapacity和batchSize概念的具体分析和解惑

不知道各位用过flume的读者对这两个概念是否熟悉了解 一开始本人的确有点迷惑,觉得这是不是重复了啊? 没感觉到transactionCapacity的作用啊? batchSize又是干啥的啊? -- -- 带着这些问题,我们深入源码来看一下: batchSize batchSize这个概念首先它出现在哪里呢? kafkaSink的process方法 HDFS Sink Exec Source 通过上面这三张图,相信大家应该知道batchSize从哪来的了 batchSize是针对Source和

flume从kafka中读取数据

a1.sources = r1 a1.sinks = k1 a1.channels = c1 #使用内置kafka source a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource #kafka连接的zookeeper a1.sources.r1.zookeeperConnect = localhost:2181 a1.sources.r1.topic = kkt-test-topic a1.sources.r1.batc