Flume NG源代码分析(二)支持执行时动态改动配置的配置模块

在上一篇中讲了Flume NG配置模块主要的接口的类,PropertiesConfigurationProvider提供了基于properties配置文件的静态配置的能力,这篇细说一下PollingPropertiesFileConfigurationProvider提供的执行时动态改动配置并生效的能力。

要实现动态改动配置文件并生效,主要有两个待实现的功能

1. 观察配置文件是否改动

2. 假设改动,将改动的内容通知给观察者

对于第一点,监控配置文件是否改动,Flume NG定义了一个FileWatcherRunnable对象来监控配置文件,启动了一个单独的线程採用定时轮询的方式来监控,轮询频率是30毫秒一次。比較file.lastModified属性与lastChange时间戳,当file.lastModified > lastChange时表示文件被改动

public class FileWatcherRunnable implements Runnable {

    private final File file;
    private final CounterGroup counterGroup;

    private long lastChange;

    public FileWatcherRunnable(File file, CounterGroup counterGroup) {
      super();
      this.file = file;
      this.counterGroup = counterGroup;
      this.lastChange = 0L;
    }

    @Override
    public void run() {
      LOGGER.debug("Checking file:{} for changes", file);

      counterGroup.incrementAndGet("file.checks");

      long lastModified = file.lastModified();

      if (lastModified > lastChange) {
        LOGGER.info("Reloading configuration file:{}", file);

        counterGroup.incrementAndGet("file.loads");

        lastChange = lastModified;

        try {
          eventBus.post(getConfiguration());
        } catch (Exception e) {
          LOGGER.error("Failed to load configuration data. Exception follows.",
              e);
        } catch (NoClassDefFoundError e) {
          LOGGER.error("Failed to start agent because dependencies were not " +
              "found in classpath. Error follows.", e);
        } catch (Throwable t) {
          // caught because the caller does not handle or log Throwables
          LOGGER.error("Unhandled error", t);
        }
      }
    }
  }

// PollingPropertiesFileConfigurationProvider.start()启动一个单独的线程来监控properties配置文件
 public void start() {
    LOGGER.info("Configuration provider starting");

    Preconditions.checkState(file != null,
        "The parameter file must not be null");

    executorService = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d")
                .build());

    FileWatcherRunnable fileWatcherRunnable =
        new FileWatcherRunnable(file, counterGroup);

    executorService.scheduleWithFixedDelay(fileWatcherRunnable, 0, interval,
        TimeUnit.SECONDS);

    lifecycleState = LifecycleState.START;

    LOGGER.debug("Configuration provider started");
  }

对于第二点,利用Guava EventBus提供的公布订阅模式机制,将配置改动封装成事件传递给Application。来又一次载入配置

// FileWatcherRunnable.run方法 公布配置改动的事件
   eventBus.post(getConfiguration());

// Application.main方法来注冊事件订阅
      Application application;
      if(reload) {
        EventBus eventBus = new EventBus(agentName + "-event-bus");
        PollingPropertiesFileConfigurationProvider configurationProvider =
            new PollingPropertiesFileConfigurationProvider(agentName,
                configurationFile, eventBus, 30);
        components.add(configurationProvider);
        application = new Application(components);
        eventBus.register(application);
      }

// Application类採用@Subscribe标注来定义订阅方法,即配置改动后会运行handleConfigurationEvent方法,这种方法是线程安全的

@Subscribe
  public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
    stopAllComponents();
    startAllComponents(conf);
  }
时间: 2024-12-19 13:51:14

Flume NG源代码分析(二)支持执行时动态改动配置的配置模块的相关文章

CSAPP Tiny web server源代码分析及搭建执行

1. Web基础 webclient和server之间的交互使用的是一个基于文本的应用级协议HTTP(超文本传输协议). 一个webclient(即浏览器)打开一个到server的因特网连接,而且请求某些内容.server响应所请求的内容,然后关闭连接.浏览器读取这些内容.并把它显示在屏幕上. 对于webclient和server而言.内容是与一个MIME类型相关的字节序列. 常见的MIME类型: MIME类型 描写叙述 text/html HTML页面 text/plain 无格式文本 ima

