Storm 中什么是-acker,acker工作流程介绍

概述

我们知道storm一个很重要的特性是它能够保证你发出的每条消息都会被完整处理, 完整处理的意思是指:

一个tuple被完全处理的意思是: 这个tuple以及由这个tuple所导致的所有的tuple都被成功处理。而一个tuple会被认为处理失败了如果这个消息在timeout所指定的时间内没有成功处理。

也就是说对于任何一个spout-tuple以及它的所有子孙到底处理成功失败与否我们都会得到通知。关于如果做到这一点的原理,可以看看Twitter Storm如何保证消息不丢失这篇文章。从那篇文章里面我们可以知道,storm里面有个专门的acker来跟踪所有tuple的完成情况。这篇文章就来讨论acker的详细工作流程。

源代码列表

这篇文章涉及到的源代码主要包括:

  1. backtype.storm.daemon.acker
  2. backtype.storm.daemon.task
  3. backtype.storm.task.OutputCollectorImpl

算法简介

acker对于tuple的跟踪算法是storm的主要突破之一, 这个算法使得对于任意大的一个tuple树, 它只需要恒定的20字节就可以进行跟踪了。原理很简单:acker 对于每个spout-tuple保存一个ack-val的校验值,它的初始值是0, 然后每发射一个tuple/ack一个tuple,那么tuple的id都要跟这个校验值异或一下,并且把得到的值更新为ack-val的新值。那么假设 每个发射出去的tuple都被ack了, 那么最后ack-val一定是0(因为一个数字跟自己异或得到的值是0)。

进入正题

那么下面我们从源代码层面来看看哪些组件在哪些时候会给acker发送什么样的消息来共同完成这个算法的。acker对消息进行处理的主要是下面这块代码:

(let [id (.getValue tuple 0)

^TimeCacheMap pending @pending

curr (.get pending id)

curr (condp = (.getSourceStreamId tuple)

ACKER-INIT-STREAM-ID (-> curr

(update-ack id)

(assoc :spout-task (.getValue tuple 1)))

ACKER-ACK-STREAM-ID (update-ack

curr (.getValue tuple 1))

ACKER-FAIL-STREAM-ID (assoc curr :failed true))]

...)

Spout创建一个新的tuple的时候给acker发送消息

消息格式(看上面代码的第1行和第7行对于tuple.getValue()的调用)助

(spout-tuple-id, task-id)

消息的streamId是__ack_init(ACKER-INIT-STREAM-ID)

这 是告诉acker, 一个新的spout-tuple出来了, 你跟踪一下,它是由id为task-id的task创建的(这个task-id在后面会用来通知这个task:你的tuple处理成功了/失败了)。处理 完这个消息之后, acker会在它的pending这个map(类型为TimeCacheMap)里面添加这样一条记录:

{spout-tuple-id {:spout-tasktask-id :valack-val)}

这就是acker对spout-tuple进行跟踪的核心数据结构, 对于每个spout-tuple所产生的tuple树的跟踪都只需要保存上面这条记录。acker后面会检查:val什么时候变成0,变成0, 说明这个spout-tuple产生的tuple都处理完成了。

Bolt发射一个新tuple的时候会给acker发送消息么?

任何一个bolt在发射一个新的tuple的时候,是不会直接通知acker的,如果这样做的话那么每发射一个消息会有三条消息了:

  1. Bolt创建这个tuple的时候,把它发给下一个bolt的消息
  2. Bolt创建这个tuple的时候,发送给acker的消息
  3. ack tuple的时候发送的ack消息

事 实上storm里面只有第一条和第三条消息,它把第二条消息省掉了, 怎么做到的呢?storm这点做得挺巧妙的,bolt在发射一个新的bolt的时候会把这个新tuple跟它的父tuple的关系保存起来。然后在ack 每个tuple的时候,storm会把要ack的tuple的id, 以及这个tuple新创建的所有的tuple的id的异或值发送给acker。这样就给每个tuple省掉了一个消息(具体看下一节)。

Tuple被ack的时候给acker发送消息

每个tuple在被ack的时候,会给acker发送一个消息,消息格式是:助

(spout-tuple-id, tmp-ack-val)

消息的streamId是__ack_ack(ACKER-ACK-STREAM-ID)

注意,这里的tmp-ack-val是要ack的tuple的id与由它新创建的所有的tuple的id异或的结果:

tuple-id ^ (child-tuple-id1 ^ child-tuple-id2 ... )

我们可以从task.clj里面的send-ack方法看出这一点:

