理解FlumeNG的batchSize和transactionCapacity参数和传输事务的原理 【转】

基于ThriftSource,MemoryChannel,HdfsSink三个组件,对Flume数据传输的事务进行分析,如果使用的是其他组件,Flume事务具体的处理方式将会不同。

Flume的事务处理原理:

Flume在对Channel进行Put和Take操作的时候,必须要用事物包住,比如:

  1. Channel ch = new MemoryChannel();
  2. Transaction txn = ch.getTransaction();
  3. //事物开始
  4. txn.begin();
  5. try {
  6. Event eventToStage = EventBuilder.withBody(\"Hello Flume!\",
  7. Charset.forName(\"UTF-8\"));
  8. //往临时缓冲区Put数据
  9. ch.put(eventToStage);
  10. //或者ch.take()
  11. //将这些数据提交到channel中
  12. txn.commit();
  13. } catch (Throwable t) {
  14. txn.rollback();
  15. if (t instanceof Error) {
  16. throw (Error)t;
  17. }
  18. } finally {
  19. txn.close();
  20. }
Put事务流程

Put事务可以分为以下阶段:

  • doPut:将批数据先写入临时缓冲区putList
  • doCommit:检查channel内存队列是否足够合并。
  • doRollback:channel内存队列空间不足,抛弃数据   (这个地方个人理解可能会存在数据丢失)

我们从Source数据接收到写入Channel这个过程对Put事物进行分析。

ThriftSource会spawn多个Worker线程(ThriftSourceHandler)去处理数据,Worker处理数据的接口,我们只看batch批量处理这个接口:

  1. @Override
  2. public Status appendBatch(List<ThriftFlumeEvent> events) throws TException {
  3. List<Event> flumeEvents = Lists.newArrayList();
  4. for(ThriftFlumeEvent event : events) {
  5. flumeEvents.add(EventBuilder.withBody(event.getBody(), event.getHeaders()));
  6. }
  7. //ChannelProcessor,在Source初始化的时候传进来.将数据写入对应的Channel
  8. getChannelProcessor().processEventBatch(flumeEvents);
  9. ...
  10. return Status.OK;
  11. }

事务逻辑都在processEventBatch这个方法里:

  1. public void processEventBatch(List<Event> events) {
  2. ...
  3. //预处理每行数据,有人用来做ETL嘛
  4. events = interceptorChain.intercept(events);
  5. ...
  6. //分类数据,划分不同的channel集合对应的数据
  7. // Process required channels
  8. Transaction tx = reqChannel.getTransaction();
  9. ...
  10. //事务开始,tx即MemoryTransaction类实例
  11. tx.begin();
  12. List<Event> batch = reqChannelQueue.get(reqChannel);
  13. for (Event event : batch) {
  14. // 这个put操作实际调用的是transaction.doPut
  15. reqChannel.put(event);
  16. }
  17. //提交,将数据写入Channel的队列中
  18. tx.commit();
  19. } catch (Throwable t) {
  20. //回滚
  21. tx.rollback();
  22. ...
  23. }
  24. }
  25. ...
  26. }

每个Worker线程都拥有一个Transaction实例,保存在Channel(BasicChannelSemantics)里的ThreadLocal变量currentTransaction.

那么,事务到底做了什么?

实际上,Transaction实例包含两个双向阻塞队列LinkedBlockingDeque(感觉没必要用双向队列,每个线程写自己的putList,又不是多个线程?),分别为:

  • putList
  • takeList

对于Put事物操作,当然是只用到putList了。putList就是一个临时的缓冲区,数据会先put到putList,最后由commit方法会检查channel是否有足够的缓冲区,有则合并到channel的队列。

channel.put -> transaction.doPut:

  1. protected void doPut(Event event) throws InterruptedException {
  2. //计算数据字节大小
  3. int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);
  4. //写入临时缓冲区putList
  5. if (!putList.offer(event)) {
  6. throw new ChannelException(
  7. \"Put queue for MemoryTransaction of capacity \" +
  8. putList.size() + \" full, consider committing more frequently, \" +
  9. \"increasing capacity or increasing thread count\");
  10. }
  11. putByteCounter += eventByteSize;
  12. }

