Storm批处理事务原理详解

1、事务-批处理

对于容错机制,Storm通过一个系统级别的组件acker,结合xor校验机制判断一个tuple是否发送成功,进而spout可以重发该tuple ,保证一个tuple在k\出错的情况下至少被重发一次。

但是在需要精确统计tuple的数量如销售金额场景时,希望每个tuple”被且仅被处理一次” 。Storm 0.7.0引入了Transactional Topology, 它可以保证每个tuple”被且仅被处理一次”, 这样我们就可以实现一种非常准确,且高度容错方式来实现计数类应用。

逐个处理单个tuple,增加很多开销,如写库、输出结果频率过高

事务处理单个tuple效率比较低,因此storm中引入batch处理,

批处理是一次性处理一批(batch)tuple,事务可确保该批次要么全部处理成功,如果有处理失败的则全部不计,Storm会对失败的批次重新发送,且确保每个batch被且仅被处理一次。

2、API介绍

IBatchBolt有三个方法

execute(Tuple tuple)

finishBatch()   会在所有的tuple处理完成之后,对整个批次的结果进行处理,执行commit的时候执行该方法

prepare (java.util.Map conf, TopologyContext context, BatchOutputCollector collector,T id)

ITransactionalSpout有以下几个主要方法:

ITransactionalSpout.Coordinator<T> getCoordinator(java.util.Map conf,

TopologyContext context)

ITransactionalSpout.Emitter<T> getEmitter(java.util.Map conf,

TopologyContext context)

3.事务机制原理

1) 对于只处理一次的需要,从原理上讲,需要在发送tuple(单个或者一个批次)的时候带上相同的事务id(txid)

在处理的时候就会根据这个txid判断是否已经处理过了。处理过了就会把处理结果和txid保存起来,以便以后比对,而且需要保障顺序性,在当前请求txid提交前,所有比自己低的txid请求都需要提交。

    在事务批处理的时候,一批tuple赋予一个体txid,为了提高batch之间处理的并行度

Storm采用了pipline(管道)处理模型,这样多个事务可以并行执行,但是提交的时候是严格按照顺序的

2. Storm事务处理中,把一个batch的计算分成两个阶段,Processing和commit阶段

      Process阶段:多个batch可以并行计算

      Commiting 阶段:batch之间强制按照顺序进行提交

Processing阶段:多个batch可以并行计算,上面例子中bolt2是普通的batchbolt(实现BaseBatchBolt),那么多个batch在bolt2的task之间可以并行执行,比如对batch3和batch4并行执行execute或finishbatch(什么时候调用该操作,后面会介绍)方法。

Commiting阶段:batch之间强制按照顺序进行提交,上图中Bolt3实现BaseBatchBolt并且标记需要事务处理的(实现了ICommitter接口或者通过TransactionalTopologyBuilder的setCommitterBolt方法把BatchBolt添加到topology里面),那么在Storm认为可以提交(至于什么时候可以提交,后面会介绍)batch的时候调用finishbatch,在finishBatch做xid的比较以及状态保存工作。例子中batch2必须等待batch1提交后,才可以进行提交。

Storm事务性的拓扑看起来比较复杂,需要对batch的commit进行管理,错误的发现,batch的发射以及处理等等,其内部实现完全基于storm的相关底层操作进行抽象。

当使用Transactional Topologies的时候, storm为你做下面这些事情:

  • 管理状态: Storm把所有实现Transactional Topologies所必须的状态保存在zookeeper里面。 这包括当前transaction id以及定义每个batch的一些元数据。
  • 协调事务: Storm帮你管理所有事情, 以帮你决定在任何一个时间点是该proccessing还是该committing。
  • 错误检测: Storm利用acking框架来高效地检测什么时候一个batch被成功处理了,被成功提交了,或者失败了。Storm然后会相应地replay对应的batch。你不需要自己手动做任何acking或者anchoring(emit发生的动作) — storm帮你搞定所有事情。
  • 内置的批处理API: Storm在普通bolt之上包装了一层API来提供对tuple的批处理支持。Storm管理所有的协调工作,包括决定什么时候一个bolt接收到一个特定transaction的所有tuple。Storm同时也会自动清理每个transaction所产生的中间数据。
  • 最后,需要注意的一点是Transactional Topologies需要一个可以完全重发(replay)一个特定batch的消息的队列系统(Message Queue)。storm-contrib里面的storm-kafka实现了这个。

事务性topology从实现上来讲,包括事务性的spout,以及事务性的bolt。

