flume channel monitor实现

对于flume的监控,只需要监控channel的性能数据即可,source和sink的性能一部分可以从channel中表现出来。
以MemoryChannel为例,在MemoryTransaction的构造函数中会实例化一个org.apache.flume.instrumentation.ChannelCounter对象

    public MemoryTransaction( int transCapacity, ChannelCounter counter) {
      putList = new LinkedBlockingDeque<Event>(transCapacity);
      takeList = new LinkedBlockingDeque<Event>(transCapacity);
      channelCounter = counter;
    }

org.apache.flume.instrumentation.ChannelCounter 定义了几个计数器用来记录channel的性能数据

  private static final String COUNTER_CHANNEL_SIZE = "channel.current.size"; //已经使用的容量大小
  private static final String COUNTER_EVENT_PUT_ATTEMPT = "channel.event.put.attempt"; //source到channel尝试插入的数据(不管是否成功)
  private static final String COUNTER_EVENT_TAKE_ATTEMPT = "channel.event.take.attempt"; //sink从channel尝试消费的数据(不管是否成功)
  private static final String COUNTER_EVENT_PUT_SUCCESS = "channel.event.put.success"; //source到channel成功插入的数据
  private static final String COUNTER_EVENT_TAKE_SUCCESS = "channel.event.take.success"; //sink从channel成功消费的数据
  private static final String COUNTER_CHANNEL_CAPACITY = "channel.capacity";  //总容量大小

并封装了相关的方法,来操作这些性能计数器:

比如channel.event.put.attempt的由getEventPutAttemptCount和incrementEventPutAttemptCount操作:

  public long incrementEventPutAttemptCount() { //用于数量增加1
    return increment( COUNTER_EVENT_PUT_ATTEMPT);
  }
  public long getEventPutAttemptCount() { //用于获取值
    return get( COUNTER_EVENT_PUT_ATTEMPT);
  }

而在channel的相关操作中会使用到这些方法:

    protected void doPut(Event event) throws InterruptedException {
      channelCounter.incrementEventPutAttemptCount(); //比如在插入数据的操作开始时,增加channel.event.put.attempt的值
      int eventByteSize = (int)Math.ceil(estimateEventSize(event)/ byteCapacitySlotSize);
      if (! putList.offer(event)) {
        throw new ChannelException(
          "Put queue for MemoryTransaction of capacity " +
            putList.size() + " full, consider committing more frequently, " +
            "increasing capacity or increasing thread count" );
      }
      putByteCounter += eventByteSize;
    }

counter的注册使用,以memorychannel相关为例:
ChannelCounter扩展了MonitoredCounterGroup类并实现了ChannelCounterMBean接口
MonitoredCounterGroup是一个抽象类,其具体的实现类定义了具体的组件的性能计数器和对应的封装方法

ChannelCounter中包含的所有的可用的counter:

private static final String[] ATTRIBUTES = {
    COUNTER_CHANNEL_SIZE, COUNTER_EVENT_PUT_ATTEMPT,
    COUNTER_EVENT_TAKE_ATTEMPT, COUNTER_EVENT_PUT_SUCCESS,
    COUNTER_EVENT_TAKE_SUCCESS, COUNTER_CHANNEL_CAPACITY
  };

ChannelCounter的构造方法调用MonitoredCounterGroup的构造方法:

  public ChannelCounter(String name) {
    super(MonitoredCounterGroup.Type. CHANNEL, name, ATTRIBUTES ); //调用MonitoredCounterGroup构造方法
  }
MonitoredCounterGroup构造方法:
private final Map<String, AtomicLong> counterMap;
....
  protected MonitoredCounterGroup(Type type, String name, String... attrs) {
    this. type = type;
    this. name = name;
    Map<String, AtomicLong> counterInitMap = new HashMap<String, AtomicLong>(); // 声明一个初始的hashmap,用来存放counter name到value的对应关系
    // Initialize the counters
    for (String attribute : attrs) {
      counterInitMap.put(attribute, new AtomicLong(0L)); //初始value都为0
    }
    counterMap = Collections.unmodifiableMap(counterInitMap); //返回hashmap不可更改的映射视图
    startTime = new AtomicLong(0L);
    stopTime = new AtomicLong(0L);
  }

这里Type是一个enum类型,可取值:

  public static enum Type {
    SOURCE,
    CHANNEL_PROCESSOR,
    CHANNEL,
    SINK_PROCESSOR,
    SINK,
    INTERCEPTOR,
    SERIALIZER,
    OTHER
  };

在MemoryChannel中的start方法中启动:

  public synchronized void start() {
    channelCounter.start(); //调用MonitoredCounterGroup的start方法
    channelCounter.setChannelSize( queue.size());
    channelCounter.setChannelCapacity(Long. valueOf(
            queue.size() + queue.remainingCapacity()));
    super.start();
  }

