Flume sink core class structure
1 Core interface: Sink
org.apache.flume.Sink
/**
 * <p>Requests the sink to attempt to consume data from attached channel</p>
 * <p><strong>Note</strong>: This method should be consuming from the channel
 * within the bounds of a Transaction. On successful delivery, the transaction
 * should be committed, and on failure it should be rolled back.
 * @return READY if 1 or more Events were successfully delivered, BACKOFF if
 * no data could be retrieved from the channel feeding this sink
 * @throws EventDeliveryException In case of any kind of failure to
 * deliver data to the next hop destination.
 */
public Status process() throws EventDeliveryException;

public static enum Status {
  READY, BACKOFF
}
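process is the core method of the interface. It returns a status with only two possible values, READY and BACKOFF, and the caller reacts to that value, as the SinkRunner code in section 3 shows.
This is also the interface to implement when extending Flume with a custom sink, for example KuduSink.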
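To make the contract of process() more concrete, here is a minimal, self-contained sketch of the take / commit / rollback pattern that the Javadoc above describes. It does not depend on Flume: `Status`, `DeliveryException`, `InMemoryChannel`, `CollectingSink` and the `failNext` switch are simplified stand-ins invented for this illustration, not Flume's actual classes. A real custom sink would work against the channel and transaction objects that Flume provides.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified stand-ins for illustration only; these are NOT Flume's real types.
enum Status { READY, BACKOFF }

class DeliveryException extends Exception {
    DeliveryException(String message, Throwable cause) { super(message, cause); }
}

// A toy channel with a single implicit transaction: take() is undone by rollback().
class InMemoryChannel {
    private final Deque<String> events = new ArrayDeque<>();
    private final List<String> taken = new ArrayList<>();

    void put(String event) { events.addLast(event); }
    int size() { return events.size(); }

    void begin() { taken.clear(); }

    String take() {
        String event = events.pollFirst();
        if (event != null) taken.add(event);
        return event;
    }

    void commit() { taken.clear(); }

    void rollback() {
        // Put the taken events back at the head of the queue, preserving order.
        for (int i = taken.size() - 1; i >= 0; i--) events.addFirst(taken.get(i));
        taken.clear();
    }
}

class CollectingSink {
    private final InMemoryChannel channel;
    final List<String> delivered = new ArrayList<>();
    boolean failNext = false; // hypothetical switch that simulates a downstream failure

    CollectingSink(InMemoryChannel channel) { this.channel = channel; }

    Status process() throws DeliveryException {
        channel.begin();
        try {
            String event = channel.take();
            if (event == null) {
                channel.commit();
                return Status.BACKOFF; // nothing could be retrieved from the channel
            }
            if (failNext) {
                failNext = false;
                throw new IllegalStateException("simulated downstream failure");
            }
            delivered.add(event);      // "deliver" the event to the next hop
            channel.commit();
            return Status.READY;       // at least one event was delivered
        } catch (RuntimeException e) {
            channel.rollback();        // on failure the event goes back to the channel
            throw new DeliveryException("failed to deliver event", e);
        }
    }
}

class SinkContractDemo {
    public static void main(String[] args) throws Exception {
        InMemoryChannel channel = new InMemoryChannel();
        channel.put("e1");
        CollectingSink sink = new CollectingSink(channel);
        System.out.println(sink.process()); // READY
        System.out.println(sink.process()); // BACKOFF
    }
}
```

The point of the sketch is the mapping between outcomes and results: an empty channel yields BACKOFF, a delivered event yields READY, and a failure rolls back and surfaces as an exception, so the event is not lost.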
2 Sink wrapper: SinkProcessor
org.apache.flume.SinkProcessor
/**
 * <p>
 * Interface for a device that allows abstraction of the behavior of multiple
 * sinks, always assigned to a SinkRunner
 * </p>
 * <p>
 * A sink processors {@link SinkProcessor#process()} method will only be
 * accessed by a single runner thread. However configuration methods
 * such as {@link Configurable#configure} may be concurrently accessed.
 *
 * @see org.apache.flume.Sink
 * @see org.apache.flume.SinkRunner
 * @see org.apache.flume.sink.SinkGroup
 */
public interface SinkProcessor extends LifecycleAware, Configurable {
  /**
   * <p>Handle a request to poll the owned sinks.</p>
   *
   * <p>The processor is expected to call {@linkplain Sink#process()} on
   * whatever sink(s) appropriate, handling failures as appropriate and
   * throwing {@link EventDeliveryException} when there is a failure to
   * deliver any events according to the delivery policy defined by the
   * sink processor implementation. See specific implementations of this
   * interface for delivery behavior and policies.</p>
   *
   * @return Returns {@code READY} if events were successfully consumed,
   * or {@code BACKOFF} if no events were available in the channel to consume.
   * @throws EventDeliveryException if the behavior guaranteed by the processor
   * couldn't be carried out.
   */
  Status process() throws EventDeliveryException;
  ...
}
This interface wraps the handling of either a single sink or a sink group. Commonly used implementations are:
1) Single sink
org.apache.flume.sink.DefaultSinkProcessor
@Override
public Status process() throws EventDeliveryException {
  return sink.process();
}
DefaultSinkProcessor's process simply delegates to the process method of the sink it wraps.
2) Sink group
org.apache.flume.sink.LoadBalancingSinkProcessor
org.apache.flume.sink.FailoverSinkProcessor
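As a rough illustration of what a sink-group processor adds on top of DefaultSinkProcessor, the sketch below tries a list of sinks in order and falls through to the next one when a sink throws. This is a deliberately simplified model of the failover idea and not the code of FailoverSinkProcessor or LoadBalancingSinkProcessor. `Status`, `SimpleSink` and `FirstAvailableProcessor` are hypothetical names declared here only for the example.

```java
import java.util.Arrays;
import java.util.List;

// Simplified stand-ins for illustration only; these are NOT Flume's real types.
enum Status { READY, BACKOFF }

// Hypothetical sink abstraction: returns a Status or throws on a delivery failure.
interface SimpleSink {
    Status process() throws Exception;
}

// Tries each sink in list order; the first sink that does not throw decides the result.
class FirstAvailableProcessor {
    private final List<SimpleSink> sinks;

    FirstAvailableProcessor(List<SimpleSink> sinks) { this.sinks = sinks; }

    Status process() throws Exception {
        Exception last = null;
        for (SimpleSink sink : sinks) {
            try {
                return sink.process();
            } catch (Exception e) {
                last = e; // remember the failure and fall through to the next sink
            }
        }
        throw new Exception("all sinks in the group failed", last);
    }
}

class SinkGroupDemo {
    public static void main(String[] args) throws Exception {
        SimpleSink broken = () -> { throw new Exception("primary sink is down"); };
        SimpleSink healthy = () -> Status.READY;
        FirstAvailableProcessor processor =
                new FirstAvailableProcessor(Arrays.asList(broken, healthy));
        System.out.println(processor.process()); // READY, served by the second sink
    }
}
```

From the runner's point of view nothing changes: it still calls a single process() method and receives a status or an exception, while the choice of the sink is hidden inside the processor.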
3 The caller of the sink: SinkRunner
org.apache.flume.SinkRunner
/**
 * <p>
 * A driver for {@linkplain Sink sinks} that polls them, attempting to
 * {@linkplain Sink#process() process} events if any are available in the
 * {@link Channel}.
 * </p>
 *
 * <p>
 * Note that, unlike {@linkplain Source sources}, all sinks are polled.
 * </p>
 *
 * @see org.apache.flume.Sink
 * @see org.apache.flume.SourceRunner
 */
public class SinkRunner implements LifecycleAware {
  ...
  private static final long backoffSleepIncrement = 1000;
  private static final long maxBackoffSleep = 5000;
  ...
}

org.apache.flume.SinkRunner.PollingRunner
public static class PollingRunner implements Runnable {

  private SinkProcessor policy;
  private AtomicBoolean shouldStop;
  private CounterGroup counterGroup;

  @Override
  public void run() {
    logger.debug("Polling sink runner starting");

    while (!shouldStop.get()) {
      try {
        if (policy.process().equals(Sink.Status.BACKOFF)) {
          counterGroup.incrementAndGet("runner.backoffs");

          Thread.sleep(Math.min(
              counterGroup.incrementAndGet("runner.backoffs.consecutive")
              * backoffSleepIncrement, maxBackoffSleep));
        } else {
          counterGroup.set("runner.backoffs.consecutive", 0L);
        }
      } catch (InterruptedException e) {
        logger.debug("Interrupted while processing an event. Exiting.");
        counterGroup.incrementAndGet("runner.interruptions");
      } catch (Exception e) {
        logger.error("Unable to deliver event. Exception follows.", e);
        if (e instanceof EventDeliveryException) {
          counterGroup.incrementAndGet("runner.deliveryErrors");
        } else {
          counterGroup.incrementAndGet("runner.errors");
        }
        try {
          Thread.sleep(maxBackoffSleep);
        } catch (InterruptedException ex) {
          Thread.currentThread().interrupt();
        }
      }
    }
    logger.debug("Polling runner exiting. Metrics:{}", counterGroup);
  }
}
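Whether process returns BACKOFF or throws an exception, the runner thread sleeps before polling again. After a BACKOFF it sleeps for the number of consecutive backoffs multiplied by backoffSleepIncrement (1000 ms), capped at maxBackoffSleep (5000 ms); after any exception it sleeps for the full maxBackoffSleep. As a result, a Flume sink becomes very slow once it runs into a large amount of bad data that causes exceptions, or when a custom sink returns BACKOFF.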
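The sleep times can be worked out directly from the two constants in the SinkRunner excerpt. The small program below only reproduces that arithmetic; `BackoffSchedule` and its method names are made up for this example and do not exist in Flume.

```java
// Reproduces the sleep arithmetic of PollingRunner.run(); the class itself is hypothetical.
class BackoffSchedule {
    // Values copied from the SinkRunner excerpt.
    static final long BACKOFF_SLEEP_INCREMENT = 1000;
    static final long MAX_BACKOFF_SLEEP = 5000;

    // Sleep in milliseconds after the n-th consecutive BACKOFF (n starts at 1).
    static long sleepAfterBackoff(long consecutiveBackoffs) {
        return Math.min(consecutiveBackoffs * BACKOFF_SLEEP_INCREMENT, MAX_BACKOFF_SLEEP);
    }

    // Sleep in milliseconds after any exception thrown by process().
    static long sleepAfterException() {
        return MAX_BACKOFF_SLEEP;
    }

    public static void main(String[] args) {
        for (long n = 1; n <= 7; n++) {
            System.out.println("consecutive BACKOFF #" + n
                    + " -> sleep " + sleepAfterBackoff(n) + " ms");
        }
        System.out.println("exception -> sleep " + sleepAfterException() + " ms");
    }
}
```

The sleep grows by one second per consecutive BACKOFF until it reaches five seconds at the fifth one and stays there, while a single READY resets the counter. An exception always costs the full five seconds, so a sink that fails on every call gets to run process() roughly once every five seconds.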
Original article: https://www.cnblogs.com/barneywill/p/10570545.html
Time: 2024-10-12 00:25:27