在上一篇中讲了Flume NG配置模块主要的接口的类,PropertiesConfigurationProvider提供了基于properties配置文件的静态配置的能力,这篇细说一下PollingPropertiesFileConfigurationProvider提供的执行时动态改动配置并生效的能力。
要实现动态改动配置文件并生效,主要有两个待实现的功能
1. 观察配置文件是否改动
2. 假设改动,将改动的内容通知给观察者
对于第一点,监控配置文件是否改动,Flume NG定义了一个FileWatcherRunnable对象来监控配置文件,启动了一个单独的线程採用定时轮询的方式来监控,轮询频率是30毫秒一次。比較file.lastModified属性与lastChange时间戳,当file.lastModified > lastChange时表示文件被改动
public class FileWatcherRunnable implements Runnable { private final File file; private final CounterGroup counterGroup; private long lastChange; public FileWatcherRunnable(File file, CounterGroup counterGroup) { super(); this.file = file; this.counterGroup = counterGroup; this.lastChange = 0L; } @Override public void run() { LOGGER.debug("Checking file:{} for changes", file); counterGroup.incrementAndGet("file.checks"); long lastModified = file.lastModified(); if (lastModified > lastChange) { LOGGER.info("Reloading configuration file:{}", file); counterGroup.incrementAndGet("file.loads"); lastChange = lastModified; try { eventBus.post(getConfiguration()); } catch (Exception e) { LOGGER.error("Failed to load configuration data. Exception follows.", e); } catch (NoClassDefFoundError e) { LOGGER.error("Failed to start agent because dependencies were not " + "found in classpath. Error follows.", e); } catch (Throwable t) { // caught because the caller does not handle or log Throwables LOGGER.error("Unhandled error", t); } } } } // PollingPropertiesFileConfigurationProvider.start()启动一个单独的线程来监控properties配置文件 public void start() { LOGGER.info("Configuration provider starting"); Preconditions.checkState(file != null, "The parameter file must not be null"); executorService = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d") .build()); FileWatcherRunnable fileWatcherRunnable = new FileWatcherRunnable(file, counterGroup); executorService.scheduleWithFixedDelay(fileWatcherRunnable, 0, interval, TimeUnit.SECONDS); lifecycleState = LifecycleState.START; LOGGER.debug("Configuration provider started"); }
对于第二点,利用Guava EventBus提供的公布订阅模式机制,将配置改动封装成事件传递给Application。来又一次载入配置
// FileWatcherRunnable.run方法 公布配置改动的事件 eventBus.post(getConfiguration()); // Application.main方法来注冊事件订阅 Application application; if(reload) { EventBus eventBus = new EventBus(agentName + "-event-bus"); PollingPropertiesFileConfigurationProvider configurationProvider = new PollingPropertiesFileConfigurationProvider(agentName, configurationFile, eventBus, 30); components.add(configurationProvider); application = new Application(components); eventBus.register(application); } // Application类採用@Subscribe标注来定义订阅方法,即配置改动后会运行handleConfigurationEvent方法,这种方法是线程安全的 @Subscribe public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) { stopAllComponents(); startAllComponents(conf); }
时间: 2024-12-19 13:51:14