MonitoredCounterGroup.start:

  public void start() {
    register(); //调用register方法注册counter,主要是调用ManagementFactory. getPlatformMBeanServer().registerMBean( this, objName);进行注册mbean操作,把ChannelCounter对象作为mbean进行注册
    stopTime.set(0L);
    for (String counter : counterMap.keySet()) {
      counterMap.get(counter).set(0L); //设置值都为0
    }
    startTime.set(System. currentTimeMillis());
    logger.info( "Component type: " + type + ", name: " + name + " started" );
  }

这样就可以通过jmx获取的监控项

时间: 2024-10-08 10:04:20

flume channel monitor实现的相关文章

[flume] channel 和 sink

上周把安卓日志手机的客户端sdk完成跑通,这周开始调试日志服务器端. 使用flume进行日志收集,然后转kafka.在测试的时候总是发现漏掉一些event,后来才知道对 channel 和 sink 的使用有误.当多个sink使用同一个channel时,event是会分流共同消耗的,而不是每个sink都复制.最后,改成多个channel,每个channel对应一个sink的模式. 有篇不错的博客,http://shiyanjun.cn/archives/915.html

Flume Channel Selector

Flume 基于Channel Selector可以实现扇入.扇出. 同一个数据源分发到不同的目的,如下图. 在source上可以定义channel selector: 1 2 3 4 5 6 7 8 9 a1.sources=r1 ... a1.channels=c1 c2 ... a1.sources.r1.selector.type=multiplexing a1.sources.r1.selector.header=type a1.sources.r1.selector.mapping.

Flume NG 学习笔记(十) Transaction、Sink、Source和Channel开发

版权声明:本文为博主原创文章,未经博主允许不得转载. 目录(?)[+] 一.Transaction interface Transaction接口是基于flume的稳定性考虑的.所有主要的组件(sources.sinks.channels)都必须使用Flume Transaction.我们也可以理解Transaction接口就是flume的事务,sources和sinks的发送数据与接受数据都是在一个Transaction里完成的. 从上图中可以看出,一个Transaction在Channel实

flume单channel多sink的测试

说明: 该结果是亲自测试,只提供简单的数据分析,很简陋,结果可能不准确. 先说一下结果,多sink可以直接按常规配置,这样的话每个sink会启动一个sinkrunner,相当于每个线程一个sink,互不干扰,负载均衡是通过channel实现的,效率会提高为n倍,如果在此基础上加入 sinkgroup,则sinkgroup会启动一个sinkrunner,就是单线程,sinkgroup从channel中读取数据,然后分发到下面挂载的sink中,效率和单sink一样,没有提高,但是可以实现两个sink

flume组件汇总 source、sink、channel

Flume Source Source类型 说明 Avro Source 支持Avro协议(实际上是Avro RPC),内置支持 Thrift Source 支持Thrift协议,内置支持 Exec Source 基于Unix的command在标准输出上生产数据 JMS Source 从JMS系统(消息.主题)中读取数据,ActiveMQ已经测试过 Spooling Directory Source 监控指定目录内数据变更 Twitter 1% firehose Source 通过API持续下载

flume file channel 异常解决

1. 错误提示 2016-04-21 05:40:51,393 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows. java.lang.IllegalStateException: Channel closed [

flume的source, channel, sink 列表

Flume Source Source类型 说明 Avro Source 支持Avro协议(实际上是Avro RPC),内置支持 Thrift Source 支持Thrift协议,内置支持 Exec Source 基于Unix的command在标准输出上生产数据 JMS Source 从JMS系统(消息.主题)中读取数据,ActiveMQ已经测试过 Spooling Directory Source 监控指定目录内数据变更 Twitter 1% firehose Source 通过API持续下载

flume 收集日志,写入hdfs

首先安装flume: 建议和Hadoop保持统一用户来安装Hadoop,flume 本次我采用Hadoop用户安装flume http://douya.blog.51cto.com/6173221/1860390 开始配置: 1,配置文件编写: vim  flume_hdfs.conf # Define a memory channel called ch1 on agent1 agent1.channels.ch1.type = memory agent1.channels.ch1.capac

自定义Flume Sink:ElasticSearch Sink

Flume Sink的目的是从Flume Channel中获取数据然后输出到存储或者其他Flume Source中.Flume Agent启动的时候,它会为每一个Sink都启动一个SinkRunner的对象,SinkRunner.start()方法会启动一个新的线程去管理每一个Sink的生命周期.每一个Sink需要实现start().Stop()和process()方法.你可以在start方法中去初始化Sink的参数和状态,在stop方法中清理Sink的资源.最关键的是process方法,它将处