[Original] Big Data Fundamentals: Flume (2) Sink Source Code Analysis

Core class structure of the Flume sink.

1 The core interface: Sink

org.apache.flume.Sink

  /**
   * <p>Requests the sink to attempt to consume data from attached channel</p>
   * <p><strong>Note</strong>: This method should be consuming from the channel
   * within the bounds of a Transaction. On successful delivery, the transaction
   * should be committed, and on failure it should be rolled back.
   * @return READY if 1 or more Events were successfully delivered, BACKOFF if
   * no data could be retrieved from the channel feeding this sink
   * @throws EventDeliveryException In case of any kind of failure to
   * deliver data to the next hop destination.
   */
  public Status process() throws EventDeliveryException;

  public static enum Status {
    READY, BACKOFF
  }

process() is the core method; it returns a Status that has only two values, READY and BACKOFF. The caller reacts to this return value accordingly, as we will see below.
This is also the interface to implement when extending Flume with a custom sink, e.g. KuduSink.
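To make the transaction contract from the Javadoc concrete, here is a minimal custom sink sketch (the class MyLoggingSink and its console "delivery" are illustrative, not part of Flume): take one event per transaction, commit on success, roll back and rethrow on failure, and report READY/BACKOFF to the caller.

import java.nio.charset.StandardCharsets;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class MyLoggingSink extends AbstractSink implements Configurable {

  @Override
  public void configure(Context context) {
    // read sink properties from the agent configuration here if needed
  }

  @Override
  public Status process() throws EventDeliveryException {
    Channel channel = getChannel();
    Transaction txn = channel.getTransaction();
    txn.begin();
    try {
      Event event = channel.take();
      if (event == null) {
        // channel is empty: commit the empty transaction and ask the caller to back off
        txn.commit();
        return Status.BACKOFF;
      }
      // "deliver" the event; a real sink would write to its destination here
      System.out.println(new String(event.getBody(), StandardCharsets.UTF_8));
      txn.commit();
      return Status.READY;
    } catch (Exception e) {
      txn.rollback();
      throw new EventDeliveryException("Failed to deliver event", e);
    } finally {
      txn.close();
    }
  }
}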

2 Sink wrapper: SinkProcessor

org.apache.flume.SinkProcessor

/**
 * <p>
 * Interface for a device that allows abstraction of the behavior of multiple
 * sinks, always assigned to a SinkRunner
 * </p>
 * <p>
 * A sink processors {@link SinkProcessor#process()} method will only be
 * accessed by a single runner thread. However configuration methods
 * such as {@link Configurable#configure} may be concurrently accessed.
 *
 * @see org.apache.flume.Sink
 * @see org.apache.flume.SinkRunner
 * @see org.apache.flume.sink.SinkGroup
 */
public interface SinkProcessor extends LifecycleAware, Configurable {
  /**
   * <p>Handle a request to poll the owned sinks.</p>
   *
   * <p>The processor is expected to call {@linkplain Sink#process()} on
   *  whatever sink(s) appropriate, handling failures as appropriate and
   *  throwing {@link EventDeliveryException} when there is a failure to
   *  deliver any events according to the delivery policy defined by the
   *  sink processor implementation. See specific implementations of this
   *  interface for delivery behavior and policies.</p>
   *
   * @return Returns {@code READY} if events were successfully consumed,
   * or {@code BACKOFF} if no events were available in the channel to consume.
   * @throws EventDeliveryException if the behavior guaranteed by the processor
   * couldn't be carried out.
   */
  Status process() throws EventDeliveryException;

This interface encapsulates the handling of either a single sink or a sink group. Commonly used implementations:

1) Single sink

org.apache.flume.sink.DefaultSinkProcessor

  @Override
  public Status process() throws EventDeliveryException {
    return sink.process();
  }

DefaultSinkProcessor.process() simply delegates to the wrapped sink's process().

2) Sink group

org.apache.flume.sink.LoadBalancingSinkProcessor
org.apache.flume.sink.FailoverSinkProcessor

LoadBalancingSinkProcessor distributes process() calls across the sinks in the group, while FailoverSinkProcessor always drives the highest-priority live sink and switches to the next one when it fails (failed sinks are tracked in its inner FailedSink helper class). A configuration sketch follows.
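For reference, a sketch of how a sink group is typically wired up in an agent's properties file (the agent, group, and sink names a1, g1, k1, k2 are placeholders; the property keys follow the Flume User Guide):

# a sink group containing two sinks
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2

# load balancing: spread process() calls across k1 and k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.backoff = true

# or failover: drive k1 (higher priority) until it fails, then switch to k2
# a1.sinkgroups.g1.processor.type = failover
# a1.sinkgroups.g1.processor.priority.k1 = 10
# a1.sinkgroups.g1.processor.priority.k2 = 5
# a1.sinkgroups.g1.processor.maxpenalty = 10000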

3 The sink's caller: SinkRunner

org.apache.flume.SinkRunner

/**
 * <p>
 * A driver for {@linkplain Sink sinks} that polls them, attempting to
 * {@linkplain Sink#process() process} events if any are available in the
 * {@link Channel}.
 * </p>
 *
 * <p>
 * Note that, unlike {@linkplain Source sources}, all sinks are polled.
 * </p>
 *
 * @see org.apache.flume.Sink
 * @see org.apache.flume.SourceRunner
 */
public class SinkRunner implements LifecycleAware {
...
  private static final long backoffSleepIncrement = 1000;
  private static final long maxBackoffSleep = 5000;

org.apache.flume.SinkRunner.PollingRunner

  public static class PollingRunner implements Runnable {

    private SinkProcessor policy;
    private AtomicBoolean shouldStop;
    private CounterGroup counterGroup;

    @Override
    public void run() {
      logger.debug("Polling sink runner starting");

      while (!shouldStop.get()) {
        try {
          if (policy.process().equals(Sink.Status.BACKOFF)) {
            counterGroup.incrementAndGet("runner.backoffs");

            Thread.sleep(Math.min(
                counterGroup.incrementAndGet("runner.backoffs.consecutive")
                * backoffSleepIncrement, maxBackoffSleep));
          } else {
            counterGroup.set("runner.backoffs.consecutive", 0L);
          }
        } catch (InterruptedException e) {
          logger.debug("Interrupted while processing an event. Exiting.");
          counterGroup.incrementAndGet("runner.interruptions");
        } catch (Exception e) {
          logger.error("Unable to deliver event. Exception follows.", e);
          if (e instanceof EventDeliveryException) {
            counterGroup.incrementAndGet("runner.deliveryErrors");
          } else {
            counterGroup.incrementAndGet("runner.errors");
          }
          try {
            Thread.sleep(maxBackoffSleep);
          } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
          }
        }
      }
      logger.debug("Polling runner exiting. Metrics:{}", counterGroup);
    }

  }

Whether process() returns BACKOFF or throws an exception, the runner sleeps for a while: consecutive BACKOFF returns sleep min(n * backoffSleepIncrement, maxBackoffSleep) = min(n * 1 s, 5 s), and every exception costs a flat 5 s. So once a Flume sink hits a lot of bad events, or a custom sink keeps returning BACKOFF, throughput drops sharply.
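A tiny standalone sketch (BackoffScheduleDemo is illustrative, not part of Flume) that reproduces PollingRunner's pacing using the constants shown above:

public class BackoffScheduleDemo {
  public static void main(String[] args) {
    long backoffSleepIncrement = 1000; // same constants as SinkRunner
    long maxBackoffSleep = 5000;
    for (int n = 1; n <= 8; n++) {
      // n plays the role of the runner.backoffs.consecutive counter
      long sleepMs = Math.min(n * backoffSleepIncrement, maxBackoffSleep);
      System.out.println("consecutive BACKOFF #" + n + " -> sleep " + sleepMs + " ms");
    }
    // prints 1000, 2000, 3000, 4000, 5000, 5000, 5000, 5000;
    // each exception additionally costs a flat maxBackoffSleep (5000 ms)
  }
}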

Original post: https://www.cnblogs.com/barneywill/p/10570545.html