(defn- send-ack [^TopologyContext topology-context

^Tuple input-tuple

^List generated-ids send-fn]

(let [ack-val (bit-xor-vals generated-ids)]

(doseq [

[anchor id] (.. input-tuple

getMessageId

getAnchorsToIds)]

(send-fn (Tuple. topology-context

[anchor (bit-xor ack-val id)]

(.getThisTaskId topology-context)

ACKER-ACK-STREAM-ID))

)))

这里面的generated-ids参数就是这个input-tuple的所有子tuple的id, 从代码可以看出storm会给这个tuple的每一个spout-tuple发送一个ack消息。

为什么说这里的generated-ids是input-tuple的子tuple呢? 这个send-ack是被OutputCollectorImpl里面的ack方法调用的:

public void ack(Tuple input) {

List generated = getExistingOutput(input);

// don‘t just do this directly in case

// there was no output

_pendingAcks.remove(input);

_collector.ack(input, generated);

}

generated是由getExistingOutput(input)方法计算出来的, 我们再来看看这个方法的定义:

private List getExistingOutput(Tuple anchor) {

if(_pendingAcks.containsKey(anchor)) {

return _pendingAcks.get(anchor);

} else {

List ret = new ArrayList();

_pendingAcks.put(anchor, ret);

return ret;

}

}

_pendingAcks里面存的是什么东西呢?

private Tuple anchorTuple(Collection< Tuple > anchors,

String streamId,

List< Object > tuple) {

// The simple algorithm in this function is the key

// to Storm. It is what enables Storm to guarantee

// message processing.

// 这个map存的东西是 spout-tuple-id到ack-val的映射

Map< Long, Long > anchorsToIds

= new HashMap<Long, Long>();

// anchors 其实就是它的所有父亲:spout-tuple

if(anchors!=null) {

for(Tuple anchor: anchors) {

long newId = MessageId.generateId();

// 告诉每一个父亲,你们又多了一个儿子了。

getExistingOutput(anchor).add(newId);

for(long root: anchor.getMessageId()

.getAnchorsToIds().keySet()) {

Long curr = anchorsToIds.get(root);

if(curr == null) curr = 0L;

// 更新spout-tuple-id的ack-val

anchorsToIds.put(root, curr ^ newId);

}

}

}

return new Tuple(_context, tuple,

_context.getThisTaskId(),

streamId,

MessageId.makeId(anchorsToIds));

}

从上面代码里面的红色部分我们可以看出, _pendingAcks里面维护的其实就是tuple到自己儿子的对应关系。

Tuple处理失败的时候会给acker发送失败消息

acker会忽略这种消息的消息内容(消息的streamId为ACKER-FAIL-STREAM-ID), 直接将对应的spout-tuple标记为失败(最上面代码第9行)

最后Acker发消息通知spout-tuple对应的Worker

最后, acker会根据上面这些消息的处理结果来通知这个spout-tuple对应的task:

(when (and curr

(:spout-task curr))

(cond (= 0 (:val curr))

;; ack-val == 0 说明这个tuple的所有子孙都

;; 处理成功了(都发送ack消息了)

;; 那么发送成功消息通知创建这个spout-tuple的task.

(do

(.remove pending id)

(acker-emit-direct @output-collector

(:spout-task curr)

ACKER-ACK-STREAM-ID

[id]

))

;; 如果这个spout-tuple处理失败了

;; 发送失败消息给创建这个spout-tuple的task

(:failed curr)

(do

(.remove pending id)

(acker-emit-direct @output-collector

(:spout-task curr)

ACKER-FAIL-STREAM-ID

[id]

))

))

时间: 2024-10-09 20:54:54

Storm 中什么是-acker,acker工作流程介绍的相关文章

浅析live555媒体库之工作流程介绍

live555项目的源代码包括四个基本的库,各种测试代码以及Media Server. 工作模块 四个基本的库分别是: UsageEnvironment  TaskScheduler, groupsock, liveMedia和BasicUsageEnvironment. 官网英文的基本介绍截图如下: 虽是英文的,但是难度不大,能看懂大致意思.这里多说一句,程序员还是要接触并学习英文,毕竟好的技术都是国外引进的.简单说下我的理解 UsageEnvironment   该类库是对系统环境的抽象,包

工作流程介绍

