Flume之核心架构深入解析

我们一起来了解Source、Channel和Sink的全链路过程。

一、Flume架构分析

这个图中核心的组件是:

Source,ChannelProcessor,Channel,Sink。他们的关系结构如下:

Source  {
    ChannelProcessor  {
             Channel  ch1
             Channel  ch2
             …
    }
}
Sink  {
   Channel  ch;
}
SinkGroup {
   Channel ch;
   Sink s1;
   Sink s2;
   …
}

二、各组件详细介绍

1、Source组件

Source是数据源的总称,我们往往设定好源后,数据将源源不断的被抓取或者被推送。

常见的数据源有:ExecSource,KafkaSource,HttpSource,NetcatSource,JmsSource,AvroSource等等。

所有的数据源统一实现一个接口类如下:

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Source extends LifecycleAware, NamedComponent {

  /**
   * Specifies which channel processor will handle this source‘s events.
   *
   * @param channelProcessor
   */
  public void setChannelProcessor(ChannelProcessor channelProcessor);

  /**
   * Returns the channel processor that will handle this source‘s events.
   */
  public ChannelProcessor getChannelProcessor();

}

Source提供了两种机制: PollableSource(轮询拉取)和EventDrivenSource(事件驱动):

上图展示的Source继承关系类图。

通过类图我们可以看到NetcatSource,ExecSource和HttpSource属于事件驱动模型。KafkaSource,SequenceGeneratorSource和JmsSource属于轮询拉取模型。

Source接口继承了LifecycleAware接口,它的的所有逻辑的实现在接口的start和stop方法中进行。

下图是类关系方法图:

Source接口定义的是最终的实现过程,比如通过日志抓取日志,这个抓取的过程和实际操作就是在对应的Source实现中,比如:ExecSource。那么这些Source实现由谁来驱动的呢?现在我们将介绍SourceRunner类。看一下类继承结构图:

我们看一下PollableSourceRunner和EventDrivenSourceRunner的具体实现:

//PollableSourceRunner:
public void start() {
  PollableSource source = (PollableSource) getSource();
  ChannelProcessor cp = source.getChannelProcessor();
  cp.initialize();
  source.start();

  runner = new PollingRunner();

  runner.source = source; //Source实现类就在这里被赋与。
  runner.counterGroup = counterGroup;
  runner.shouldStop = shouldStop;

  runnerThread = new Thread(runner);
  runnerThread.setName(getClass().getSimpleName() + "-" +
      source.getClass().getSimpleName() + "-" + source.getName());
  runnerThread.start();

  lifecycleState = LifecycleState.START;
}

//EventDrivenSourceRunner:
@Override
public void start() {
  Source source = getSource();
  ChannelProcessor cp = source.getChannelProcessor();
  cp.initialize();
  source.start();
  lifecycleState = LifecycleState.START;
}

注:其实所有的Source实现类内部都维护着线程,执行source.start()其实就是启动了相应的线程。

刚才我们看代码,代码中一直都在展示channelProcessor这个类,同时最上面架构设计图里面也提到了这个类,那它到底是干什么呢,下面我们就对其分解。

2、Channel组件

Channel用于连接Source和Sink,Source将日志信息发送到Channel,Sink从Channel消费日志信息;Channel是中转日志信息的一个临时存储,保存有Source组件传递过来的日志信息。

先看代码如下:

ChannelSelectorConfiguration selectorConfig = config.getSelectorConfiguration();

ChannelSelector selector = ChannelSelectorFactory.create(sourceChannels, selectorConfig);

ChannelProcessor channelProcessor = new ChannelProcessor(selector);
Configurables.configure(channelProcessor, config);

source.setChannelProcessor(channelProcessor);

ChannelSelectorFactory.create方法实现如下:

public static ChannelSelector create(List<Channel> channels,
    ChannelSelectorConfiguration conf) {
  String type = ChannelSelectorType.REPLICATING.toString();
  if (conf != null){
    type = conf.getType();
  }
  ChannelSelector selector = getSelectorForType(type);
  selector.setChannels(channels);
  Configurables.configure(selector, conf);
  return selector;
}

其中我们看一下ChannelSelectorType这个枚举类,包括了几种类型:

public enum ChannelSelectorType {

  /**
   * Place holder for custom channel selectors not part of this enumeration.
   */
  OTHER(null),

  /**
   * 复用通道选择器
   */
  REPLICATING("org.apache.flume.channel.ReplicatingChannelSelector"),