2) 事务性的spout需要实现ITransactionalSpout,这个接口包含两个内部类Coordinator和Emitter。在topology运行的时候,事务性的spout内部包含一个子的topology,类似下面这个结构:

Interface ITransactionalSpout.Coordinator<X>

Method Summary
 void close() 
 X initializeTransaction(java.math.BigInteger txid, X prevMetadata)   初始化启动事务,prevMetadata:元数据
 boolean isReady()   返回时true,可以继续启动下一个事务

其中coordinator是spout,emitter是bolt。

这里面有两种类型的tuple,一种是事务性的tuple,一种是真实batch中的tuple;

coordinator为事务性batch发射tuple,Emitter负责为每个batch实际发射tuple。

具体如下:

  • coordinator只有一个,emitter根据并行度可以有多个实例
  • emitter以all grouping(广播)的方式订阅coordinator的”batch emit”流
  • coordinator (其实是是一个内部的spout)开启一个事务准备发射一个batch时候,进入一个事务的processing阶段,会发射一个事务性tuple(transactionAttempt & metadata)到”batch emit”流

*****说明******

  

TransactionalTopology里发送的tuple都必须以TransactionAttempt作为第一个field,storm根据这个field来判断tuple属于哪一个batch。

TransactionAttempt包含两个值:一个transaction id,一个attempt id。transaction id的作用就是我们上面介绍的对于每个batch中的tuple是唯一的

,而且不管这个batch    replay多少次都是一样的。attempt id是对于每个batch唯一的一个id, 但是对于同一个batch,它replay之后的attempt id跟replay之前就不一样了,

我们可以把attempt id理解成replay-times, storm利用这个id来区别一个batch发射的tuple的不同版本

metadata(元数据)中包含当前事务可以从哪个point进行重放数据,存放在zookeeper中的,spout可以通过Kryo从zookeeper中序列化和反序列化该元数据。

**************

  • Emiter接收到这个tuple后,会进行batch tuple的发送
  • Storm通过anchoring/acking机制来检测事务是否已经完成了processing 阶段;
  • Processing阶段完成后,并且之前的transactions都已经提交了,coordinator发射一个tuble到” commit”流,进入commit阶段。
  • commiting bolts通过all grouping方式订阅该”commit”流,事务提交后,coordinator同样通过anchoring/acking机制确认已经完成了commit阶段,接收到ack后,在zookeeper上把该transaction标记为完成。

  coordinator只有一个,emitter根据并行度可以有多个实例

  事务内部处理流程图

  

3) 事务性的Bolt继承BaseTransactionalBolt,

处理batch在一起的tuples,对于每一个tuple调用调用execute方法,而在整个batch处理(processing)完成的时候调用finishBatch方法。如果BatchBolt被标记成Committer,则只能在commit阶段调用finishBolt方法。一个batch的commit阶 段由storm保证只在前一个batch成功提交之后才会执行。并且它会重试直到topology里面的所有bolt在commit完成提交。那么如何知道batch的processing完成了,也就是bolt是否接收处理了batch里面所有的tuple;在bolt内部,有一个CoordinatedBolt的模型。

CoordinateBolt具体原理如下:

CoordinateBolt具体原理如下:

  • 真正执行计算的bolt外面封装了一个CoordinateBolt。真正执行任务的bolt我们称为real bolt。
  • 每个CoordinateBolt记录两个值:有哪些task给我发送了tuple(根据topology的grouping信息);我要给哪些tuple发送信息(同样根据groping信息)
  • Real bolt发出一个tuple后,其外层的CoordinateBolt会记录下这个tuple发送给哪个task了。
  • 等所有的tuple都发送完了之后,CoordinateBolt通过另外一个特殊的stream以emitDirect的方式告诉所有它发送过 tuple的task,它发送了多少tuple给这个task。下游task会将这个数字和自己已经接收到的tuple数量做对比,如果相等,则说明处理 完了所有的tuple。
  • 下游CoordinateBolt会重复上面的步骤,通知其下游。

事务性的拓扑在storm中的一个应用是Trident,它是在storm的原语和事务性的基础上做更高层次的抽象,做到一致性和恰好一次的语义,后续章节会对trident做分析。

时间: 2024-10-06 04:36:42

Storm批处理事务原理详解的相关文章

Storm批处理事务API详解

看此博文前,建议先查看 Storm批处理事务原理详解 为什么要进行批处理(Batch)? 逐个处理单个tuple,增加很多开销,如写库.输出结果频率过高 事务处理单个tuple效率比较低,因此storm中引入batch处理 批处理是一次性处理一批(batch)tuple,而事务则确保该批次要么全部处理成功,如果有处理失败的则全部不计,Storm会对失败的批次重新发送,且确保每个batch被且仅被处理一次 Spout有三种: 分别为: 1. ITransactionalSpout<T>,同Bas