欢迎您加入2014第一诚信兼职网(我们承接的是全球各大刷信誉平台任务)任务随机发配! 兼职人员工作流程以及常见问题,请认真看完! 郑重声明: 凡是收取任何押金,向你索要账号密码的类似项目,都是骗人的!虽然我们对业务员的要求比较高,但加入本项目我们绝不收取任何押金,更不会向你索要任何密码. 据统计,每天有5万人在网上开店,网店数量已经突破百万大关!一个没有信誉的网店,生意绝对不可能好起来!所以越来越多的卖家开始刷信誉,这也就催生网店刷钻手这个行业.网店刷钻手的任务就是给卖家店铺刷信誉赚钱! ———

OSG 中 相交測试 模块 工作流程及原理

主要涉及三个类: 1. osgUtil::PolytopeIntersector // 详细不同算法实现类 2. osgUtil::IntersectionVisitor //用来遍历节点树的每一个节点 3.osg::Node * mNode;  //  你要做相交測试的根节点 先看使用方法: osg::ref_ptr<osgUtil::PolytopeIntersector> intersector = new osgUtil::PolytopeIntersector(osgUtil::In

mybatis工作流程介绍

mybatis通过注解将statement配置起来,再通过java对象和statement中的sql生成待执行的sql,通过mybatis框架执行sql并将结果映射为java对象 mybatis架构图: 解释: 1.mybatis基本配置文件配置了mybatis运行环境信息,包含:数据连接池.数据库事务管理等,映射配置文件配置sql语句,要在基本配置文件中加载 2.通过mybatis环境配置文件构造会话工厂及SqlSessionFactory 3.通过SqlSessionFactory会话工厂创

爬取当当网的图书信息之工作流程介绍

前往http://book.dangdang.com/我们可以看到当当网上面的图书种类非常丰富 我们是计算机类图书为例子,那么计算机类图书页面的URL  http://book.dangdang.com/01.54.htm?ref=book-01-A是我们的种子URL 当我们进入这个页面可以看到很多计算机类图书,什么都别说了,都抓取下来,然后在进入子品类页面继续抓取信息,我们以程序涉及品类为例 进来之后我们可以看到大量的图书,而且在页面上方我们可以看到100页,可不止这么一点还有99页没有显示出

工作流程管理的重要性

对成长企业来说,"成长的关键不是经济环境,也不是市场条件,而是企业自身的管理条件",从单纯的业务流程升级为管理流程是提升企业水平的重要一步.管理流程的制定水平成为影响成长型企业生存的关键要素.好的管理在于好的流程,好的流程在于好的执行. 企业发展到一定规模后,由企业一把手管理全部业务的局面难以为继,然而由于缺乏组织上的准备,没有系统地计划及在关键岗位上培养对象,管理团队薄弱,业务流程模糊,领导人长期事必躬亲,其他员工难以分担职责,严重影响企业发展后劲. 从另一个角度来看.企业发展靠两条

Android 4.4 Kitkat Phone工作流程浅析(十二)__4.4小结与5.0概览

前置文章: <Android 4.4 Kitkat Phone工作流程浅析(一)__概要和学习计划> <Android 4.4 Kitkat Phone工作流程浅析(二)__UI结构分析> <Android 4.4 Kitkat Phone工作流程浅析(三)__MO(去电)流程分析> <Android 4.4 Kitkat Phone工作流程浅析(四)__RILJ工作流程简析> <Android 4.4 Kitkat Phone工作流程浅析(五)__M

【转】Android 4.4 Kitkat Phone 对比 5.0 Lollipop Phone工作流程浅析

概述 在Android 4.4 中,Google 对Telephony_Phone进行了重构,前面也通过一些列文章分析了Android 4.4 中Telephony Phone的工作流程.但在2014年10月15日,Google发布了Android 5.0 预览版,正式版也在一个月之后发布.Android 5.0 变化非常大,无论从UI风格还是功能实现上,Google都进行了大刀阔斧的修改.同时,Telephony_Phone模块的架构也再次进行了调整,调整之后的Telephony_Phone各

MapReduce与Yarn 的详细工作流程分析

MapReduce详细工作流程之Map阶段 如上图所示 首先有一个200M的待处理文件 切片:在客户端提交之前,根据参数配置,进行任务规划,将文件按128M每块进行切片 提交:提交可以提交到本地工作环境或者Yarn工作环境,本地只需要提交切片信息和xml配置文件,Yarn环境还需要提交jar包:本地环境一般只作为测试用 提交时会将每个任务封装为一个job交给Yarn来处理(详细见后边的Yarn工作流程介绍),计算出MapTask数量(等于切片数量),每个MapTask并行执行 MapTask中执