一个Flume线上问题的排查

最近在做一个分布式调用链跟踪系统,

在两个地方采用了flume ,一个是宿主系统 ,用flume agent进行日志搜集。 一个是从kafka拉日志分析后写入hbase.

后面这个flume(从kafka拉日志分析后写入flume)用了3台  , 系统上线以后 ,线上抛了一个这样的异常:

Caused by: org.apache.flume.ChannelException: Put queue for MemoryTransaction of capacity 100 full, consider committing more frequently, increasing capacity or increasing thread count
at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doPut(MemoryChannel.java:84)
at org.apache.flume.channel.BasicTransactionSemantics.put(BasicTransactionSemantics.java:93)
at org.apache.flume.channel.BasicChannelSemantics.put(BasicChannelSemantics.java:80)
at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:189)

从异常信息直观理解是MemoryChannel的事务的Put队列满了,为什么会这样呢?

我们先从Flume的体系结构说起,Flume是apache一个负责日志采集和传输的开源工具,它的特点是能够很灵活的通过配置实现不同数据存储之间的数据转换,通过简单的agent还能实现一个日志搜集平台(后面另写文章来进行总结),

它有三个最主要的组件:

Source : 负责从数据源获取数据,包含两种类型的Source . EventDrivenSource 和 PollableSource ,  前者指的是事件驱动型数据源,故名思议,就是需要外部系统主动送数据  ,比如AvroSource ,ThriftSource ; 而PollableSource 指的是需要主动从数据源拉取数据  ,比如KafkaSource ,Source 获取到数据以后向Channel 写入Event, Flume的 Event包含headers和body两部分,前者是键值对组成的Map .

Sink : 负责从Channel拉取Event , 写入下游存储或者对接其他Agent.

Channel:用于实现Source和Sink之间的数据缓冲, 主要有文件通道和内存通道两类。

Flume的架构图如下:

而我的flume 配置如下:

a1.sources = kafkasource
a1.sinks = hdfssink hbasesink
a1.channels = hdfschannel hbasechannel

a1.sources.kafkasource.channels = hdfschannel hbasechannel

a1.sinks.hdfssink.channel = hdfschannel
a1.sinks.hbasesink.channel = hbasechannel

a1.sources.kafkasource.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.kafkasource.zookeeperConnect = zk1:2181,zk2:2181,zk3:2181
a1.sources.kafkasource.topic = nagual_topic
a1.sources.kafkasource.groupId = flume
a1.sources.kafkasource.kafka.consumer.timeout.ms = 500

a1.sinks.hdfssink.type = hdfs
a1.sinks.hdfssink.hdfs.path = hdfs://namenode:8020/flume/kafka_events/%y-%m-%d/%H%M
a1.sinks.hdfssink.hdfs.filePrefix = events-prefix
a1.sinks.hdfssink.hdfs.round = true
a1.sinks.hdfssink.hdfs.roundValue = 10
a1.sinks.hdfssink.hdfs.roundUnit = minute
a1.sinks.hdfssink.hdfs.fileType = SequenceFile
a1.sinks.hdfssink.hdfs.writeFormat = Writable
a1.sinks.hdfssink.hdfs.rollInterval = 60
a1.sinks.hdfssink.hdfs.rollCount = -1
a1.sinks.hdfssink.hdfs.rollSize = -1

a1.sinks.hbasesink.type = hbase
a1.sinks.hbasesink.table = htable_nagual_tracelog
a1.sinks.hbasesink.index_table = htable_nagual_tracelog_index
a1.sinks.hbasesink.serializer =NagualTraceLogEventSerializer
a1.sinks.hbasesink.columnFamily = rpcid
a1.sinks.hbasesink.zookeeperQuorum = zk1:2181,zk2:2181,zk3:2181

a1.channels.hdfschannel.type = memory
a1.channels.hdfschannel.capacity= 10000
a1.channels.hdfschannel.byteCapacityBufferPercentage = 20
a1.channels.hdfschannel.byteCapacity = 536870912

也就是说我的flume agent从kafka拉取日志以后,转换成hbase 的row put操作,中间采用了memchannel , 为什么会出现之前提到的异常呢?花了一个下午的时间,对flume的源码通读了一遍, 基本找到了问题所在 。

我们把源码拆解成以下几个主要步骤来分析:

1、flume 的启动:

如上图所示, 整个flume 启动的主要流程是这样的:

flume_home中的flume-ng启动脚本启动Application , Application创建一个PollingPropertiesFileConfigurationProvider, 这个Provider的作用是启动 一个配置文件的监控线程FileWatcherRunnable ,定时监控配置文件的变更,

一旦配置文件变更,则重新得到SinkRunner, SourceRunner以及channel的配置, 包装成MaterialedConfiguration,通过google guava的eventbus 推送配置变更给Application ,Application启动一个LifeCycleSupervisor,由它来负责监控

SourceRunner ,SinkRunner,Channel的运行情况。而这几个用绿框标识的组件都实现或者继承了LifeCycleAware接口,监控的方式有点意思:通过定时检查这些组件的期望Status是否和当前Status一致, 如果不一致则调用Status对应的方法。

比如在启动的时候,期望SinkRunner的状态是Running ,则调用SinkRunner的Start方法。

启动的顺序是Channel -> SinkRunner -> SourceRunner ( 形象的理解为先有水管,再有水盆, 再开水龙头。)