transaction.commit:

  1. @Override
  2. protected void doCommit() throws InterruptedException {
  3. //检查channel的队列剩余大小是否足够
  4. ...
  5. int puts = putList.size();
  6. ...
  7. synchronized(queueLock) {
  8. if(puts > 0 ) {
  9. while(!putList.isEmpty()) {
  10. //写入到channel的队列
  11. if(!queue.offer(putList.removeFirst())) {
  12. throw new RuntimeException(\"Queue add failed, this shouldn\‘t be able to happen\");
  13. }
  14. }
  15. }
  16. //清除临时队列
  17. putList.clear();
  18. ...
  19. }
  20. ...
  21. }

如果在事务期间出现异常,比如channel剩余空间不足,则rollback:

  1. @Override
  2. protected void doRollback() {
  3. ...
  4. //抛弃数据,没合并到channel的内存队列
  5. putList.clear();
  6. ...
  7. }

Take事务

Take事务分为以下阶段:

  • doTake:先将数据取到临时缓冲区takeList
  • 将数据发送到下一个节点
  • doCommit:如果数据全部发送成功,则清除临时缓冲区takeList
  • doRollback:数据发送过程中如果出现异常,rollback将临时缓冲区takeList中的数据归还给channel内存队列。

Sink其实是由SinkRunner线程调用Sink.process方法来了处理数据的。我们从HdfsEventSink的process方法说起,Sink类都有个process方法,用来处理传输数据的逻辑。:

  1. public Status process() throws EventDeliveryException {
  2. ...
  3. Transaction transaction = channel.getTransaction();
  4. ...
  5. //事务开始
  6. transaction.begin();
  7. ...
  8. for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
  9. //take数据到临时缓冲区,实际调用的是transaction.doTake
  10. Event event = channel.take();
  11. if (event == null) {
  12. break;
  13. }
  14. ...
  15. //写数据到HDFS
  16. bucketWriter.append(event);
  17. ...
  18. // flush all pending buckets before committing the transaction
  19. for (BucketWriter bucketWriter : writers) {
  20. bucketWriter.flush();
  21. }
  22. //commit
  23. transaction.commit();
  24. ...
  25. } catch (IOException eIO) {
  26. transaction.rollback();
  27. ...
  28. } finally {
  29. transaction.close();
  30. }
  31. }

大致流程图:

接着看看channel.take,作用是将数据放到临时缓冲区,实际调用的是transaction.doTake:

  1. protected Event doTake() throws InterruptedException {
  2. ...
  3. //从channel内存队列取数据
  4. synchronized(queueLock) {
  5. event = queue.poll();
  6. }
  7. ...
  8. //将数据放到临时缓冲区
  9. takeList.put(event);
  10. ...
  11. return event;
  12. }

接着,HDFS写线程bucketWriter将take到的数据写到HDFS,如果批数据都写完了,则要commit了:

  1. protected void doCommit() throws InterruptedException {
  2. ...
  3. takeList.clear();
  4. ...
  5. }

很简单,其实就是清空takeList而已。如果bucketWriter在写数据到HDFS的时候出现异常,则要rollback:

  1. protected void doRollback() {
  2. int takes = takeList.size();
  3. //检查内存队列空间大小,是否足够takeList写回去
  4. synchronized(queueLock) {
  5. Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), \"Not enough space in memory channel \" +
  6. \"queue to rollback takes. This should never happen, please report\");
  7. while(!takeList.isEmpty()) {
  8. queue.addFirst(takeList.removeLast());
  9. }
  10. ...
  11. }
  12. ...
  13. }

读完代码可见 

batchSize是针对Source和Sink提出的一个概念,它用来限制source和sink对event批量处理的。

即一次性你可以处理batchSize个event,这个一次性就是指在一个事务中。

这个参数值越大,每个事务提交的范围就越大,taskList的清空等操作次数会减少,因此性能肯定会提升,但是可能在出错时,回滚的返回也会变大。

接下来看一下 