【原创】Kakfa utils源代码分析(二)

我们继续研究kafka.utils包 八.KafkaScheduler.scala 首先该文件定义了一个trait:Scheduler——它就是运行任务的一个调度器.任务调度的方式支持重复执行的后台任务或是一次性的延时任务.这个trait定义了三个抽象方法: 1. startup: 启动调度器,用于接收调度任务 2. shutdown: 关闭调度器.一旦关闭就不再执行调度任务了,即使是那些晚于关闭时刻的任务. 3. schedule: 调度一个任务的执行.方法接收4个参数 3.1 任务名称 3.

【原创】kafka server源代码分析(二)

十四.AbstractFetcherManager.scala 该scala定义了两个case类和一个抽象类.两个case类很简单: 1. BrokerAndFectherId:封装了一个broker和一个fetcher的数据结构 2. BrokerAndInitialOffset:封装了broker和初始位移的一个数据结构 该scala中最核心的还是那个抽象类:AbstractFetcherManager.它维护了一个获取线程的map,主要保存broker id + fetcher id对应的

【原创】kafka controller源代码分析(二)

四.TopicDeletionManager.scala 管理topic删除的状态机,具体逻辑如下: TopicCommand发送topic删除命令,在zk的/admin/delete_topics目录下创建topic节点 controller会监听该zk目录下任何节点的变更并为对应的topic开启删除操作 controller开启一个后台线程处理topic的删除.使用该线程主要为了以后能够增加TTL(time to live)的特性.无论何时开启或重启topic删除操作时都会通知该线程.当前,

【原创】Kafka console consumer源代码分析(二)

我们继续讨论console consumer的实现原理,本篇着重探讨ZookeeperConsumerConnector的使用,即后续所有的内容都由下面这条语句而起: val connector = Consumer.create(config) 那么问题来了?这条语句后面执行了什么呢?我们先看create方法的定义 def create(config: ConsumerConfig): ConsumerConnector = { val consumerConnect = new Zookee

[Android]Fragment源代码分析(二) 状态

我们上一讲,抛出来一个问题,就是当Activity的onCreateView的时候,是怎样构造Fragment中的View參数.要回答这个问题我们先要了解Fragment的状态,这是Fragment管理中很重要的一环.我们先来看一下FragmentActivity提供的一些核心回调: @Override protected void onCreate(Bundle savedInstanceState) { mFragments.attachActivity(this, mContainer,

源代码分析语言支持列表

序号 开发语言 代码质量规则 代码安全规则 1 Oracle Forms × × 2 PLSQL √ × 3 XML × √ 4 JAVA √ √ 5 JSP √ √ 6 ASP × √ 7 JavaScript × √ 8 PHP × √ 9 Android-Java √ √ 10 ProC/C √ √ 11 C++ √ √ 12 Object-C × √ 13 C# × ×

12、手把手教你Extjs5(十二)执行菜单命令在tabPanel中显示模块

上面设计好了一个模块的主界面,下面通过菜单命令的执行来把这个模块加入到主界面当中.在MainModule.js中有一个函数,生成了当前的菜单数据: // 根据data.systemMenu生成菜单条和菜单按钮下面使用的菜单数据 getMenus : function() { var items = []; var menuData = this.get('systemMenu'); // 取得定义好的菜单数据 Ext.Array.each(menuData, function(group) {

Android学习笔记(十四)——在执行时加入碎片(附源代码)

在执行时加入碎片 点击获取源代码 将UI切割为多个可配置的部分是碎片的优势之中的一个,但其真正强大之处在于可在执行时动态地把它们加入到活动中. 1.使用上一篇创建的Fragments项目,在main.xml文件里凝视掉两个<fragment>元素: 2.在FragmentActivity.java中加入以下的代码: FragmentManager fragmentManager = getSupportFragmentManager();//向活动加入碎片 FragmentTransactio