Overview
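- Logs collected by a source are first handed to the ChannelProcessor, where they are filtered and transformed by Interceptors and then routed to channels chosen by the ChannelSelector.
- Source and Sink are asynchronous with respect to each other; a sink only needs to watch the Channel it cares about.
- A sink write can fail. Flume provides the following strategies:
- Default: a single sink. If the write fails, the transaction fails and is retried later.
- Failover: multiple sinks are given priorities, and on failure events are routed to the sink with the next priority. A sink is considered failed as soon as it throws a single exception; it is removed from the set of live sinks and retried after an exponentially growing wait, which starts at 1 s by default and is capped at 30 s.
- Load balancing: round-robin and random algorithms are provided by default. The choice is made through a SinkSelector abstraction similar to ChannelSelector.
- How Source and Sink are made asynchronous and how a channel implements transactions are covered in the source-code analysis below.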
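The retry timing described for the failover strategy can be sketched as below. This is only an illustration, not Flume's actual failover code: the class `BackoffSketch` and the method `retryDelayMillis` are hypothetical names, and the doubling rule is an assumption that matches "exponential wait, starting at 1 s, capped at 30 s".

```java
// Hypothetical helper, not part of Flume: computes how long a failed sink
// waits before it is retried, doubling from 1 s and capping at 30 s.
final class BackoffSketch {
  static final long INITIAL_DELAY_MS = 1_000L; // first wait: 1 s
  static final long MAX_DELAY_MS = 30_000L;    // upper bound: 30 s

  // consecutiveFailures is 1 for the first failure, 2 for the second, ...
  static long retryDelayMillis(int consecutiveFailures) {
    if (consecutiveFailures < 1) {
      throw new IllegalArgumentException("consecutiveFailures must be >= 1");
    }
    // Cap the shift so the left shift cannot overflow a long.
    int shift = Math.min(consecutiveFailures - 1, 20);
    return Math.min(INITIAL_DELAY_MS << shift, MAX_DELAY_MS);
  }
}
```

With these assumed constants the waits grow as 1 s, 2 s, 4 s, 8 s, 16 s and then stay at 30 s.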
The whole process
- Flume can be started in two ways: embedded in a Java application through EmbeddedAgent, or as a standalone process through Application. Starting a standalone process with Application is more common, so that is the path analyzed here.
- Entry point: the main method of org.apache.flume.node.Application.
- Note: I have not yet studied how ZooKeeper works, so the ZK-related parts are skipped here.
- The Flume startup flow is roughly as follows:
- Define the command-line options and whether each one is required
    Options options = new Options();

    Option option = new Option("n", "name", true, "the name of this agent");
    option.setRequired(true);
    options.addOption(option);

    option = new Option("f", "conf-file", true,
        "specify a config file (required if -z missing)");
    option.setRequired(false);
    options.addOption(option);
    ......
- Parse the command-line arguments
    if (commandLine.hasOption('h')) {
      new HelpFormatter().printHelp("flume-ng agent", options, true);
      return;
    }
    String agentName = commandLine.getOptionValue('n');
    boolean reload = !commandLine.hasOption("no-reload-conf"); // whether to reload the config file

    if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) {
      isZkConfigured = true;
    }
- ZooKeeper-related: skipped for now
- Open the configuration file
    if (isZkConfigured) {
      ... // if ZK is configured, start with the ZK parameters
    } else {
      // Open the config file; fail fast if it does not exist
      File configurationFile = new File(commandLine.getOptionValue('f'));

      // Make sure the agent fails to start when there is no config file
      if (!configurationFile.exists()) {
        ... // If command line invocation, then need to fail fast
      }
      List<LifecycleAware> components = Lists.newArrayList();

      // If the config file should be reloaded periodically
      if (reload) {
        // Use an EventBus to allow publish-subscribe-style communication
        EventBus eventBus = new EventBus(agentName + "-event-bus");
        // Read the config file with a periodic polling strategy, every 30 s by default
        PollingPropertiesFileConfigurationProvider configurationProvider =
            new PollingPropertiesFileConfigurationProvider(
                agentName, configurationFile, eventBus, 30);
        components.add(configurationProvider);
        // Register the component with the Application
        application = new Application(components);
        // Register the application with the EventBus, which picks up the
        // methods in Application annotated with @Subscribe
        // TODO: EventBus, and why reload configuration
        eventBus.register(application);
      } else {
        // The config file is not reloaded periodically
        PropertiesFileConfigurationProvider configurationProvider =
            new PropertiesFileConfigurationProvider(agentName, configurationFile);
        application = new Application();
        // Initialize the Flume components directly from the config file
        application.handleConfigurationEvent(configurationProvider.getConfiguration());
      }
    }
- reload conf: reloading is implemented with an EventBus. Application.handleConfigurationEvent is the subscriber and PollingPropertiesFileConfigurationProvider is the publisher. The provider periodically polls the file for changes; when the file has changed it re-reads it and publishes a configuration-change event, and handleConfigurationEvent receives that event and reinitializes the components.
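The poll-then-publish idea behind config reloading can be sketched without Guava. Everything here is hypothetical and simplified: `PollingConfigSketch`, `subscribe`, and `poll` are illustrative names, the "file" is reduced to a modification timestamp plus its content, and the subscriber callback stands in for handleConfigurationEvent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the provider/EventBus pair: it remembers the last
// modification time it saw and notifies subscribers only when that changes.
final class PollingConfigSketch {
  private final List<Consumer<String>> subscribers = new ArrayList<>();
  private long lastSeenModified = -1L;

  // Plays the role of eventBus.register(application).
  void subscribe(Consumer<String> subscriber) {
    subscribers.add(subscriber);
  }

  // Called on every poll tick; returns true if a change event was published.
  boolean poll(long currentModified, String newContent) {
    if (currentModified == lastSeenModified) {
      return false; // file unchanged, nothing to publish
    }
    lastSeenModified = currentModified;
    for (Consumer<String> subscriber : subscribers) {
      subscriber.accept(newContent); // plays the role of handleConfigurationEvent
    }
    return true;
  }
}
```

The publisher knows nothing about what the subscriber does with the new configuration, which is the decoupling the EventBus provides in the real code.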
- handleConfigurationEvent:
    @Subscribe
    public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
      // MaterializedConfiguration holds the components Flume needs at runtime:
      // Source, Channel, Sink, SourceRunner, SinkRunner, and so on. It is
      // obtained from a ConfigurationProvider; for example,
      // PollingPropertiesFileConfigurationProvider reads the config file and
      // then initializes the components.
      stopAllComponents();       // stop all components
      startAllComponents(conf);  // initialize all components from the configuration
    }
- startAllComponents
- Channels are started first, and startup waits until every channel is up. Then the SinkRunners are started so that the consumers are ready. Finally the SourceRunners are started and log collection begins.
- LifecycleSupervisor is the guardian of these components; its default policy when something goes wrong is to restart the component automatically.
- Every start here goes through supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); how this call actually starts a component is explained later.
    private void startAllComponents(MaterializedConfiguration materializedConfiguration) {
      logger.info("Starting new configuration:{}", materializedConfiguration);

      this.materializedConfiguration = materializedConfiguration;

      // Start the channels.
      for (Entry<String, Channel> entry :
          materializedConfiguration.getChannels().entrySet()) {
        try {
          logger.info("Starting Channel " + entry.getKey());
          // TODO: how LifecycleSupervisor starts components
          // new SupervisorPolicy.AlwaysRestartPolicy(): always restart on failure
          // LifecycleState.START: the component's initial desired state is START
          supervisor.supervise(entry.getValue(),
              new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
        } catch (Exception e) {
          logger.error("Error while starting {}", entry.getValue(), e);
        }
      }

      /*
       * Wait for all channels to start.
       */
      for (Channel ch : materializedConfiguration.getChannels().values()) {
        while (ch.getLifecycleState() != LifecycleState.START
            && !supervisor.isComponentInErrorState(ch)) {
          try {
            logger.info("Waiting for channel: " + ch.getName()
                + " to start. Sleeping for 500 ms");
            Thread.sleep(500);
          } catch (InterruptedException e) {
            logger.error("Interrupted while waiting for channel to start.", e);
            Throwables.propagate(e);
          }
        }
      }

      // Start the SinkRunners
      for (Entry<String, SinkRunner> entry :
          materializedConfiguration.getSinkRunners().entrySet()) {
        try {
          logger.info("Starting Sink " + entry.getKey());
          supervisor.supervise(entry.getValue(),
              new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
        } catch (Exception e) {
          logger.error("Error while starting {}", entry.getValue(), e);
        }
      }

      // Start the SourceRunners. TODO: SourceRunner & SinkRunner
      for (Entry<String, SourceRunner> entry :
          materializedConfiguration.getSourceRunners().entrySet()) {
        try {
          logger.info("Starting Source " + entry.getKey());
          supervisor.supervise(entry.getValue(),
              new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
        } catch (Exception e) {
          logger.error("Error while starting {}", entry.getValue(), e);
        }
      }

      this.loadMonitoring();
    }
- After that, main calls
    application.start();
    /**
     * Loops over the components registered with the Application and puts each
     * one under the supervisor, whose default policy is to restart a component
     * automatically when something goes wrong. If config reloading is enabled,
     * the PollingPropertiesFileConfigurationProvider registered when the
     * Application was created is one of those components, so it is supervised
     * and restarted on failure as well.
     */
    public synchronized void start() {
      // private final List<LifecycleAware> components;
      for (LifecycleAware component : components) {
        // private final LifecycleSupervisor supervisor;
        supervisor.supervise(component,
            new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
      }
    }
The corresponding stop logic. First, in main:
    final Application appReference = application;
    // Runtime.getRuntime(): Returns the runtime object associated with the
    // current Java application.
    /**
     * addShutdownHook: registers a new JVM shutdown hook.
     * The JVM shuts down in two cases: 1) the program exits normally, when the
     * last non-daemon thread exits or System.exit is called; 2) the JVM is
     * interrupted by the user, for example with Ctrl-C.
     */
    Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
      @Override
      public void run() {
        appReference.stop();
      }
    });

    // Application.stop()
    public synchronized void stop() {
      // Stop the supervisor and the monitoring service.
      supervisor.stop();
      if (monitorServer != null) {
        monitorServer.stop();
      }
    }
- This completes the walk through Application.
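- The overall flow can be summarized as:
- First, set up the command-line options;
- Next, read the configuration file;
- Initialize the components in the configuration file according to whether reload is needed; with reload enabled, changes are published and subscribed to through the EventBus;
- Then create the Application and its guardian LifecycleSupervisor, stop all components, and start all components. Start order: Channel, SinkRunner, SourceRunner; these components are registered with the supervisor and the monitoring service (MonitorService) is initialized. Stop order: SourceRunner, SinkRunner, Channel;
- If the configuration file needs periodic reloading, PollingPropertiesFileConfigurationProvider is also registered with the supervisor;
- Finally, register a JVM shutdown hook that stops the supervisor and the monitoring service.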
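The start and stop ordering in this summary is simply one list and its reverse. The sketch below only illustrates that relationship; `StartOrderSketch` is a hypothetical class, and real Flume components are started through LifecycleSupervisor rather than by name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration of the ordering only.
final class StartOrderSketch {
  // Channels first, then the consumers (sinks), then the producers (sources).
  static final List<String> START_ORDER =
      Arrays.asList("Channel", "SinkRunner", "SourceRunner");

  static List<String> startOrder() {
    return new ArrayList<>(START_ORDER);
  }

  // Stopping runs in reverse: producers, then consumers, then channels.
  static List<String> stopOrder() {
    List<String> reversed = new ArrayList<>(START_ORDER);
    Collections.reverse(reversed);
    return reversed;
  }
}
```

Starting consumers before producers means a source never writes into a channel that nobody is ready to drain.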
LifecycleSupervisor
- The guardian: responsible for monitoring and restarting components
- My note: every component that needs to be monitored and restarted should implement LifecycleAware
    public class LifecycleSupervisor implements LifecycleAware {
      public LifecycleSupervisor() {
        lifecycleState = LifecycleState.IDLE;
        // Holds the supervised components
        supervisedProcesses = new HashMap<LifecycleAware, Supervisoree>();
        // Holds the monitor futures of the components currently being monitored
        monitorFutures = new HashMap<LifecycleAware, ScheduledFuture<?>>();
        // Create the thread pool for the monitoring service
        monitorService = new ScheduledThreadPoolExecutor(10,
            new ThreadFactoryBuilder().setNameFormat(
                "lifecycleSupervisor-" + Thread.currentThread().getId() + "-%d")
                .build());
        monitorService.setMaximumPoolSize(20);
        monitorService.setKeepAliveTime(30, TimeUnit.SECONDS);
        // Periodically purges cancelled components
        purger = new Purger();
        // No purging by default
        needToPurge = false;
      }
      ... // start() & stop()...

      // Put a component under supervision
      public synchronized void supervise(LifecycleAware lifecycleAware,
          SupervisorPolicy policy, LifecycleState desiredState) {
        if (this.monitorService.isShutdown()
            || this.monitorService.isTerminated()
            || this.monitorService.isTerminating()) {
          ... // throw an exception if the supervisor has already stopped
        }

        // Initialize the supervised entry
        Supervisoree process = new Supervisoree();
        process.status = new Status();

        // The default policy is restart on failure
        process.policy = policy;
        process.status.desiredState = desiredState; // the component's initial desired state, usually START
        process.status.error = false;

        // The component monitor: periodically reads the component's latest
        // state or restarts it. What MonitorRunnable does is described below.
        MonitorRunnable monitorRunnable = new MonitorRunnable();
        monitorRunnable.lifecycleAware = lifecycleAware;
        monitorRunnable.supervisoree = process;
        monitorRunnable.monitorService = monitorService;

        supervisedProcesses.put(lifecycleAware, process);

        // Run monitorRunnable with a fixed delay between executions.
        // scheduleWithFixedDelay: Creates and executes a periodic action. If any
        // execution of the task encounters an exception, subsequent executions
        // are suppressed. Otherwise, the task will only terminate via
        // cancellation or termination of the executor.
        // So every exception must be caught to keep the periodic task running.
        ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
            monitorRunnable, 0, 3, TimeUnit.SECONDS);
        monitorFutures.put(lifecycleAware, future);
      }
    }
- MonitorRunnable: drives component state transitions and recovers components from failure
    public static class MonitorRunnable implements Runnable {

      public ScheduledExecutorService monitorService;
      public LifecycleAware lifecycleAware;
      public Supervisoree supervisoree;

      @Override
      public void run() {
        long now = System.currentTimeMillis();

        try {
          if (supervisoree.status.firstSeen == null) {
            logger.debug("first time seeing {}", lifecycleAware);

            supervisoree.status.firstSeen = now; // time of the first status check
          }

          supervisoree.status.lastSeen = now; // time of the latest status check
          synchronized (lifecycleAware) {
            // If the supervised component was discarded or is in error, return
            if (supervisoree.status.discard) {
              // i.e. unsupervise has already been called
              logger.info("Component has already been stopped {}", lifecycleAware);
              return;
            } else if (supervisoree.status.error) {
              logger.info("Component {} is in error state, and Flume will not"
                  + "attempt to change its state", lifecycleAware);
              return;
            }

            // Record the most recently observed state
            supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();

            // If the component's actual state differs from the desired state
            // held by the supervisor, the desired state wins and the component
            // is driven toward it
            if (!lifecycleAware.getLifecycleState().equals(
                supervisoree.status.desiredState)) {
              switch (supervisoree.status.desiredState) {
                // Desired state START: start the component.
                // This is presumably how components get started the first time.
                case START:
                  try {
                    lifecycleAware.start();
                  } catch (Throwable e) {
                    logger.error("Unable to start " + lifecycleAware
                        + " - Exception follows.", e);
                    if (e instanceof Error) {
                      // This component can never recover, shut it down.
                      supervisoree.status.desiredState = LifecycleState.STOP;
                      try {
                        lifecycleAware.stop();
                        logger.warn("Component {} stopped, since it could not be"
                            + "successfully started due to missing dependencies",
                            lifecycleAware);
                      } catch (Throwable e1) {
                        logger.error("Unsuccessful attempt to "
                            + "shutdown component: {} due to missing dependencies."
                            + " Please shutdown the agent"
                            + "or disable this component, or the agent will be"
                            + "in an undefined state.", e1);
                        supervisoree.status.error = true;
                        if (e1 instanceof Error) {
                          throw (Error) e1;
                        }
                        // Set the state to stop, so that the conf poller can
                        // proceed.
                      }
                    }
                    supervisoree.status.failures++;
                  }
                  break;
                case STOP:
                  try {
                    lifecycleAware.stop();
                  } catch (Throwable e) {
                    logger.error("Unable to stop " + lifecycleAware
                        + " - Exception follows.", e);
                    if (e instanceof Error) {
                      throw (Error) e;
                    }
                    supervisoree.status.failures++;
                  }
                  break;
                default:
                  logger.warn("I refuse to acknowledge {} as a desired state",
                      supervisoree.status.desiredState);
              }

              if (!supervisoree.policy.isValid(lifecycleAware, supervisoree.status)) {
                logger.error(
                    "Policy {} of {} has been violated - supervisor should exit!",
                    supervisoree.policy, lifecycleAware);
              }
            }
          }
        } catch (Throwable t) {
          logger.error("Unexpected error", t);
        }
        logger.debug("Status check complete");
      }
    }
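The core of one supervision tick, compare the actual state with the desired state and nudge the component toward it while never letting an exception escape, can be modelled in a few lines. This is a simplified sketch under stated assumptions: `MonitorSketch`, `Component`, and `State` are hypothetical names and not Flume classes, and the policy check and discard/error flags are left out.

```java
// Hypothetical, simplified model of one supervision tick.
final class MonitorSketch implements Runnable {
  enum State { IDLE, START, STOP }

  interface Component {
    State state();
    void start();
    void stop();
  }

  private final Component component;
  private final State desired;
  int failures = 0;

  MonitorSketch(Component component, State desired) {
    this.component = component;
    this.desired = desired;
  }

  @Override
  public void run() {
    try {
      if (component.state() == desired) {
        return; // already where it should be
      }
      if (desired == State.START) {
        component.start();
      } else if (desired == State.STOP) {
        component.stop();
      }
    } catch (Throwable t) {
      // Swallow everything: an exception escaping a task scheduled with
      // scheduleWithFixedDelay would suppress all later executions.
      failures++;
    }
  }
}
```

Because a failed start leaves the actual state unchanged, the next tick sees the same mismatch and tries again, which is how the restart behaviour falls out of the periodic check.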
Source
SourceRunner
- First, SourceRunner, which controls how a source is driven.
- It is an abstract class used to instantiate its derived classes: given a source, its static factory method instantiates the matching runner.
    // Static factory method that instantiates a source runner for the given source type.
    // Input: the source to run. Returns: a runner that can run that source.
    public static SourceRunner forSource(Source source) {
      SourceRunner runner = null;

      if (source instanceof PollableSource) {
        runner = new PollableSourceRunner();
        ((PollableSourceRunner) runner).setSource((PollableSource) source);
      } else if (source instanceof EventDrivenSource) {
        runner = new EventDrivenSourceRunner();
        ((EventDrivenSourceRunner) runner).setSource((EventDrivenSource) source);
      } else {
        throw new IllegalArgumentException("No known runner type for source " + source);
      }

      return runner;
    }
EventDrivenSourceRunner
- Starts, stops, and manages EventDrivenSource (event-driven) sources
- It has the following methods:
- Constructor
    public EventDrivenSourceRunner() {
      lifecycleState = LifecycleState.IDLE;
    }
- start()
    @Override
    public void start() {
      Source source = getSource();                        // get the Source
      ChannelProcessor cp = source.getChannelProcessor(); // the channel processor
      cp.initialize();                                    // initialize the channel processor
      source.start();                                     // start the Source
      lifecycleState = LifecycleState.START;              // mark this runner as started
    }
- stop(), toString(), getLifecycleState()
PollableSourceRunner
    public class PollableSourceRunner extends SourceRunner {
      @Override
      public void start() {
        PollableSource source = (PollableSource) getSource(); // get the Source
        ChannelProcessor cp = source.getChannelProcessor();   // the channel processor
        cp.initialize();                                      // initialize the channel processor
        source.start();                                       // start the source

        runner = new PollingRunner(); // create a PollingRunner thread to pull data
        runner.source = source;
        runner.counterGroup = counterGroup;
        runner.shouldStop = shouldStop;

        runnerThread = new Thread(runner);
        runnerThread.setName(getClass().getSimpleName() + "-"
            + source.getClass().getSimpleName() + "-" + source.getName());
        runnerThread.start();

        lifecycleState = LifecycleState.START;
      }
    }
- The PollingRunner thread
    @Override
    public void run() {
      while (!shouldStop.get()) { // keep looping until asked to stop
        counterGroup.incrementAndGet("runner.polls"); // atomic operation

        try {
          // Call PollableSource.process() to poll, then check whether the
          // source asked to back off
          if (source.process().equals(PollableSource.Status.BACKOFF)) {
            counterGroup.incrementAndGet("runner.backoffs");

            // On backoff, pause this thread and retry after the wait has elapsed
            Thread.sleep(Math.min(
                counterGroup.incrementAndGet("runner.backoffs.consecutive")
                    * source.getBackOffSleepIncrement(),
                source.getMaxBackOffSleepInterval()));
          } else {
            counterGroup.set("runner.backoffs.consecutive", 0L);
          }
        } catch (InterruptedException e) {
        }
      }
    }
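The sleep time inside the polling loop grows linearly with the number of consecutive backoffs and is then capped. The sketch below isolates that expression; `PollBackoffSketch` and `sleepMillis` are hypothetical names, and the 1000 ms increment and 5000 ms cap used in the usage note are assumed example values, not values taken from the code above.

```java
// Hypothetical helper that isolates the argument of Thread.sleep(...) in the
// polling loop: min(consecutiveBackoffs * increment, maxInterval).
final class PollBackoffSketch {
  static long sleepMillis(long consecutiveBackoffs, long incrementMs, long maxIntervalMs) {
    return Math.min(consecutiveBackoffs * incrementMs, maxIntervalMs);
  }

  // Sleep times for the first n consecutive backoffs, for illustration.
  static long[] schedule(int n, long incrementMs, long maxIntervalMs) {
    long[] sleeps = new long[n];
    for (int i = 0; i < n; i++) {
      sleeps[i] = sleepMillis(i + 1, incrementMs, maxIntervalMs);
    }
    return sleeps;
  }
}
```

With an increment of 1000 ms and a cap of 5000 ms, seven consecutive backoffs sleep for 1000, 2000, 3000, 4000, 5000, 5000, 5000 ms. A single successful poll resets the consecutive counter, so the next backoff starts from one increment again.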
- TODO
Source
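    public interface Source extends LifecycleAware, NamedComponent {
      public void setChannelProcessor(ChannelProcessor channelProcessor);
      public ChannelProcessor getChannelProcessor();
    }
- It extends the LifecycleAware interface and only adds a setter and a getter for the ChannelProcessor. In particular:
- all of its logic should be implemented in the start and stop methods of LifecycleAware;
- ChannelProcessor filters the log stream and selects and dispatches to Channels.
- From the runners above we know that Source comes in two flavors: PollableSource (polling) and EventDrivenSource (event-driven).
- A Source's job is to listen for logs, collect them, and hand them to the ChannelProcessor.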
EventDrivenSource
- An event-driven source does not need an external driver to fetch events. EventDrivenSource is an empty marker interface that extends Source.
- Pick up from here
Channel
- Channel decouples Source from Sink, allows many-to-many wiring between them, and makes Source and Sink asynchronous.
- Channel exposes a transaction interface that can be used by its clients to ensure atomic put(Event) and take() semantics.
ChannelProcessor
- As seen above, a Source hands the logs it collects to the ChannelProcessor, so that is where we start. It depends on the following components:
    private final ChannelSelector selector;          // channel selector, .flume.ChannelSelector
    private final InterceptorChain interceptorChain; // interceptor chain, .flume.interceptor.InterceptorChain
    private ExecutorService execService; // ExecutorService used for optional channels, single-threaded by default
                                         // [note: I saw this on a blog but could not find this field in ChannelProcessor]
- Let's see how ChannelProcessor processes an Event:
    // Attempts to put the given event into each configured channel
    public void processEvent(Event event) {

      event = interceptorChain.intercept(event); // run the interceptor chain first, TODO: intercep...
      // InterceptorChain implements the Interceptor interface and calls a list
      // of other Interceptors to filter and transform the event. See below.
      if (event == null) {
        return;
      }

      // Process required channels
      // The channel selector returns the channels that must succeed; each put
      // runs inside a transaction.
      List<Channel> requiredChannels = selector.getRequiredChannels(event);
      for (Channel reqChannel : requiredChannels) {
        // Classes implementing Channel must implement getTransaction(), TODO: getTransaction
        Transaction tx = reqChannel.getTransaction();
        Preconditions.checkNotNull(tx, "Transaction object must not be null");
        try {
          tx.begin(); // begin the transaction

          reqChannel.put(event); // put the event into reqChannel

          tx.commit(); // commit the transaction
        } catch (Throwable t) {
          tx.rollback(); // on any Throwable (Error or Exception), roll back
          if (t instanceof Error) {
            LOG.error("Error while writing to required channel: " + reqChannel, t);
            throw (Error) t;
          } else if (t instanceof ChannelException) {
            throw (ChannelException) t;
          } else {
            // TODO: ChannelException is probably handled by the caller;
            // otherwise how is delivery to a required channel guaranteed?
            throw new ChannelException("Unable to put event on required "
                + "channel: " + reqChannel, t);
          }
        } finally {
          if (tx != null) {
            tx.close(); // finally, close the transaction if it is non-null
          }
        }
      }

      // Process optional channels
      // The channel selector returns the optional channels; a failure on one of
      // them is ignored and does not affect the other channels.
      List<Channel> optionalChannels = selector.getOptionalChannels(event);
      for (Channel optChannel : optionalChannels) {
        Transaction tx = null;
        try {
          tx = optChannel.getTransaction();
          tx.begin();

          optChannel.put(event);

          tx.commit();
        } catch (Throwable t) {
          tx.rollback();
          LOG.error("Unable to put event on optional channel: " + optChannel, t);
          if (t instanceof Error) {
            throw (Error) t;
          }
        } finally {
          if (tx != null) {
            tx.close();
          }
        }
      }
    }
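The difference between required and optional channels comes down to what happens to an exception. The sketch below models only that difference: `RequiredOptionalSketch` and `MiniChannel` are hypothetical names, events are plain strings, and transactions are omitted.

```java
import java.util.List;

// Hypothetical model of required vs. optional delivery; not Flume code.
final class RequiredOptionalSketch {
  // Minimal channel: put either succeeds or throws.
  interface MiniChannel {
    void put(String event);
  }

  // Returns how many optional channels failed. A failure on a required
  // channel is not caught here, so it propagates to the caller (the source).
  static int deliver(String event, List<MiniChannel> required, List<MiniChannel> optional) {
    for (MiniChannel ch : required) {
      ch.put(event);
    }
    int optionalFailures = 0;
    for (MiniChannel ch : optional) {
      try {
        ch.put(event);
      } catch (RuntimeException e) {
        optionalFailures++; // would be logged and ignored
      }
    }
    return optionalFailures;
  }
}
```

A source that sees the exception from a required channel can retry or report the failure upstream, while an optional channel that is full or broken never blocks delivery to the others.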
- The channel classes implemented in Flume: [Figure: Channel class hierarchy]
Channel interface
    public interface Channel extends LifecycleAware, NamedComponent {
      // put() and take() must be invoked within an active Transaction boundary
      public void put(Event event) throws ChannelException;
      public Event take() throws ChannelException;
      // @return: the transaction instance associated with this channel
      public Transaction getTransaction();
    }
AbstractChannel
abstract class AbstractChannel implements Channel, LifecycleAware, Configurable
- It handles the lifecycle state changes (in the constructor, start(), and stop()) and provides an empty configure() method. It does no channel-specific processing.
BasicChannelSemantics
- An implementation of basic Channel semantics, including the thread-local semantics of the Transaction class.
    public abstract class BasicChannelSemantics extends AbstractChannel {

      // 1. The transaction is stored in a ThreadLocal, which keeps it thread-safe
      private ThreadLocal<BasicTransactionSemantics> currentTransaction
          = new ThreadLocal<BasicTransactionSemantics>();

      private boolean initialized = false;

      protected void initialize() {} // 2. hook for initialization work

      // 3. Callback through which implementing subclasses create a transaction.
      // It returns a new Transaction object, which must extend BasicTransactionSemantics.
      // For example, MemoryChannel overrides this method and returns an instance
      // of its private inner class MemoryTransaction, which extends
      // BasicTransactionSemantics.
      // MemoryTransaction uses two LinkedBlockingDeque instances as its putList
      // and takeList. Details follow in the MemoryChannel section. TODO
      protected abstract BasicTransactionSemantics createTransaction();

      // 4. Put an Event into the Channel; delegates directly to the transaction's put.
      // Ensures the thread has a transaction, then delegates put to this
      // thread's BasicTransactionSemantics instance.
      @Override
      public void put(Event event) throws ChannelException {
        // currentTransaction is a ThreadLocal<BasicTransactionSemantics>,
        // so this fetches the current thread's transaction instance
        BasicTransactionSemantics transaction = currentTransaction.get();
        Preconditions.checkState(transaction != null,
            "No transaction exists for this thread");
        transaction.put(event);
      }

      // 5. Take an Event from the Channel; also delegates to the transaction's take.
      @Override
      public Event take() throws ChannelException {
        BasicTransactionSemantics transaction = currentTransaction.get();
        Preconditions.checkState(transaction != null,
            "No transaction exists for this thread");
        return transaction.take();
      }

      @Override
      public Transaction getTransaction() {
        // 1. If the channel is not ready, initialize it
        if (!initialized) {
          synchronized (this) {
            if (!initialized) {
              initialize();
              initialized = true;
            }
          }
        }

        // 2. If the current thread has no open transaction (none, or already
        // closed), create one and bind it to currentTransaction
        BasicTransactionSemantics transaction = currentTransaction.get();
        if (transaction == null || transaction.getState().equals(
            BasicTransactionSemantics.State.CLOSED)) {
          transaction = createTransaction();
          currentTransaction.set(transaction);
        }
        return transaction;
      }
    }
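The thread-local binding in getTransaction() can be tried out in isolation. In this sketch `ThreadLocalTxSketch` and `Tx` are hypothetical stand-ins for the channel and its transaction, and a plain `closed` flag replaces the transaction state machine.

```java
// Hypothetical model of "one open transaction per thread"; not Flume code.
final class ThreadLocalTxSketch {
  static final class Tx {
    boolean closed = false;
  }

  private final ThreadLocal<Tx> current = new ThreadLocal<>();

  // Same shape as getTransaction() above: reuse this thread's open
  // transaction, otherwise create a new one and bind it to the thread.
  Tx getTransaction() {
    Tx tx = current.get();
    if (tx == null || tx.closed) {
      tx = new Tx();
      current.set(tx);
    }
    return tx;
  }
}
```

Two calls on the same thread return the same object until it is closed, while a second thread always gets its own instance, so put() and take() never need to be told which transaction they belong to.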
MemoryChannel
- Recommended when writing to disk is impractical or data persistence is not required; also useful in unit tests.
- Most channels delegate put and take to the transaction.
- A purely in-memory Channel implementation; the whole transaction runs in memory.
- Each transaction has a takeList and a putList that hold the events taken and put within that transaction. Only on commit are the puts actually moved into the Channel Queue; on failure the taken events are rolled back into the Channel Queue. TODO: understand as a whole when commit and rollback happen.
    public class MemoryChannel extends BasicChannelSemantics {
      // TODO: about factory
      private static Logger LOGGER = LoggerFactory.getLogger(MemoryChannel.class);
      ... // constant definitions: defaults such as defaultCapacity, defaultTransCapacity, byteCapacitySlotSize..

      // Inner class extending BasicTransactionSemantics. TODO: About BasicTransactionSemantics
      private class MemoryTransaction extends BasicTransactionSemantics {
        // Each transaction has two concurrent blocking deques, TODO: LinkedBlockingDeque
        private LinkedBlockingDeque<Event> takeList;
        private LinkedBlockingDeque<Event> putList;
        private final ChannelCounter channelCounter;
        ... //
        public MemoryTransaction(int transCapacity, ChannelCounter counter) {
          putList = new LinkedBlockingDeque<Event>(transCapacity);
          takeList = new LinkedBlockingDeque<Event>(transCapacity);

          channelCounter = counter;
        }

        // Put the event into putList.
        // doPut is fairly simple: it offers the Event to the transaction's
        // putList and throws (rolling back the transaction) if the list is full;
        // otherwise the event waits in putList until commit moves it into the
        // Channel Queue. It also adds to the put byte counter so that the byte
        // capacity limit can be enforced later.
        @Override
        protected void doPut(Event event) throws InterruptedException {
          // channelCounter tracks events put into and taken from the queue,
          // success counts, and so on.
          channelCounter.incrementEventPutAttemptCount(); // count the put attempt
          // estimateEventSize computes the size of the Event body; ceil() rounds up
          int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);

          // Offer the Event to the transaction's putList; if it is full, throw
          // and roll back the transaction
          if (!putList.offer(event)) {
            throw new ChannelException(
                "Put queue for MemoryTransaction of capacity " + putList.size()
                + " full, consider committing more frequently, "
                + "increasing capacity or increasing thread count");
          }
          putByteCounter += eventByteSize; // add to the put byte counter
        }

        // Take an event from the Channel Queue and stage it in takeList
        @Override
        protected Event doTake() throws InterruptedException {
          channelCounter.incrementEventTakeAttemptCount();
          // takeList has no remaining capacity: this transaction has already
          // consumed the maximum number of Events
          if (takeList.remainingCapacity() == 0) {
            throw new ChannelException("Take list for MemoryTransaction, capacity "
                + takeList.size() + " full, consider committing more frequently, "
                + "increasing capacity, or increasing thread count");
          }
          // Try to acquire a permit from queueStored; return null on timeout
          if (!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
            return null;
          }
          Event event;
          // Take one Event from the Channel Queue; every access to the Channel
          // Queue must hold queueLock
          synchronized (queueLock) {
            event = queue.poll();
          }
          // The semaphore guarantees an entry exists, so null here would be a bug
          Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore "
              + "signalling existence of entry");
          // Stage the event in the transaction's takeList
          takeList.put(event);
          // Compute the Event body size and add it to the take byte counter
          int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
          takeByteCounter += eventByteSize;

          return event;
        }

        // Only on commit is this transaction's putList moved into the Channel Queue
        @Override
        protected void doCommit() throws InterruptedException {
          // 1. Compute the net change in event count: takes minus puts
          int remainingChange = takeList.size() - putList.size();
          if (remainingChange < 0) {
            // bytesRemaining is the byte-capacity semaphore; exceeding the
            // capacity rolls back the transaction
            if (!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) {
              throw new ChannelException("Cannot commit transaction. Byte capacity "
                  + "allocated to store event body " + byteCapacity * byteCapacitySlotSize
                  + "reached. Please increase heap space/byte capacity allocated to "
                  + "the channel as the sinks may not be keeping up with the sources");
            }
            if (!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
              bytesRemaining.release(putByteCounter);
              throw new ChannelFullException("Space for commit to queue couldn't be acquired."
                  + " Sinks are likely not keeping up with sources, or the buffer size is too tight");
            }
          }
          int puts = putList.size();
          int takes = takeList.size();
          synchronized (queueLock) {
            if (puts > 0) {
              while (!putList.isEmpty()) {
                if (!queue.offer(putList.removeFirst())) { // offer: adds an element and returns true
                  throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
                }
              }
            }
            putList.clear();
            takeList.clear();
          }
          bytesRemaining.release(takeByteCounter);
          takeByteCounter = 0;
          putByteCounter = 0;

          queueStored.release(puts);
          if (remainingChange > 0) {
            queueRemaining.release(remainingChange);
          }
          if (puts > 0) {
            channelCounter.addToEventPutSuccessCount(puts);
          }
          if (takes > 0) {
            channelCounter.addToEventTakeSuccessCount(takes);
          }

          channelCounter.setChannelSize(queue.size());
        }

        // When the transaction fails, the takeList is rolled back into the Channel Queue.
        // Rollback returns the events staged in takeList to the Channel Queue and
        // restores the queueStored semaphore.
        @Override
        protected void doRollback() {
          int takes = takeList.size();
          synchronized (queueLock) {
            Preconditions.checkState(queue.remainingCapacity() >= takeList.size(),
                "Not enough space in memory channel "
                + "queue to rollback takes. This should never happen, please report");
            while (!takeList.isEmpty()) {
              queue.addFirst(takeList.removeLast());
            }
            putList.clear();
          }
          bytesRemaining.release(putByteCounter);
          putByteCounter = 0;
          takeByteCounter = 0;

          queueStored.release(takes);
          channelCounter.setChannelSize(queue.size());
        }

      }

      // Every operation on the Channel Queue must hold this lock, because the
      // queue can be resized dynamically (it is replaced by a new instance).
      // Usage: synchronized (queueLock) { ...operate on queue }
      private Object queueLock = new Object();
      // @GuardedBy tells maintainers which lock protects this field
      @GuardedBy(value = "queueLock")
      private LinkedBlockingDeque<Event> queue; // the single Channel Queue holding all of the channel's Events

      // A Semaphore limits how many holders can access a resource at once:
      // acquire() takes a permit and waits if none is available, release()
      // returns a permit.
      // queueRemaining is the remaining event capacity. It is increased or
      // decreased when a transaction commits.
      // 1. configure() initializes it with capacity permits.
      // 2. On resize: when shrinking, it must be possible to acquire
      //    oldCapacity - capacity permits, otherwise shrinking is refused (which
      //    is reasonable, since data would otherwise be lost). When growing,
      //    queueRemaining.release(capacity - oldCapacity) is called.
      // 3. On commit, if takeList.size() < putList.size(), there must be enough
      //    queueRemaining permits.
      private Semaphore queueRemaining;

      // The number of events currently stored in the Channel Queue
      // 1. configure() initializes it with 0 permits
      // 2. doTake() calls tryAcquire to see whether a permit is available
      // 3. commit() calls release(puts) to add puts permits
      // 4. rollback() calls release(takes)
      private Semaphore queueStored;

      // maximum items in a transaction queue
      private volatile Integer transCapacity;
      private volatile int keepAlive;
      private volatile int byteCapacity;
      private volatile int lastByteCapacity;
      private volatile int byteCapacityBufferPercentage;
      private Semaphore bytesRemaining;
      private ChannelCounter channelCounter;

      public MemoryChannel() {
        super();
      }

      @Override
      public void configure(Context context) {
        // Read parameters from context
        // capacity, transactionCapacity, byteCapacity, byteCapacityBufferPercentage...
      }

      // Several transactions operate on the Channel Queue and the queue may be
      // resized, so MemoryChannel uses a lock for queue access and semaphores
      // for capacity.
      // Resizing is done by creating a new LinkedBlockingDeque and adding the
      // contents of the old queue to it.
      private void resizeQueue(int capacity) throws InterruptedException {
        int oldCapacity;
        // Compute the old queue's capacity; this must hold the lock
        synchronized (queueLock) {
          oldCapacity = queue.size() + queue.remainingCapacity();
        }

        if (oldCapacity == capacity) {
          return;
        } else if (oldCapacity > capacity) {
          // tryAcquire(): acquires the given number of permits from the semaphore.
          // First reserve oldCapacity - capacity permits so the queue can
          // shrink. If that fails, log a warning and skip the resize.
          if (!queueRemaining.tryAcquire(oldCapacity - capacity, keepAlive, TimeUnit.SECONDS)) {
            LOGGER.warn("Couldn't acquire permits to downsize the queue, resizing has been aborted");
          } else {
            // Otherwise shrink and copy the old queue's contents. queueLock is
            // held so the whole sequence is thread-safe.
            synchronized (queueLock) {
              LinkedBlockingDeque<Event> newQueue = new LinkedBlockingDeque<Event>(capacity);
              newQueue.addAll(queue);
              queue = newQueue;
            }
          }
        }
        ...
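The putList/takeList staging can be seen more easily in a stripped-down, single-threaded model. This is a sketch under stated assumptions, not the MemoryChannel implementation: `MiniMemoryChannel` is a hypothetical class, events are strings, and the lock, semaphores, and capacity checks are all left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical single-threaded model of the putList/takeList idea.
final class MiniMemoryChannel {
  private final Deque<String> queue = new ArrayDeque<>();    // the channel queue
  private final Deque<String> putList = new ArrayDeque<>();  // staged puts
  private final Deque<String> takeList = new ArrayDeque<>(); // staged takes

  void put(String event) {
    putList.addLast(event); // not visible in the queue until commit
  }

  String take() {
    String event = queue.pollFirst();
    if (event != null) {
      takeList.addLast(event); // remembered so rollback can restore it
    }
    return event;
  }

  void commit() {
    queue.addAll(putList); // publish staged puts in order
    putList.clear();
    takeList.clear();      // taken events are now gone for good
  }

  void rollback() {
    // Return taken events to the head, last taken first, to keep the order.
    while (!takeList.isEmpty()) {
      queue.addFirst(takeList.removeLast());
    }
    putList.clear(); // staged puts are dropped
  }

  int size() {
    return queue.size();
  }
}
```

Usage follows the transaction boundary: a source-side caller does put(...) then commit(), a sink-side caller does take() then commit() once the event has been written downstream, or rollback() if the write failed so the event can be taken again.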
Interceptor
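- Flume ships with many built-in Interceptor implementations. [Figure: Interceptor implementations]
- It also implements InterceptorChain, which processes events through a chain of interceptors.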
InterceptorChain
- Implementation of Interceptor that calls a list of other Interceptors
- Interceptor interface: filters and transforms an Event, then returns the original or modified Event, or null if the Event is to be dropped.
- By comparison, InterceptorChain simply calls the corresponding method of each Interceptor instance it holds, one after another, on the event.
    public class InterceptorChain implements Interceptor {

      // list of interceptors that will be traversed, in order
      private List<Interceptor> interceptors;

      public InterceptorChain() {
        interceptors = Lists.newLinkedList(); // constructor; the list is a LinkedList
      }
      public void setInterceptors(List<Interceptor> interceptors) {
        this.interceptors = interceptors; // setter
      }

      // Interceptor.intercept: Interception of a single Event.
      // @return: Original or modified event, or null if the Event is to be dropped.
      @Override
      public Event intercept(Event event) {
        for (Interceptor interceptor : interceptors) {
          if (event == null) {
            return null;
          }
          // Note: interceptors is populated through the setter above, and each
          // element is an instance of a class implementing Interceptor, so the
          // intercept() called here is that class's own implementation.
          // [Interceptor has many implementations; HostInterceptor is analyzed
          // below as an example.]
          event = interceptor.intercept(event);
        }
        return event;
      }

      // Interceptor: Interception of a batch of events
      // @return: Output list of events
      @Override
      public List<Event> intercept(List<Event> events) {
        ... // essentially the same as above, but calls interceptor.intercept(events);
      }

      // Interceptor: Any initialization / startup needed by the Interceptor.
      @Override
      public void initialize() {
        Iterator<Interceptor> iter = interceptors.iterator();
        while (iter.hasNext()) {
          Interceptor interceptor = iter.next();
          interceptor.initialize(); // initialize each interceptor in the list in turn
        }
      }

      @Override
      public void close() {
        ... // close each interceptor in the list in turn
      }
    }
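The chaining rule, apply each step in order and stop as soon as one step returns null, can be shown with plain functions. In this sketch `ChainSketch` is a hypothetical class, events are strings, and each `UnaryOperator<String>` stands in for one Interceptor.

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical model of the single-event intercept loop; not Flume code.
final class ChainSketch {
  // Applies each step in order; a null result means "drop the event".
  static String intercept(String event, List<UnaryOperator<String>> steps) {
    for (UnaryOperator<String> step : steps) {
      if (event == null) {
        return null; // an earlier step dropped the event
      }
      event = step.apply(event);
    }
    return event;
  }
}
```

For example, a chain of "trim", "drop if empty", and "upper-case" turns " abc " into "ABC" and drops a blank string before the last step ever runs.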
HostInterceptor
- implements Interceptor
- Purpose: adds the local host name or IP to the headers of every intercepted event
    public class HostInterceptor implements Interceptor {
      ... // some private fields
      /**
       * Only {@link HostInterceptor.Builder} can build me
       */
      // Private constructor, so instances can only be created through the
      // static nested Builder class below
      private HostInterceptor(boolean preserveExisting, boolean useIP, String header) {
        // These fields are initialized from the values in xx.conf
        this.preserveExisting = preserveExisting;
        this.header = header;
        InetAddress addr;
        try {
          addr = InetAddress.getLocalHost(); // Returns the address of the local host.
          if (useIP) {
            // Returns the IP address string in textual presentation
            host = addr.getHostAddress();
          } else {
            // Gets the fully qualified domain name for this IP address.
            host = addr.getCanonicalHostName();
          }
        } catch (UnknownHostException e) {
          logger.warn("Could not get local host address. Exception follows.", e);
        }

      }

      @Override
      public void initialize() {
        // no-op
      }

      /**
       * Modifies events in-place.
       */
      @Override
      public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();

        // If the existing 'host' value should be preserved and the header is
        // already present, return the event unchanged.
        if (preserveExisting && headers.containsKey(header)) {
          return event;
        }
        if (host != null) {
          headers.put(header, host); // add host to the headers
        }

        return event;
      }

      @Override
      public List<Event> intercept(List<Event> events) {
        ... // calls intercept(Event event) for each event in events
      }

      @Override
      public void close() {
        // no-op
      }

      /**
       * Builder which builds new instances of the HostInterceptor.
       */
      public static class Builder implements Interceptor.Builder {

        private boolean preserveExisting = PRESERVE_DFLT;
        private boolean useIP = USE_IP_DFLT;
        private String header = HOST;

        @Override
        public Interceptor build() {
          return new HostInterceptor(preserveExisting, useIP, header);
        }

        @Override
        public void configure(Context context) {
          preserveExisting = context.getBoolean(PRESERVE, PRESERVE_DFLT);
          useIP = context.getBoolean(USE_IP, USE_IP_DFLT);
          header = context.getString(HOST_HEADER, HOST);
        }
      }

      public static class Constants {
        public static String HOST = "host";
        ... // default values of some settings
      }
    }
- Example usage in xx.conf (see the User Guide for more details):
    agent.sources.r1.interceptors = i1
    agent.sources.r1.interceptors.i1.type = host
    # preserveExisting: whether to keep an already existing 'host' value; the default is not to keep it
    agent.sources.r1.interceptors.i1.preserveExisting = true
    agent.sources.r1.interceptors.i1.useIP = false
    agent.sources.r1.interceptors.i1.hostHeader = hostname
Selector
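- First, the inheritance relationships of all selectors: [Figure: selector class hierarchy]
ChannelSelector has two built-in implementations: replicating and multiplexing. The default is ReplicatingChannelSelector.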
ChannelSelector
- interface
- 基于不同实现政策,允许在channels的集合中选取channels子集。
    // NamedComponent interface: attaches a name to a component; it declares setName() and getName()
    public interface ChannelSelector extends NamedComponent, Configurable {

      // @param channels all channels the selector could select from.
      public void setChannels(List<Channel> channels);

      /**
       * Returns a list of required channels. A write failure on any of these
       * channels is reported back to the source that received the event.
       * @param event
       * @return the list of required channels that this selector has selected for
       * the given event.
       */
      public List<Channel> getRequiredChannels(Event event);

      /**
       * Returns a list of optional channels. Write failures on these channels
       * are ignored.
       * @param event
       * @return the list of optional channels that this selector has selected for
       * the given event.
       */
      public List<Channel> getOptionalChannels(Event event);

      /**
       * @return the list of all channels that this selector is configured to work
       * with.
       */
      public List<Channel> getAllChannels();
    }
AbstractChannelSelector
- abstract class
    public abstract class AbstractChannelSelector implements ChannelSelector {

      private List<Channel> channels;
      private String name;

      // ... overrides of ChannelSelector's getAllChannels(), setChannels(List<Channel> channels),
      // setName(String name) and getName()

      // @return A map of name to channel instance.
      protected Map<String, Channel> getChannelNameMap() {
        Map<String, Channel> channelNameMap = new HashMap<String, Channel>();
        for (Channel ch : getAllChannels()) {
          // For each channel, store it in the HashMap under its name
          channelNameMap.put(ch.getName(), ch);
        }
        return channelNameMap;
      }

      /**
       * Given a list of channel names as space delimited string,
       * returns list of channels.
       * @return List of {@linkplain Channel}s represented by the names.
       */
      // Resolves a space-separated string of channel names to the matching channels,
      // using the name-to-channel HashMap
      protected List<Channel> getChannelListFromNames(String channels,
          Map<String, Channel> channelNameMap) {
        List<Channel> configuredChannels = new ArrayList<Channel>();
        if (channels == null || channels.isEmpty()) { // nothing configured
          return configuredChannels;
        }
        String[] chNames = channels.split(" ");
        for (String name : chNames) {
          Channel ch = channelNameMap.get(name);
          if (ch != null) {
            configuredChannels.add(ch);
          } else {
            throw new FlumeException("Selector channel not found: " + name);
          }
        }
        return configuredChannels;
      }

    }
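One detail of getChannelListFromNames is easy to miss: it splits on a single space with split(" "), whereas ReplicatingChannelSelector.configure() further down splits on "\\s+". The sketch below shows only standard String.split behavior. ChannelNameSplitSketch and its two methods are hypothetical names, not part of Flume.

```java
import java.util.Arrays;

// Hypothetical helper that contrasts the two split patterns seen in the selectors.
class ChannelNameSplitSketch {

    // Same pattern as getChannelListFromNames: exactly one space is the delimiter.
    static String[] splitOnSingleSpace(String names) {
        return names.split(" ");
    }

    // Same pattern as ReplicatingChannelSelector.configure(): any run of whitespace.
    static String[] splitOnWhitespace(String names) {
        return names.split("\\s+");
    }

    public static void main(String[] args) {
        String names = "c1  c2"; // note the two spaces

        System.out.println(Arrays.toString(splitOnSingleSpace(names))); // [c1, , c2]
        System.out.println(Arrays.toString(splitOnWhitespace(names)));  // [c1, c2]
    }
}
```

With the single-space pattern, a doubled space yields an empty name. Going by the code shown above, that empty name would not be found in the name-to-channel map, so the lookup would end in the "Selector channel not found" exception.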
ReplicatingChannelSelector
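- A concrete implementation of ChannelSelector that copies every received message to each channel. (By contrast, MultiplexingChannelSelector decides which channel to use based on parameters in the event header.)
- Replicating channel selector: allows an event to be placed in all of the channels configured for the source.
- In practice, all channels are added to requiredChannels by default and optionalChannels starts out empty. The channels named in the "optional" setting are then added to optionalChannels and removed from requiredChannels (both the adding and the removing happen in the configure method). TODO: look into how this setting is implemented.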
    public class ReplicatingChannelSelector extends AbstractChannelSelector {

      // Configuration to set a subset of the channels as optional.
      public static final String CONFIG_OPTIONAL = "optional";
      // Set in configure() to a copy of getAllChannels(), i.e. all configured channels
      List<Channel> requiredChannels = null;
      List<Channel> optionalChannels = new ArrayList<Channel>();

      @Override
      public List<Channel> getRequiredChannels(Event event) {
        /*
         * Seems like there are lot of components within flume that do not call
         * configure method. It is conceiveable that custom component tests too
         * do that. So in that case, revert to old behavior.
         */
        // If the component never called configure(), requiredChannels is still null;
        // in that case fall back to getAllChannels().
        // TODO: where is configure() actually called? The same question comes up in many classes.
        if (requiredChannels == null) {
          return getAllChannels();
        }
        return requiredChannels;
      }

      @Override
      public List<Channel> getOptionalChannels(Event event) {
        return optionalChannels;
      }

      @Override
      public void configure(Context context) {
        String optionalList = context.getString(CONFIG_OPTIONAL);
        requiredChannels = new ArrayList<Channel>(getAllChannels());
        Map<String, Channel> channelNameMap = getChannelNameMap();
        // optionalList is a whitespace-separated string of channel names. Look up each
        // channel, add it to optionalChannels and remove it from requiredChannels.
        if (optionalList != null && !optionalList.isEmpty()) {
          for (String optional : optionalList.split("\\s+")) {
            Channel optionalChannel = channelNameMap.get(optional);
            requiredChannels.remove(optionalChannel);
            if (!optionalChannels.contains(optionalChannel)) {
              optionalChannels.add(optionalChannel);
            }
          }
        }
      }
    }
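The required/optional split done in configure() can be tried out in isolation. The following is a minimal sketch under stated assumptions: channels are represented by their names only, OptionalSplitSketch is a hypothetical class, and unlike the code above it skips names that do not match any configured channel.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, string-based imitation of the split performed in configure().
class OptionalSplitSketch {
    final List<String> required;
    final List<String> optional = new ArrayList<>();

    OptionalSplitSketch(List<String> allChannels, String optionalList) {
        // Start with every channel required, as configure() does.
        required = new ArrayList<>(allChannels);
        if (optionalList == null || optionalList.isEmpty()) {
            return;
        }
        for (String name : optionalList.split("\\s+")) {
            if (!allChannels.contains(name)) {
                continue; // assumption of this sketch: unknown names are ignored
            }
            required.remove(name);
            if (!optional.contains(name)) {
                optional.add(name);
            }
        }
    }

    public static void main(String[] args) {
        OptionalSplitSketch split =
            new OptionalSplitSketch(Arrays.asList("c1", "c2", "c3"), "c2 c3");
        System.out.println(split.required); // [c1]
        System.out.println(split.optional); // [c2, c3]
    }
}
```

Here the string "c2 c3" plays the role of the value read from the "optional" key. A write failure on c1 would be reported back to the source, while failures on c2 and c3 would be ignored.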
Sink
Sink Runner
- A driver for sinks that polls them, attempting to process events if any are available in the Channel. All sinks are polled.
    public class SinkRunner implements LifecycleAware {
      private PollingRunner runner; // inner class that implements Runnable
      private SinkProcessor policy;
    }
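A polling driver of this kind might look roughly like the sketch below. This is not Flume's PollingRunner: PollingLoopSketch, poll and backoffMillis are hypothetical names, the increment of 1000 ms and the cap of 5000 ms are assumed values, and the loop adds up the sleep time instead of actually sleeping so that it stays quick to run.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

// Hypothetical sketch of a loop that polls a sink processor and backs off when idle.
class PollingLoopSketch {
    enum Status { READY, BACKOFF }

    // Back-off time grows with the number of consecutive idle polls, up to a cap.
    static long backoffMillis(int consecutiveBackoffs, long incrementMillis, long maxMillis) {
        return Math.min(consecutiveBackoffs * incrementMillis, maxMillis);
    }

    // Polls the processor a fixed number of times and returns the total time
    // (in ms) a real driver would have slept between polls.
    static long poll(Supplier<Status> processor, int iterations) {
        int consecutive = 0;
        long totalSleep = 0;
        for (int i = 0; i < iterations; i++) {
            if (processor.get() == Status.BACKOFF) {
                consecutive++;
                totalSleep += backoffMillis(consecutive, 1000L, 5000L); // assumed values
            } else {
                consecutive = 0; // an event was processed, so reset the back-off
            }
        }
        return totalSleep;
    }

    public static void main(String[] args) {
        Iterator<Status> results = Arrays.asList(
            Status.BACKOFF, Status.BACKOFF, Status.READY, Status.BACKOFF).iterator();
        System.out.println(poll(results::next, 4)); // 4000
    }
}
```

The point of the back-off is that an empty channel does not make the runner thread spin. As soon as the processor handles an event, the wait resets.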
Sink Processor
- There are two kinds:
- DefaultSinkProcessor handles a single sink and passes events straight through without any extra processing.
    public void start() {
      Preconditions.checkNotNull(sink, "DefaultSinkProcessor sink not set");
      sink.start(); // start() simply starts the single sink
      lifecycleState = LifecycleState.START;
    }
    // stop() is analogous; configure() is empty

    public Status process() throws EventDeliveryException {
      return sink.process(); // delegates directly to the sink's process()
    }

    public void setSinks(List<Sink> sinks) {
      Preconditions.checkNotNull(sinks);
      Preconditions.checkArgument(sinks.size() == 1, "DefaultSinkPolicy can "
          + "only handle one sink, "
          + "try using a policy that supports multiple sinks");
      sink = sinks.get(0);
    }
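- Multi-sink processing (AbstractSinkProcessor), which in turn comes in two flavors:
- FailoverSinkProcessor: failover. It maintains a priority-ordered list of sinks; a failed sink is demoted into a pool and assigned a cool-down period. setSinks() must be called before configure().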
    public void setSinks(List<Sink> sinks) {
      // needed to implement the start/stop functionality
      super.setSinks(sinks);

      this.sinks = new HashMap<String, Sink>();
      for (Sink sink : sinks) {
        this.sinks.put(sink.getName(), sink);
      }
    }

    private Sink moveActiveToDeadAndGetNext() {
      Integer key = liveSinks.lastKey();
      // Move the highest-priority entry currently in liveSinks over to failedSinks
      failedSinks.add(new FailedSink(key, activeSink, 1));
      liveSinks.remove(key);
      if (liveSinks.isEmpty()) return null;
      if (liveSinks.lastKey() != null) {
        return liveSinks.get(liveSinks.lastKey());
      } else {
        return null;
      }
    }
    ...
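The failover bookkeeping can be sketched with a TreeMap keyed by priority, where the last key is the highest priority, just as liveSinks.lastKey() is used above. FailoverSketch and its methods are hypothetical names and sinks are represented by their names only. The retry delay follows the description in the overview (start at 1 s, double on each failure, cap at 30 s); Flume's exact formula may differ.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of priority-based failover; sinks are represented by name.
class FailoverSketch {
    private final TreeMap<Integer, String> liveSinks = new TreeMap<>();
    private final Deque<String> failedSinks = new ArrayDeque<>();
    private String activeSink;

    FailoverSketch(Map<Integer, String> sinksByPriority) {
        liveSinks.putAll(sinksByPriority);
        activeSink = liveSinks.isEmpty() ? null : liveSinks.lastEntry().getValue();
    }

    String active() {
        return activeSink;
    }

    // Moves the active (highest-priority) sink to the failed pool and
    // returns the next live sink, or null if none is left.
    String failActive() {
        if (liveSinks.isEmpty()) {
            return null;
        }
        failedSinks.add(liveSinks.remove(liveSinks.lastKey()));
        activeSink = liveSinks.isEmpty() ? null : liveSinks.lastEntry().getValue();
        return activeSink;
    }

    // Assumed cool-down: 1 s after the first failure, doubling up to a 30 s cap.
    static long retryDelayMillis(int failures) {
        int shift = Math.max(0, Math.min(failures - 1, 5));
        return Math.min(1000L << shift, 30_000L);
    }

    public static void main(String[] args) {
        Map<Integer, String> sinks = new TreeMap<>();
        sinks.put(10, "k1");
        sinks.put(5, "k2");

        FailoverSketch failover = new FailoverSketch(sinks);
        System.out.println(failover.active());     // k1
        System.out.println(failover.failActive()); // k2
        System.out.println(retryDelayMillis(1));   // 1000
        System.out.println(retryDelayMillis(6));   // 30000
    }
}
```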
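- LoadBalancingSinkProcessor: load-balances across multiple sinks. It maintains an indexed list of active sinks over which the load is distributed; the selection mechanisms are ROUND_ROBIN (the default) and RANDOM.
Internally this is done through an interface named SinkSelector, which has two private static implementations: RoundRobinSinkSelector and RandomOrderSinkSelector.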
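The two selection orders can be sketched as follows. This is an illustration under stated assumptions rather than Flume's selector code: SinkOrderSketch, roundRobin and randomOrder are hypothetical names, sinks are represented by their names, and each call returns the order in which the sinks would be tried.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical sketch of round-robin and random sink ordering.
class SinkOrderSketch {
    private int next = 0;

    // Round robin: each call starts one position later than the previous call
    // and wraps around, so the first choice rotates across the sinks.
    List<String> roundRobin(List<String> sinks) {
        List<String> order = new ArrayList<>();
        for (int i = 0; i < sinks.size(); i++) {
            order.add(sinks.get((next + i) % sinks.size()));
        }
        if (!sinks.isEmpty()) {
            next = (next + 1) % sinks.size();
        }
        return order;
    }

    // Random: a shuffled copy of the sink list.
    static List<String> randomOrder(List<String> sinks, Random rng) {
        List<String> order = new ArrayList<>(sinks);
        Collections.shuffle(order, rng);
        return order;
    }

    public static void main(String[] args) {
        List<String> sinks = Arrays.asList("k1", "k2", "k3");
        SinkOrderSketch selector = new SinkOrderSketch();

        System.out.println(selector.roundRobin(sinks)); // [k1, k2, k3]
        System.out.println(selector.roundRobin(sinks)); // [k2, k3, k1]
        System.out.println(randomOrder(sinks, new Random()));
    }
}
```

Returning the whole order rather than a single sink lets the caller fall through to the next sink when the first choice fails.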
Time: 2024-11-10 01:06:50