【Flume】从入口Application来分析flume的source和sink是如何与channel交互的

大家在启动flume的时候,输入的命令就可以看出flume的启动入口了

[[email protected] apache-flume-1.5.2-bin]# sh bin/flume-ng agent -c conf -f conf/server.conf -n a1
Info: Sourcing environment configuration script /home/flume/apache-flume-1.5.2-bin/conf/flume-env.sh
+ exec /home/flume/jdk1.7.0_71/bin/java -server -Xms2048m -Xmx2048m -Xss256K -XX:PermSize=32M -XX:MaxPermSize=512M -XX:+UseConcMarkSweepGC -XX:+DisableExplicitGC -XX:+UseParNewGC -XX:+CMSClassUnloadingEnabled -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:+UseCompressedOops -XX:CMSInitiatingOccupancyFraction=70 -XX:+HeapDumpOnOutOfMemoryError -XX:SurvivorRatio=8 -cp '/home/flume/apache-flume-1.5.2-bin/conf:/home/flume/apache-flume-1.5.2-bin/lib/*' -Djava.library.path= org.apache.flume.node.Application -f conf/server.conf -n a1

从这里可以看出flume的启动入口是:org.apache.flume.node.Application

下面我们就来看该入口程序是如何来运行的

找到main函数

附:flume每次启动都会先判断有没有与当前配置的三大组件同名的组件存在,存在的话先停掉该组件,顺序为source,sink,channel

其次是启动所有当前配置的组件,启动顺序为channel,sink,source

通过这个启动停止的顺序可以看出flume也是对数据一致性做了保证的。

if(reload) {
        EventBus eventBus = new EventBus(agentName + "-event-bus");
        PollingPropertiesFileConfigurationProvider configurationProvider =
            new PollingPropertiesFileConfigurationProvider(agentName,
                configurationFile, eventBus, 30);
        components.add(configurationProvider);
        application = new Application(components);
        eventBus.register(application);
      } else {
        PropertiesFileConfigurationProvider configurationProvider =
            new PropertiesFileConfigurationProvider(agentName,
                configurationFile);
        application = new Application();
        application.handleConfigurationEvent(configurationProvider.getConfiguration());
      }

这个if的作用就是是否30秒读一下配置,判断是否有更新

主要看一下对于配置内容的处理,两个分支虽然从代码上看逻辑不一样,但是处理的逻辑是一样的

我们看else分支的代码吧:

看configurationProvider.getConfiguration()

public MaterializedConfiguration getConfiguration() {
    MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
    FlumeConfiguration fconfig = getFlumeConfiguration();
    AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
    if (agentConf != null) {
      Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
      Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
      Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
      try {
        loadChannels(agentConf, channelComponentMap);
        loadSources(agentConf, channelComponentMap, sourceRunnerMap);
        loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
        Set<String> channelNames =
            new HashSet<String>(channelComponentMap.keySet());
        for(String channelName : channelNames) {
          ChannelComponent channelComponent = channelComponentMap.
              get(channelName);
          if(channelComponent.components.isEmpty()) {
            LOGGER.warn(String.format("Channel %s has no components connected" +
                " and has been removed.", channelName));
            channelComponentMap.remove(channelName);
            Map<String, Channel> nameChannelMap = channelCache.
                get(channelComponent.channel.getClass());
            if(nameChannelMap != null) {
              nameChannelMap.remove(channelName);
            }
          } else {
            LOGGER.info(String.format("Channel %s connected to %s",
                channelName, channelComponent.components.toString()));
            conf.addChannel(channelName, channelComponent.channel);
          }
        }
        for(Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
          conf.addSourceRunner(entry.getKey(), entry.getValue());
        }
        for(Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
          conf.addSinkRunner(entry.getKey(), entry.getValue());
        }
      } catch (InstantiationException ex) {
        LOGGER.error("Failed to instantiate component", ex);
      } finally {
        channelComponentMap.clear();
        sourceRunnerMap.clear();
        sinkRunnerMap.clear();
      }
    } else {
      LOGGER.warn("No configuration found for this host:{}", getAgentName());
    }
    return conf;
  }

我们看在载入source组件的时候有个方法: SourceRunner.forSource(source)

