Flume sink 相关内容

SinkRunner.java 开启线程调用相应的Processor(Policy) , 根据
 Policy调用process的返回值来决定线程睡眠时间,每次默认延后1s,最大默认为5s。


public class SinkRunner implements LifecycleAware {

private static final Logger logger = LoggerFactory
.getLogger(SinkRunner.class);
private static final long backoffSleepIncrement = 1000;
private static final long maxBackoffSleep = 5000;
}

@Override
public void start() {
SinkProcessor policy = getPolicy();

policy.start();

runner = new PollingRunner();

runner.policy = policy;
runner.counterGroup = counterGroup;
runner.shouldStop = new AtomicBoolean();

runnerThread = new Thread(runner);
runnerThread.setName("SinkRunner-PollingRunner-" +
policy.getClass().getSimpleName());
runnerThread.start();

lifecycleState = LifecycleState.START;
}

public static class PollingRunner implements Runnable {

private SinkProcessor policy;
private AtomicBoolean shouldStop;
private CounterGroup counterGroup;

@Override
public void run() {
logger.debug("Polling sink runner starting");

while (!shouldStop.get()) {
try {
if (policy.process().equals(Sink.Status.BACKOFF)) {
counterGroup.incrementAndGet("runner.backoffs");

Thread.sleep(Math.min(
counterGroup.incrementAndGet("runner.backoffs.consecutive")
* backoffSleepIncrement, maxBackoffSleep));
} else {
counterGroup.set("runner.backoffs.consecutive", 0L);
}
} catch (InterruptedException e) {
logger.debug("Interrupted while processing an event. Exiting.");
counterGroup.incrementAndGet("runner.interruptions");
} catch (Exception e) {
logger.error("Unable to deliver event. Exception follows.", e);
if (e instanceof EventDeliveryException) {
counterGroup.incrementAndGet("runner.deliveryErrors");
} else {
counterGroup.incrementAndGet("runner.errors");
}
try {
Thread.sleep(maxBackoffSleep);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
logger.debug("Polling runner exiting. Metrics:{}", counterGroup);
}

}

HDFSEentSink.java      被SinkRunner 作为processor
使用,调用process来进行处理。维护一个bucketWriter的集合,根据event获得对应的BucketWriter.  
并调用他们的append方法。

之后对所有的这次process过程中有任务的bucketWriter    即在writers集合中
 遍历,调用他们的flush()。如果Event 数量小于1,那么会返回BACKOFF
状态。sinkRunner会对其进行相应的backoff操作,默认多睡眠1秒。

除此之外还要保证transaction的完整性(未深入阅读)


public Status process() throws EventDeliveryException {
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
List<BucketWriter> writers = Lists.newArrayList();
transaction.begin();
try {
int txnEventCount = 0;
for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
Event event = channel.take();
if (event == null) {
break;
}

// reconstruct the path name by substituting place holders
String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
timeZone, needRounding, roundUnit, roundValue, useLocalTime);
String realName = BucketPath.escapeString(fileName, event.getHeaders(),
timeZone, needRounding, roundUnit, roundValue, useLocalTime);

String lookupPath = realPath + DIRECTORY_DELIMITER + realName;
BucketWriter bucketWriter;
HDFSWriter hdfsWriter = null;
// Callback to remove the reference to the bucket writer from the
// sfWriters map so that all buffers used by the HDFS file
// handles are garbage collected.
WriterCallback closeCallback = new WriterCallback() {
@Override
public void run(String bucketPath) {
LOG.info("Writer callback called.");
synchronized (sfWritersLock) {
sfWriters.remove(bucketPath);
}
}
};
synchronized (sfWritersLock) {
bucketWriter = sfWriters.get(lookupPath);
// we haven‘t seen this file yet, so open it and cache the handle
if (bucketWriter == null) {
hdfsWriter = writerFactory.getWriter(fileType);
bucketWriter = initializeBucketWriter(realPath, realName,
lookupPath, hdfsWriter, closeCallback);
sfWriters.put(lookupPath, bucketWriter);
}
}

// track the buckets getting written in this transaction
if (!writers.contains(bucketWriter)) {
writers.add(bucketWriter);
}

// Write the data to HDFS
try {
bucketWriter.append(event);
} catch (BucketClosedException ex) {
LOG.info("Bucket was closed while trying to append, " +
"reinitializing bucket and writing event.");
hdfsWriter = writerFactory.getWriter(fileType);
bucketWriter = initializeBucketWriter(realPath, realName,
lookupPath, hdfsWriter, closeCallback);
synchronized (sfWritersLock) {
sfWriters.put(lookupPath, bucketWriter);
}
bucketWriter.append(event);
}
}

if (txnEventCount == 0) {
sinkCounter.incrementBatchEmptyCount();
} else if (txnEventCount == batchSize) {
sinkCounter.incrementBatchCompleteCount();
} else {
sinkCounter.incrementBatchUnderflowCount();
}

// flush all pending buckets before committing the transaction
for (BucketWriter bucketWriter : writers) {
bucketWriter.flush();
}

transaction.commit();

if (txnEventCount < 1) {
return Status.BACKOFF;
} else {
sinkCounter.addToEventDrainSuccessCount(txnEventCount);
return Status.READY;
}
} catch (IOException eIO) {
transaction.rollback();
LOG.warn("HDFS IO error", eIO);
return Status.BACKOFF;
} catch (Throwable th) {
transaction.rollback();
LOG.error("process failed", th);
if (th instanceof Error) {
throw (Error) th;
} else {
throw new EventDeliveryException(th);
}
} finally {
transaction.close();
}
}

BucketWriter.java
将对应的文件写到HDFS上,并负责监控是否应该进行rotate操作(count,size,time)等因素。只列出了append函数。


/**
* Open file handles, write data, update stats, handle file rolling and
* batching / flushing. <br />
* If the write fails, the file is implicitly closed and then the IOException
* is rethrown. <br />
* We rotate before append, and not after, so that the active file rolling
* mechanism will never roll an empty file. This also ensures that the file
* creation time reflects when the first event was written.
*
* @throws IOException
* @throws InterruptedException
*/
public synchronized void append(final Event event)
throws IOException, InterruptedException {
checkAndThrowInterruptedException();
// If idleFuture is not null, cancel it before we move forward to avoid a
// close call in the middle of the append.
if(idleFuture != null) {
idleFuture.cancel(false);
// There is still a small race condition - if the idleFuture is already
// running, interrupting it can cause HDFS close operation to throw -
// so we cannot interrupt it while running. If the future could not be
// cancelled, it is already running - wait for it to finish before
// attempting to write.
if(!idleFuture.isDone()) {
try {
idleFuture.get(callTimeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException ex) {
LOG.warn("Timeout while trying to cancel closing of idle file. Idle" +
" file close may have failed", ex);
} catch (Exception ex) {
LOG.warn("Error while trying to cancel closing of idle file. ", ex);
}
}
idleFuture = null;
}

// If the bucket writer was closed due to roll timeout or idle timeout,
// force a new bucket writer to be created. Roll count and roll size will
// just reuse this one
if (!isOpen) {
if (closed) {
throw new BucketClosedException("This bucket writer was closed and " +
"this handle is thus no longer valid");
}
open();
}

// check if it‘s time to rotate the file
if (shouldRotate()) {
boolean doRotate = true;

if (isUnderReplicated) {
if (maxConsecUnderReplRotations > 0 &&
consecutiveUnderReplRotateCount >= maxConsecUnderReplRotations) {
doRotate = false;
if (consecutiveUnderReplRotateCount == maxConsecUnderReplRotations) {
LOG.error("Hit max consecutive under-replication rotations ({}); " +
"will not continue rolling files under this path due to " +
"under-replication", maxConsecUnderReplRotations);
}
} else {
LOG.warn("Block Under-replication detected. Rotating file.");
}
consecutiveUnderReplRotateCount++;
} else {
consecutiveUnderReplRotateCount = 0;
}

if (doRotate) {
close();
open();
}
}

// write the event
try {
sinkCounter.incrementEventDrainAttemptCount();
callWithTimeout(new CallRunner<Void>() {
@Override
public Void call() throws Exception {
writer.append(event); // could block
return null;
}
});
} catch (IOException e) {
LOG.warn("Caught IOException writing to HDFSWriter ({}). Closing file (" +
bucketPath + ") and rethrowing exception.",
e.getMessage());
try {
close(true);
} catch (IOException e2) {
LOG.warn("Caught IOException while closing file (" +
bucketPath + "). Exception follows.", e2);
}
throw e;
}

// update statistics
processSize += event.getBody().length;
eventCounter++;
batchCounter++;

if (batchCounter == batchSize) {
flush();
}
}

Flume sink 相关内容,布布扣,bubuko.com

时间: 2024-12-19 10:57:24

Flume sink 相关内容的相关文章

自定义Flume Sink:ElasticSearch Sink

Flume Sink的目的是从Flume Channel中获取数据然后输出到存储或者其他Flume Source中.Flume Agent启动的时候,它会为每一个Sink都启动一个SinkRunner的对象,SinkRunner.start()方法会启动一个新的线程去管理每一个Sink的生命周期.每一个Sink需要实现start().Stop()和process()方法.你可以在start方法中去初始化Sink的参数和状态,在stop方法中清理Sink的资源.最关键的是process方法,它将处

移动端 h5开发相关内容总结——CSS篇

移动端 h5开发相关内容总结——CSS篇 标签: css移动 2016-01-06 15:59 5536人阅读 评论(3) 收藏 举报  分类: HTML+CSS(17)  版权声明:本文为博主原创文章,未经博主允许不得转载. 目录(?)[+] 1.移动端开发视窗口的添加 h5端开发下面这段话是必须配置的 <meta name="viewport" content="width=device-width, initial-scale=1, user-scalable=n

EF Code First 配置的相关内容

I.实体间一对一的关系 添加一个PersonPhoto类,表示用户照片类 1 /// <summary> 2 /// 用户照片类 3 /// </summary> 4 public class PersonPhoto 5 { 6 [Key] 7 public int PersonId { get ; set ; } 8 public byte [] Photo { get ; set ; } 9 public string Caption { get ; set ; } // 标题

flume SinkProcessor 相关类实现分析

org.apache.flume.SinkProcessor 扩展了LifecycleAware, Configurable接口的接口类,操作多个sink的抽象层(类似于proxy),用来分配给SinkRunner对象抽象方法:process 和Sink 的process方法类似(内部实现增加了选择Sink的功能)setSinks 设置sinks具体实现类: org.apache.flume.sink.SinkProcessorFactory 设计模式的工厂模式,用于返回SinkProcesso

【转帖】MATLAB 与 音频处理 相关内容摘记

MATLAB 与 音频处理 相关内容摘记 MATLAB 与 音频处理 相关内容摘记 MATLAB 与 音频处理 相关内容摘记 1 MATLAB 音频相关函数 1 MATLAB 处理音频信号的流程 2 音量标准化 2 声道分离合并与组合 3 数字滤波 3 数据转换 5 基于MATLAB 的数字滤波实验6 MATLAB 音频相关函数 声音数据输入输出函数: 可以方便地读写au和way文件,并可控制其中的位及频率. wavread()和wavwriteO. 声音播放: wavplay():播放wav声

python爬虫主要就是五个模块:爬虫启动入口模块,URL管理器存放已经爬虫的URL和待爬虫URL列表,html下载器,html解析器,html输出器 同时可以掌握到urllib2的使用、bs4(BeautifulSoup)页面解析器、re正则表达式、urlparse、python基础知识回顾(set集合操作)等相关内容。

本次python爬虫百步百科,里面详细分析了爬虫的步骤,对每一步代码都有详细的注释说明,可通过本案例掌握python爬虫的特点: 1.爬虫调度入口(crawler_main.py) # coding:utf-8from com.wenhy.crawler_baidu_baike import url_manager, html_downloader, html_parser, html_outputer print "爬虫百度百科调度入口" # 创建爬虫类class SpiderMai

找不到org.apache.spark.streaming.flume.sink.SparkFlumeProtocol$Callback

java.lang.NoClassDefFoundError: org/apache/spark/streaming/flume/sink/SparkFlumeProtocol$Callback at org.apache.spark.streaming.flume.FlumePollingReceiver$$anonfun$onStart$1.apply(FlumePollingInputDStream.scala:84) at org.apache.spark.streaming.flume

学习笔记之html5相关内容

写一下昨天学习的html5的相关内容,首先谈下初次接触html5的感受.以前总是听说html5是如何的强大,如何的将要改变世界.总是充满了神秘感.首先来谈一下我接触的第一个属性是  input的里面的一个属性是 type="email".以前用html的时候,type是有很多类型的,有text,password,summit.就是没听说过email.当得知这个email是验证输入的内容是否为email格式的时候.我顿时兴奋了.以前接触到html的时候,如果要验证输入内容时,是需要写js

在生成一系列相关内容后,签名文件生成

然后上传刚刚生成的CSR文件 按下Generate后,稍等一会,证书就生成了,你可以下载然后安装到本机. 3. AppID 需要强调一下的是,这里的AppID和常说的AppleID不是一个概念.AppleID是用户在苹果的账号,在AppStore上下载应用时,需要使用的就是这个账号.而AppID则是应用的身份证,用来表明应用的ID. 在证书生成后,下一步就是生成AppID,相当于帮你想开发的应用生成一张身份证. AppID需要你起一个名字以方便描述.这里不能输入特殊字符. AppID还分成带通配