  /**
   *  多路通道选择器
   */
  MULTIPLEXING("org.apache.flume.channel.MultiplexingChannelSelector");
}

ChannelSelector的类结构图如下所示:

注:RelicatingChannelSelector和MultiplexingChannelSelector是二个通道选择器,第一个是复用型通道选择器,也就是的默认的方式,会把接收到的消息发送给其他每个channel。第二个是多路通道选择器,这个会根据消息header中的参数进行通道选择。

说完通道选择器,正式来解释Channel是什么,先看一个接口类:

public interface Channel extends LifecycleAware, NamedComponent {
  public void put(Event event) throws ChannelException;
  public Event take() throws ChannelException;
  public Transaction getTransaction();
}

注:put方法是用来发送消息,take方法是获取消息,transaction是用于事务操作。

类结构图如下:

3、Sink组件

Sink负责取出Channel中的消息数据,进行相应的存储文件系统,数据库,或者提交到远程服务器。

Sink在设置存储数据时,可以向文件系统中,数据库中,hadoop中储数据,在日志数据较少时,可以将数据存储在文件系中,并且设定一定的时间间隔保存数据。在日志数据较多时,可以将相应的日志数据存储到Hadoop中,便于日后进行相应的数据分析。

Sink接口类内容如下:

public interface Sink extends LifecycleAware, NamedComponent {
  public void setChannel(Channel channel);
  public Channel getChannel();
  public Status process() throws EventDeliveryException;
  public static enum Status {
    READY, BACKOFF
  }
}

Sink是通过如下代码进行的创建:

Sink sink = sinkFactory.create(comp.getComponentName(),  comp.getType());

DefaultSinkFactory.create方法如下:

public Sink create(String name, String type) throws FlumeException {
  Preconditions.checkNotNull(name, "name");
  Preconditions.checkNotNull(type, "type");
  logger.info("Creating instance of sink: {}, type: {}", name, type);
  Class<? extends Sink> sinkClass = getClass(type);
  try {
    Sink sink = sinkClass.newInstance();
    sink.setName(name);
    return sink;
  } catch (Exception ex) {
    System.out.println(ex);
    throw new FlumeException("Unable to create sink: " + name
        + ", type: " + type + ", class: " + sinkClass.getName(), ex);
  }
}

注:Sink是通过SinkFactory工厂来创建,提供了DefaultSinkFactory默认工厂,程序会查找org.apache.flume.conf.sink.SinkType这个枚举类找到相应的Sink处理类,比如:org.apache.flume.sink.LoggerSink,如果没找到对应的处理类,直接通过Class.forName(className)进行直接查找实例化实现类。

Sink的类结构图如下:

与ChannelProcessor处理类对应的是SinkProcessor,由SinkProcessorFactory工厂类负责创建,SinkProcessor的类型由一个枚举类提供,看下面代码:

public enum SinkProcessorType {
  /**
   * Place holder for custom sinks not part of this enumeration.
   */
  OTHER(null),

  /**
   * 故障转移 processor
   *
   * @see org.apache.flume.sink.FailoverSinkProcessor
   */
  FAILOVER("org.apache.flume.sink.FailoverSinkProcessor"),

  /**
   * 默认processor
   *
   * @see org.apache.flume.sink.DefaultSinkProcessor
   */
  DEFAULT("org.apache.flume.sink.DefaultSinkProcessor"),

  /**
   * 负载processor
   *
   * @see org.apache.flume.sink.LoadBalancingSinkProcessor
   */
  LOAD_BALANCE("org.apache.flume.sink.LoadBalancingSinkProcessor");

  private final String processorClassName;

  private SinkProcessorType(String processorClassName) {
    this.processorClassName = processorClassName;
  }

  public String getSinkProcessorClassName() {
    return processorClassName;
  }
}

SinkProcessor的类结构图如下:

说明:

1、FailoverSinkProcessor是故障转移处理器,当sink从通道拿数据信息时出错进行的相关处理,代码如下:

public Status process() throws EventDeliveryException {
  // 经过了冷却时间,再次发起重试
  Long now = System.currentTimeMillis();
  while(!failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) {
    //从失败队列中获取sink节点
    FailedSink cur = failedSinks.poll();
    Status s;
    try {
      //调用相应sink进行处理,比如将channel的数据读取存放到文件中,
      //这个存放文件的动作就在process中进行。
      s = cur.getSink().process();
      if (s  == Status.READY) {
        //如果处理成功,则放到存活队列中
        liveSinks.put(cur.getPriority(), cur.getSink());
        activeSink = liveSinks.get(liveSinks.lastKey());
        logger.debug("Sink {} was recovered from the fail list",
                cur.getSink().getName());
      } else {
        // if it‘s a backoff it needn‘t be penalized.
        //如果处理失败,则继续放到失败队列中
        failedSinks.add(cur);
      }
      return s;
    } catch (Exception e) {
      cur.incFails();
      failedSinks.add(cur);
    }
  }

  Status ret = null;
  while(activeSink != null) {
    try {
      ret = activeSink.process();
      return ret;
    } catch (Exception e) {
      logger.warn("Sink {} failed and has been sent to failover list",
              activeSink.getName(), e);
      activeSink = moveActiveToDeadAndGetNext();
    }
  }

2、LoadBalancingSinkProcessor是负载Sink处理器

首先我们和ChannelProcessor一样,我们也要重点说明一下SinkSelector这个选择器。

先看一下SinkSelector.configure方法的部分代码:

if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_ROUND_ROBIN)) {
  selector = new RoundRobinSinkSelector(shouldBackOff);
} else if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_RANDOM)) {
  selector = new RandomOrderSinkSelector(shouldBackOff);
} else {
  try {
    @SuppressWarnings("unchecked")
    Class<? extends SinkSelector> klass = (Class<? extends SinkSelector>)
        Class.forName(selectorTypeName);

    selector = klass.newInstance();
  } catch (Exception ex) {
    throw new FlumeException("Unable to instantiate sink selector: "
        + selectorTypeName, ex);
  }
}

结合上面的代码,再看类结构图如下:

注:RoundRobinSinkSelector是轮询选择器,RandomOrderSinkSelector是随机分配选择器。

最后我们以KafkaSink为例看一下Sink里面的具体实现:

public Status process() throws EventDeliveryException {
  Status result = Status.READY;
  Channel channel = getChannel();
  Transaction transaction = null;
  Event event = null;
  String eventTopic = null;
  String eventKey = null;

  try {
    long processedEvents = 0;

    transaction = channel.getTransaction();
    transaction.begin();

    messageList.clear();
    for (; processedEvents < batchSize; processedEvents += 1) {
      event = channel.take();

      if (event == null) {
        // no events available in channel
        break;
      }

      byte[] eventBody = event.getBody();
      Map<String, String> headers = event.getHeaders();

      if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
        eventTopic = topic;
      }

      eventKey = headers.get(KEY_HDR);

      if (logger.isDebugEnabled()) {
        logger.debug("{Event} " + eventTopic + " : " + eventKey + " : "
          + new String(eventBody, "UTF-8"));
        logger.debug("event #{}", processedEvents);
      }

      // create a message and add to buffer
      KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>
        (eventTopic, eventKey, eventBody);
      messageList.add(data);

    }

    // publish batch and commit.
    if (processedEvents > 0) {
      long startTime = System.nanoTime();
      producer.send(messageList);
      long endTime = System.nanoTime();
      counter.addToKafkaEventSendTimer((endTime-startTime)/(1000*1000));
      counter.addToEventDrainSuccessCount(Long.valueOf(messageList.size()));
    }

    transaction.commit();

  } catch (Exception ex) {
    String errorMsg = "Failed to publish events";
    logger.error("Failed to publish events", ex);
    result = Status.BACKOFF;
    if (transaction != null) {
      try {
        transaction.rollback();
        counter.incrementRollbackCount();
      } catch (Exception e) {
        logger.error("Transaction rollback failed", e);
        throw Throwables.propagate(e);
      }
    }
    throw new EventDeliveryException(errorMsg, ex);
  } finally {
    if (transaction != null) {
      transaction.close();
    }
  }

  return result;
}

注:方法从channel中不断的获取数据,然后通过Kafka的producer生产者将消息发送到Kafka里面。

时间: 2024-12-22 15:52:12

Flume之核心架构深入解析的相关文章

移动支付平台间接口报文解析技术核心架构实现、及平台交易处理项目全程实录教程

<基于移动支付平台间接口报文解析技术核心架构实现.及平台交易处理项目全程实录>课程讲师:MoMo 课程分类:Java框架适合人群:中级课时数量:52课时用到技术:JavaBean .Spring3.X. SpringMVC. Hibernate3.X.Apache HttpClient 3.x.JUnit4.x.自定义Annotation + java反射技术涉及项目:移动支付平台间接口咨询QQ:1337192913 课程介绍:   本课程抛开理论.以项目为驱动,适用于初次接触报文收发.组装解