Storm概念、原理详解及其应用(一)BaseStorm

本文借鉴官文,添加了一些解释和看法,其中有些理解,写的比较粗糙,有问题的地方希望大家指出.写这篇文章,是想把一些官文和资料中基础.重点拿出来,能总结出便于大家理解的话语.与大多数"wordcount"代码不同的是,并不会有如何运行第一storm代码等内容,只有在运行完代码后,发现需要明白:"知其然,并知其所以然". Storm是什么?为什么要用Storm?为什么不用Spark? 第一个问题,以下概念足以解释: Storm是基于数据流的实时处理系统,提供了大吞吐量的实

(转)MySQL备份原理详解

MySQL备份原理详解 原文:http://www.cnblogs.com/cchust/p/5452557.html 备份是数据安全的最后一道防线,对于任何数据丢失的场景,备份虽然不一定能恢复百分之百的数据(取决于备份周期),但至少能将损失降到最低.衡量备份恢复有两个重要的指标:恢复点目标(RPO)和恢复时间目标(RTO),前者重点关注能恢复到什么程度,而后者则重点关注恢复需要多长时间.这篇文章主要讨论MySQL的备份方案,重点介绍几种备份方式的原理,包括文件系统快照(LVM),逻辑备份工具M

JAVA消息服务JMS规范及原理详解

一.简介 JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信.Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持. JMS允许应用程序组件基于JavaEE平台创建.发送.接收和读取消息.它使分布式通信耦合度更低,消息服务更加可靠以及异步性. 二.常用术语介绍 在提到JMS时,我们通常会说到一些术语,解释如下: 消息

SpringBoot启动机制(starter机制)核心原理详解

作者:MyBug 一.前言 使用过springboot的同学应该已经知道,springboot通过默认配置了很多框架的使用方式帮我们大大简化了项目初始搭建以及开发过程.本文的目的就是一步步分析springboot的启动过程,这次主要是分析springboot特性自动装配.那么首先带领大家回顾一下以往我们的web项目是如何搭建的,通常我们要搭建一个基于Spring的Web应用,我们需要做以下一些工作:pom文件中引入相关jar包,包括spring.springmvc.redis.mybaits.l

图像处理中的数学原理详解17——卷积定理及其证明

欢迎关注我的博客专栏"图像处理中的数学原理详解" 全文目录请见 图像处理中的数学原理详解(总纲) http://blog.csdn.net/baimafujinji/article/details/48467225 图像处理中的数学原理详解(已发布的部分链接整理) http://blog.csdn.net/baimafujinji/article/details/48751037 1.4.5   卷积定理及其证明 卷积定理是傅立叶变换满足的一个重要性质.卷积定理指出,函数卷积的傅立叶变

Java虚拟机工作原理详解

原文地址:http://blog.csdn.net/bingduanlbd/article/details/8363734 一.类加载器 首先来看一下java程序的执行过程. 从这个框图很容易大体上了解java程序工作原理.首先,你写好java代码,保存到硬盘当中.然后你在命令行中输入 [java] view plaincopy javac YourClassName.java 此时,你的java代码就被编译成字节码(.class).如果你是在Eclipse IDE或者其他开发工具中,你保存代码

kickstart安装系统原理详解

前言 作为中小公司的运维,经常会遇到一些机械式的重复工作,例如:有时公司同时上线几十甚至上百台服务器,而且需要我们在短时间内完成系统安装. 常规的办法有什么? 光盘安装系统===>一个服务器DVD内置光驱百千块,百台服务器都配光驱就浪费了,因为一台服务器也就开始装系统能用的上,以后用的机会屈指可数.用USB外置光驱,插来插去也醉了. U盘安装系统===>还是同样的问题,要一台一台服务器插U盘. 网络安装系统(ftp,http,nfs) ===>这个方法不错,只要服务器能联网就可以装系统了

SVM -支持向量机原理详解与实践之四

SVM -支持向量机原理详解与实践之四 SVM原理分析 SMO算法分析 SMO即Sequential minmal optimization, 是最快的二次规划的优化算法,特使对线性SVM和稀疏数据性能更优.在正式介绍SMO算法之前,首先要了解坐标上升法. 坐标上升法(Coordinate ascent) 坐标上升法(Coordinate Ascent)简单点说就是它每次通过更新函数中的一维,通过多次的迭代以达到优化函数的目的. 坐标上升法原理讲解 为了更加通用的表示算法的求解过程,我们将算法表