内存通道中的内部类MemoryTransaction:

 private class MemoryTransaction extends BasicTransactionSemantics {
    private LinkedBlockingDeque takeList;
    private LinkedBlockingDeque putList;
    private final ChannelCounter channelCounter;
    private int putByteCounter = 0;
    private int takeByteCounter = 0;

    public MemoryTransaction(int transCapacity, ChannelCounter counter) {
      putList = new LinkedBlockingDeque(transCapacity);
      takeList = new LinkedBlockingDeque(transCapacity);

      channelCounter = counter;
    }

可见transactionCapacity参数其实

就是putList和takeList的容量大小。在flume1.5版本中SpillableMemoryChannel的putList和takeList的长度为largestTakeTxSize和largestPutTxSize参数,该参数值为5000

时间: 2024-11-02 13:02:21

理解FlumeNG的batchSize和transactionCapacity参数和传输事务的原理 【转】的相关文章

(转)理解YOLOv2训练过程中输出参数含义

最近有人问起在YOLOv2训练过程中输出在终端的不同的参数分别代表什么含义,如何去理解这些参数?本篇文章中我将尝试着去回答这个有趣的问题. 刚好现在我正在训练一个YOLOv2模型,拿这个真实的例子来讨论再合适不过了,下边是我训练中使用的 .cfg 文件(你可以在cfg文件夹下找到它): 以下是训练过程中终端输出的一个截图: 以上截图显示了所有训练图片的一个批次(batch),批次大小的划分根据我们在 .cfg 文件中设置的subdivisions参数.在我使用的 .cfg 文件中 batch =

JVM理解(上):classloader加载class文件的原理和机制

转自:https://www.jianshu.com/p/52c38cf2e3d4 JVM理解(上):classloader加载class文件的原理和机制 安东尼_Anthony关注 12018.11.10 10:16:40字数 4,361阅读 3,731 1 JVM架构整体架构 在进入classloader分析之前,先了解一下jvm整体架构: JVM架构 JVM被分为三个主要的子系统 (1)类加载器子系统(2)运行时数据区(3)执行引擎 1. 类加载器子系统 Java的动态类加载功能是由类加载

深入理解Java虚拟机(jvm性能调优+内存模型+虚拟机原理)视频教程

14套java精品高级架构课,缓存架构,深入Jvm虚拟机,全文检索Elasticsearch,Dubbo分布式Restful 服务,并发原理编程,SpringBoot,SpringCloud,RocketMQ中间件,Mysql分布式集群,服务架构,运 维架构视频教程 14套精品课程介绍: 1.14套精 品是最新整理的课程,都是当下最火的技术,最火的课程,也是全网课程的精品: 2.14套资 源包含:全套完整高清视频.完整源码.配套文档: 3.知识也 是需要投资的,有投入才会有产出(保证投入产出比是

深入理解带参方法-----对象作为参数的方法

在了解过普通的带参方法后,我们再次将视角拉到深入带参方法中. 在数组作为参数的方法里面我们知道了可以将多个学生的成绩添加到数组中并打印出来, 那么如果现在不仅要添加学生的成绩,还要添加学生的年龄和成绩,如何实现呢? 面对这样的问题我们就可以使用面向对象的思想,把所有要添加的学生信息封装到学生类中,只需要在方法中传递一个学生对象就可以包含所有的信息. 接下来就来看看到底应该怎样实现吧. 1 public class Student { 2 //学生类 3 public int id; 4 publ

&lt;16&gt;【理解】数组元素作为函数参数+【掌握】数组名作为函数参数+【掌握】数组名作为函数参数的注意点

#include <stdio.h> int sum(int x,int y){ return x+y; } void printNum(int x){ //判断x的值 if (x>0) { printf("%d\t",x); }else{ printf("0\t"); } } int main(int argc, const char * argv[]) { int a[5]={1,-2,-3,-4,5}; //需求:要求计算数组的第一个元素和最

垃圾收集器与内存分配策略之篇三:理解GC日志和垃圾收集器参数总结

一.GC日志片段如下: [GC[DefNew: 2658K->371K(4928K), 0.0038671 secs] 2658K->2419K(15872K), 0.0173438 secs] [Times: user=0.00 sys=0.00, real=0.02 secs] [Full GC[Tenured: 2048K->370K(10944K), 0.0331593 secs] 4564K->370K(15872K), [Perm : 176K->176K(122

JVM理论:(二/4)理解GC日志、垃圾收集器参数总结

JVM的GC日志的主要参数包括如下几个: -XX:+PrintGC 输出GC日志 -XX:+PrintGCDetails 输出GC的详细日志 -XX:+PrintGCTimeStamps 输出GC的时间戳(以基准时间的形式) -XX:+PrintGCDateStamps 输出GC的时间戳(以日期的形式,如 2013-05-04T21:53:59.234+0800) -XX:+PrintHeapAtGC 在进行GC的前后打印出堆的信息 -XX:+PrintGCApplicationStoppedT

使用Discuz!自带参数防御CC攻击以及原理,修改Discuz X 开启防CC攻击后,不影响搜索引擎收录的方法

这部份的工作,以前花的时间太少. 希望能产生一定的作用. http://www.nigesb.com/discuz-cc-attacker-defence.html http://bbs.zb7.com/thread-8644-1-1.html CC攻击确实是很蛋疼的一种攻击方式,Discuz!的配置文件中已经有了一个自带的减缓CC攻击的参数,在配置文件config.inc.php中: 1 $attackevasive = 0;             // 论坛防御级别,可防止大量的非正常请求

深入理解Linux网络技术内幕——IPv4 报文的传输发送

报文传输,指的是报文离开本机,发往其他系统的过程. 传输可以由L4层协议发起,也可以由报文转发发起. 在深入理解Linux网络技术内幕--IPv4 报文的接收(转发与本地传递)一文中,我们可以看到,报文转发最后会调用dst_output与邻居子系统进行交互,然后传给设备驱动程序. 这里,我们从L4层协议发起的传输,最后也会经历这一过程(调用dst_output).本文讨论的是L4层协议发起的传输,在IPv4协议处理(IP层)中的一些环节. 大蓝图 我们先看下传输环节的大蓝图,以便对传输这一过程有