移动支付平台间接口报文解析核心架构及平台交易全程实录

移动支付平台间接口报文解析核心架构及平台交易全程实录 (HttpClient+SpringMVC+Spring3+Hibernate3+自定义Annotation) 课程分类:Java框架 适合人群:中级 课时数量:52课时 用到技术:JavaBean .Spring3.X. SpringMVC. Hibernate3.X.Apache HttpClient 3.x.JUnit4.x.自定义Annotation + java反射技术 涉及项目:移动支付平台间接口 咨询qq:1840215592

netty源码解解析(4.0)-1 核心架构

netty是java开源社区的一个优秀的网络框架.使用netty,我们可以迅速地开发出稳定,高性能,安全的,扩展性良好的服务器应用程序.netty封装简化了在服务器开发领域的一些有挑战性的问题:jdk nio的使用:多线程并发:扩展性.它还提供了多种应用层协议的支持:http/https/websock, protobuf, 自定义协议, 简化了服务器协议的开发. netty是一个基于事件驱动的框架,它把事件分成两种类型:输入事件(inbound)和输出事件(outbound), 整个框架都是围

Flume入门笔记------架构以及应用介绍

在具体介绍本文内容之前,先给大家看一下Hadoop业务的整体开发流程: 从Hadoop的业务开发流程图中可以看出,在大数据的业务处理过程中,对于数据的采集是十分重要的一步,也是不可避免的一步,从而引出我们本文的主角-Flume.本文将围绕Flume的架构.Flume的应用(日志采集)进行详细的介绍. (一)Flume架构介绍 1.Flume的概念 flume是分布式的日志收集系统,它将各个服务器中的数据收集起来并送到指定的地方去,比如说送到图中的HDFS,简单来说flume就是收集日志的. 2.

SpringMVC核心架构的具体流程

核心架构的具体流程步骤如下: 1.首先用户发送请求-->DispatcherServlet,前端控制器收到请求后自己不进行处理,而是委托给其他的解析器进行 处理,作为统一访问点,进行全局的流程控制: 2.DispatcherServlet-->HandlerMapping, HandlerMapping 将会把请求映射为HandlerExecutionChain 对象(包含一 个Handler 处理器(页面控制器)对象.多个HandlerInterceptor 拦截器)对象,通过这种策略模式,

SpringMVC核心架构具体流程

从网上挑出来的 孔子说了:磨刀不误砍柴工,先把这些基本而核心的概念弄懂了再下手,也不晚(不为过) Spring MVC核心架构的具体流程(步骤)如下: 1.首先用户发送请求  DispatcherServlet,前端控制器收到请求后自己不进行处理,而是委托给其它的解析器进行处理,作为统一访问点,进行全局的流程控制 2.DispatcherServlet  HandlerMapping,HandlerMapping将会把请求映射为HandlerExecutionChain对象(包含一个Handle

Hbase架构原理解析

Hbase架构原理解析 https://developer.51cto.com/art/201904/595698.htm HBase 架构 HBase 的架构似乎也是 master-slave 架构,和 HDFS 有点像,HMaster 是用来管理集群,HRegionServer 是真正存储数据的地方 HBase 在数据查询和写入的时候,其实并不是像 HDFS 那样询问 HMaster. 在 HBase 中,每一张表都会有元信息,这些信息也是被存储为 HBase 表,称为元信息表,也叫 Met

《云计算架构技术与实践》连载(13)2.3 云计算核心架构竞争力衡量维度

版权全部,未经华为书面许可.请勿转载或转发 2.3       云计算核心架构竞争力衡量维度 从将云计算技术引入传统数据中心所带来的独特商业价值角度看,重点能够从开源与节流两个方面来衡量云计算的核心竞争力. 节流(Cost Saving)方面 在业务系统搭建过程中.云计算和虚拟化使得企业及运营商的烟囱式软件应用能够突破应用边界的束缚,充分共享企业范围内.行业范围内.甚至全球范围内公用的"IT资源池",无需採购和安装实际物理形态的server.交换机及存储硬件,而是依赖于向集中的&quo

微信核心功能全解析

最近做了一套及时通讯软件,其中很多功能和微信是相仿的,下面详细介绍一下具体实现. 做及时通讯肯定要用xmpp协议,微信和一些及时通讯软件也是用的这套协议,只是纵向开发深度不同. 1.复写语音按钮 @SuppressLint("NewApi") public class RecordButton extends Button  { public RecordButton(Context context) { super(context); init(); } public RecordB