public static SourceRunner forSource(Source source) {
    SourceRunner runner = null;

    if (source instanceof PollableSource) {
      runner = new PollableSourceRunner();
      ((PollableSourceRunner) runner).setSource((PollableSource) source);
    } else if (source instanceof EventDrivenSource) {
      runner = new EventDrivenSourceRunner();
      ((EventDrivenSourceRunner) runner).setSource((EventDrivenSource) source);
    } else {
      throw new IllegalArgumentException("No known runner type for source "
          + source);
    }

    return runner;
  }

这个方法里面通过对source的类型判断来选择使用哪种SourceRunner

我们来看一个具体例子吧AvroSource,它是事件驱动类型的source——EventDrivenSourceRunner

  public void start() {
    Source source = getSource();
    ChannelProcessor cp = source.getChannelProcessor();
    cp.initialize();
    source.start();
    lifecycleState = LifecycleState.START;
  }

这个方法MonitorRunnable类会来调的,这个类就是负责监控flume的所有组件的

那么什么时候来调呢?一旦调用这个方法,source与channel的交互就开始了

 switch (supervisoree.status.desiredState) {
              case START:
                try {
                  lifecycleAware.start();

上面的代码出现在LifecycleSupervisor类中的内部静态类MonitorRunnable的run方法中,再来看这个线程类谁来调用?

  MonitorRunnable monitorRunnable = new MonitorRunnable();
    monitorRunnable.lifecycleAware = lifecycleAware;
    monitorRunnable.supervisoree = process;
    monitorRunnable.monitorService = monitorService;

    supervisedProcesses.put(lifecycleAware, process);

    ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
        monitorRunnable, 0, 3, TimeUnit.SECONDS);
    monitorFutures.put(lifecycleAware, future);

在LifecycleSupervisor类中supervise方法

从这里我们终于看到核心中的核心了,也就是每隔3秒,source会和channel交互一次。

 Supervisoree process = new Supervisoree();
    process.status = new Status();

    process.policy = policy;
    process.status.desiredState = desiredState;
    process.status.error = false;

那么上面的代码所在方法又是被谁调用的呢?

是Application

public synchronized void start() {
    for(LifecycleAware component : components) {
      supervisor.supervise(component,
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
    }
  }

这样的话,整个链就串起来了

所以从这里看出来source和channel的交互频率是3秒

看完source和channel的交互,再来看sink和channel的交互

到这里再看sink就很简单了,因为flume中三大组件都实现自接口LifecycleAware

所以从flume的入口Application来看,从start开始最终都是到LifecycleSupervisor类的supervise方法,而该方法同样:

MonitorRunnable monitorRunnable = new MonitorRunnable();
    monitorRunnable.lifecycleAware = lifecycleAware;
    monitorRunnable.supervisoree = process;
    monitorRunnable.monitorService = monitorService;

    supervisedProcesses.put(lifecycleAware, process);

    ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
        monitorRunnable, 0, 3, TimeUnit.SECONDS);
    monitorFutures.put(lifecycleAware, future);

这串逻辑,不分具体的source,sink,同样是3秒执行一次。

至此,flume中三大组件的交互以及交互频率就说完了,望各位网友不吝指教!!

时间: 2024-10-28 06:43:29

【Flume】从入口Application来分析flume的source和sink是如何与channel交互的的相关文章

Flume+Hadoop+Hive的离线分析系统基本架构

PS:历史原因作者账号名为:ymh198816,但事实上作者的生日并不是1988年1月6日 最近在学习大数据的离线分析技术,所以在这里通过做一个简单的网站点击流数据分析离线系统来和大家一起梳理一下离线分析系统的架构模型.当然这个架构模型只能是离线分析技术的一个简单的入门级架构,实际生产环境中的大数据离线分析技术还涉及到很多细节的处理和高可用的架构.这篇文章的目的只是带大家入个门,让大家对离线分析技术有一个简单的认识,并和大家一起做学习交流. 离线分析系统的结构图 整个离线分析的总体架构就是使用F

【Flume】【源码分析】flume中事件Event的数据结构分析以及Event分流

前言 首先来看一下flume官网中对Event的定义 一行文本内容会被反序列化成一个event[序列化是将对象状态转换为可保持或传输的格式的过程.与序列化相对的是反序列化,它将流转换为对象.这两个过程结合起来,可以轻松地存储和传输数据],event的最大定义为2048字节,超过,则会切割,剩下的会被放到下一个event中,默认编码是UTF-8,这都是统一的. 但是这个解释是针对Avro反序列化系统中的Event的定义,而flume ng中很多event用的不是这个,所以你只要记住event的数据

【Android 应用开发】 Application 使用分析

博客地址 : http://blog.csdn.net/shulianghan/article/details/40737419 代码下载 : Android 应用 Application 经典用法; -- Github : https://github.com/han1202012/ApplicationDemo -- CSDN : http://download.csdn.net/detail/han1202012/8127247 一. Application 分析 1. Applicati

WPF入口Application

1.WPF和 传统的WinForm 类似, WPF 同样需要一个 Application 来统领一些全局的行为和操作,并且每个 Domain (应用程序域)中只能有一个 Application 实例存在.和 WinForm 不同的是 WPF Application 默认由两部分组成 : App.xaml 和 App.xaml.cs,将定义和行为代码相分离.当然,这个和WebForm 也比较类似.XAML 从严格意义上说并不是一个纯粹的 XML 格式文件,它更像是一种 DSL(Domain Spe

Java application 性能分析分享

性能分析的主要方式 监视:监视是一种用来查看应用程序运行时行为的一般方法.通常会有多个视图(View)分别实时地显示 CPU 使用情况.内存使用情况.线程状态以及其他一些有用的信息,以便用户能很快地发现问题的关键所在. 转储:性能分析工具从内存中获得当前状态数据并存储到文件用于静态的性能分析.Java 程序是通过在启动 Java 程序时添加适当的条件参数来触发转储操作的.它包括以下三种: 系统转储:JVM 生成的本地系统的转储,又称作核心转储.一般的,系统转储数据量大,需要平台相关的工具去分析,

Flume简介与使用(二)——Thrift Source采集数据

Flume简介与使用(二)——Thrift Source采集数据 继上一篇安装Flume后,本篇将介绍如何使用Thrift Source采集数据. Thrift是Google开发的用于跨语言RPC通信,它拥有功能强大的软件堆栈和代码生成引擎,允许定义一个简单的IDL文件来生成不同语言的代码,服务器端和客户端通过共享这个IDL文件来构建来完成通信. Flume的Thrift Source是其实现的众多Source中的一个,Flume已经实现了服务器端,因此我们可以用任意自己熟悉的语言编写自己的Th

flume学习(三):flume将log4j日志数据写入到hdfs(转)

原文链接:flume学习(三):flume将log4j日志数据写入到hdfs 在第一篇文章中我们是将log4j的日志输出到了agent的日志文件当中.配置文件如下: [plain] view plaincopy tier1.sources=source1 tier1.channels=channel1 tier1.sinks=sink1 tier1.sources.source1.type=avro tier1.sources.source1.bind=0.0.0.0 tier1.sources

日志收集系统Flume调研笔记第2篇 - Flume配置及使用实例

上篇笔记对Flume的使用场景和系统架构做了介绍,本篇笔记以实例说明Flume的配置方法.下面开始正文. 1. Flume使用实例 1.1 配置 Flume agent的3个组件及其拓扑关系是在配置文件中指定的,总的原则是必须列出source/channel/sink的name/type等重要的配置项,并通过channel将source(s)和sink(s)连接起来,此外,1个source可以指定多个channel,而1个sink只能接收来自1个channel的数据. 这里给出的是部署1套含1个

Flume -- 初识flume、source和sink

Flume – 初识flume.source和sink 目录基本概念常用源 Source常用sink 基本概念 ? 什么叫flume? 分布式,可靠的大量日志收集.聚合和移动工具. ? events 事件,是一行数据的字节数据,是flume发送文件的基本单位. ? flume配置文件 重命名flume-env.sh.template为flume-env.sh,并添加[export JAVA_HOME=/soft/jdk] ? flume的Agent source //从哪儿读数据. 负责监控并收