以我的flume配置文件为例, 使用了MemChannel  。在AbstractConfigurationProvider 配置阶段,会创建好一个LinkedBlockingDeque ( 这个队列是一个全局唯一的有最大长度的双端队列), 而在AbstractConfigurationProvider创建Channel的时候,一种类型的通道又只会被创建一次(具体可以看AbstractConfigurationProvider的getOrCreateChannel方法) ,所以我们也可以理解为什么在Flume的memory channel配置中关于byteCapacity的配置参数会有这么一句话:

Note that if you have multiple memory channels on a single JVM, and they happen to hold the same physical events

也就是说即使配置了多个内存通道, 也共享的是一个双端队列。

未完,待续。

时间: 2024-10-12 00:11:21

一个Flume线上问题的排查的相关文章

线上应用故障排查之二:高内存占用

搞Java开发的,经常会碰到下面两种异常: 1.java.lang.OutOfMemoryError: PermGen space 2.java.lang.OutOfMemoryError: Java heap space 要详细解释这两种异常,需要简单重提下Java内存模型. Java内存模型是描述Java程序中各变量(实例域.静态域和数组元素)之间的关系,以及在实际计算机系统中将变量存储到内存和从内存取出变量这样的低层细节. 在Java虚拟机中,内存分为三个代:新生代(New).老生代(Ol

线上应用故障排查之二:高内存占用(转)

搞Java开发的,经常会碰到下面两种异常: 1.java.lang.OutOfMemoryError: PermGen space 2.java.lang.OutOfMemoryError: Java heap space 要详细解释这两种异常,需要简单重提下Java内存模型. (友情提示:本博文章转载,出处:hankchen,http://www.blogjava.net/hankchen) Java内存模型是描述Java程序中各变量(实例域.静态域和数组元素)之间的关系,以及在实际计算机系统

java线上服务问题排查

1.业务日志相关 如果系统出现异常或者业务有异常,首先想到的都是查看业务日志 查看日志工具: less 或者more grep tail -f filename 查看实时的最新内容 ps:切忌vim直接打开大日志文件,因为会直接加载到内存的 2.数据库相关 java应用很多瓶颈在数据库,一条sql没写好导致慢查询,可能就会带来应用带来致命危害. 如果出现Could not get JDBC Connection .接口响应慢.线程打满等, 需要登录线上库, 查看数据库连接情况:show proc

Java线上应用故障排查之一:高CPU占用

一个应用占用CPU很高,除了确实是计算密集型应用之外,通常原因都是出现了死循环. (友情提示:本博文章欢迎转载,但请注明出处:hankchen,http://www.blogjava.net/hankchen) 以我们最近出现的一个实际故障为例,介绍怎么定位和解决这类问题. 根据top命令,发现PID为28555的Java进程占用CPU高达200%,出现故障. 通过ps aux | grep PID命令,可以进一步确定是tomcat进程出现了问题.但是,怎么定位到具体线程或者代码呢? 首先显示线

线上应用故障排查之一:高CPU占用

一个应用占用CPU很高,除了确实是计算密集型应用之外,通常原因都是出现了死循环. 以我们最近出现的一个实际故障为例,介绍怎么定位和解决这类问题. 根据top命令,发现PID为28555的Java进程占用CPU高达200%,出现故障. 通过ps aux | grep PID命令,可以进一步确定是tomcat进程出现了问题.但是,怎么定位到具体线程或者代码呢? 首先显示线程列表: ps -mp pid -o THREAD,tid,time 找到了耗时最高的线程28802,占用CPU时间快两个小时了!

一次线上死循环的排查

1.问题发现 ??????Prometheus报警某服务的一个节点 Old GC过多,需要排查. 2.查看GC日志 ??????使用tail -f gc.log命令查看异常节点的GC日志,从日志可以看出Young GC过于频繁,竟然在1s内有9次Young GC: ??????使用tail -f gc.log命令查看正常节点的GC日志,从日志可以看出,正常节点,很久才进行一次Young GC: ??????两个节点的JVM参数配置是完全一样的,并且负载均衡策略使用的是Ribbon默认的轮询策略,

搭建一个类似线上的线下测试环境

所有环境搭建全部基于64位CentOS6.5,并且是基本安装. 2台WEB         172.16.30.10 172.16.30.11 2台MQ          172.16.30.12 172.16.30.13 2台Mysql       172.16.30.14 172.16.30.15 1台LVS         172.16.30.16  VIP:172.16.30.30 LVS采用ipvsadm来实现管理 WEB采用NGINX+TOMCAT动静分离 MQ采用集群共享方式 MY

线上ZK问题排查

问题描述 测试环境ZK集群的三个节点中zk1状态虽然是follower,启动也能正常启动(通过telnet也能telnet 2181端口); 无法通过zk客户端去连接2181端口,状态一致是CONNECTING 查看zk集群所有节点状态 /data/zookeeper-new-1/bin/zkServer.sh status /data/zookeeper-new-2/bin/zkServer.sh status /data/zookeeper-new-3/bin/zkServer.sh sta

Cpu飚高show-busy-java-threads一件脚本排查与Arthas线上诊断工具排查实战

spring boot 模拟飚高代码 @Servicepublic class TestWhile{    /* 操作内存对象 */    ConcurrentHashMap map = new ConcurrentHashMap();    private void whileTrue(String threadName) {        // 不设置退出条件,死循环        while (true) {            // 在死循环中不断的对map执行put操作,导致内存gc