Storm容错机制Acker详解和实战案例

Storm中有个特殊的Executor叫acker,他们负责跟踪spout发出的每一个Tuple的Tuple树。当acker发现一个Tuple树已经处理完成了,它会告诉框架回调Spout的ack(),否则回调Spout的fail()。

Acker的跟踪算法是Storm的主要突破之一,对任意大的一个Tuple树,它只需要恒定的20字节就可以进行跟踪。

我们期望的是,如果某个Tuple被Bolt执行失败了,则Spout端可以重新发送该Tuple。但很遗憾的是,框架不会自动重新发送,需要我们自己手工编码实现。后续给大家实战案例!

什么是Tuple树?

Spout类代码如下:

package les19.Ack_Fail;

import java.io.BufferedReader;

import java.io.FileInputStream;

import java.io.InputStreamReader;

import java.util.Map;

import java.util.UUID;

import java.util.concurrent.ConcurrentHashMap;

import org.apache.storm.spout.SpoutOutputCollector;

import org.apache.storm.task.TopologyContext;

import org.apache.storm.topology.IRichSpout;

import org.apache.storm.topology.OutputFieldsDeclarer;

import org.apache.storm.tuple.Fields;

import org.apache.storm.tuple.Values;

public class AckSpout implements IRichSpout{

/**

*

*/

private static final long serialVersionUID = 1L;

FileInputStream fis;

InputStreamReader isr;

BufferedReader br;

private ConcurrentHashMap<Object, Values> _pending;//线程安全的Map,存储emit过的tuple

private ConcurrentHashMap<Object, Integer> fail_pending;//存储失败的tuple和其失败次数

SpoutOutputCollector collector = null;

String str = null;

@Override

public void nextTuple() {

try {

while ((str = this.br.readLine()) != null) {

// 过滤动作

UUID msgId = UUID.randomUUID();

String arr[] = str.split("\t");

String date = arr[2].substring(0, 10);

String orderAmt = arr[1];

Values val = new Values(date,orderAmt);

this._pending.put(msgId, val);

collector.emit(val, msgId);

System.out.println("_pending.size()="+_pending.size());

}

} catch (Exception e) {

// TODO: handle exception

}

}

@Override

public void close() {

// TODO Auto-generated method stub

try {

br.close();

isr.close();

fis.close();

} catch (Exception e) {

// TODO: handle exception

e.printStackTrace();

}

}

@Override

//初始化函数

public void open(Map conf, TopologyContext context,

SpoutOutputCollector collector) {

try {

this.collector = collector;

this.fis = new FileInputStream("order.log");

this.isr = new InputStreamReader(fis, "UTF-8");

this.br = new BufferedReader(isr);

_pending = new ConcurrentHashMap<Object, Values>();

fail_pending = new ConcurrentHashMap<Object, Integer>();

} catch (Exception e) {

e.printStackTrace();

}

// TODO Auto-generated method stub

}

@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

// TODO Auto-generated method stub

declarer.declare(new Fields("date","orderAmt"));

}

@Override

public Map<String, Object> getComponentConfiguration() {

// TODO Auto-generated method stub

return null;

}

@Override

public void ack(Object msgId) {

// TODO Auto-generated method stub

System.out.println("_pending size 共有:"+_pending.size());

System.out.println("spout ack:"+msgId.toString()+"---"+msgId.getClass());

this._pending.remove(msgId);

System.out.println("_pending size 剩余:"+_pending.size());

}

@Override

public void activate() {

// TODO Auto-generated method stub

}

@Override

public void deactivate() {

// TODO Auto-generated method stub

}

@Override

public void fail(Object msgId) {

// TODO Auto-generated method stub

System.out.println("spout fail:"+msgId.toString());

Integer fail_count = fail_pending.get(msgId);//获取该Tuple失败的次数

if (fail_count == null) {

fail_count = 0;

}

fail_count ++ ;

if (fail_count>=3) {

//重试次数已满,不再进行重新emit

fail_pending.remove(msgId);

}else {

//记录该tuple失败次数

fail_pending.put(msgId, fail_count);

//重发

this.collector.emit(this._pending.get(msgId), msgId);

}

}

}

Bolt如下:

package les19.Ack_Fail;

import java.util.Map;

import org.apache.storm.task.OutputCollector;

import org.apache.storm.task.TopologyContext;

import org.apache.storm.topology.IRichBolt;

import org.apache.storm.topology.OutputFieldsDeclarer;

import org.apache.storm.tuple.Fields;

import org.apache.storm.tuple.Tuple;

import org.apache.storm.tuple.Values;

public class AckBolt implements IRichBolt {

/**

*

*/

private static final long serialVersionUID = 1L;

OutputCollector collector = null;

TopologyContext context = null;

@Override

public void cleanup() {

// TODO Auto-generated method stub

}

int num = 0;

String url = null;

String session_id = null;

String date = null;

String province_id = null;

@Override

public void execute(Tuple input) {

try {

date = input.getStringByField("date") ;

Double orderAmt = Double.parseDouble(input.getStringByField("orderAmt"));

collector.emit(input,new Values(date,orderAmt));//注意参数,第一个参数是Tuple本身

collector.ack(input);

//     Thread.sleep(300);

} catch (Exception e) {

collector.fail(input);

e.printStackTrace();

}

}

//初始化,对应spout的open函数

@Override

public void prepare(Map stormConf, TopologyContext context,

OutputCollector collector) {

// TODO Auto-generated method

this.context = context ;

this.collector = collector ;

}

@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

// TODO Auto-generated method stub

declarer.declare(new Fields("date","orderAmt")) ;

}

@Override

public Map<String, Object> getComponentConfiguration() {

// TODO Auto-generated method stub

return null;

}

}

TOPO类如下:

package les19.Ack_Fail;

import org.apache.storm.Config;

import org.apache.storm.LocalCluster;

import org.apache.storm.StormSubmitter;

import org.apache.storm.topology.TopologyBuilder;

public class Ack_FailTopo {

/**

* @param args

*/

public static void main(String[] args) {

// TODO Auto-generated method stub

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("spout", new AckSpout(), 1);

builder.setBolt("bolt", new AckBolt(), 1).shuffleGrouping("spout");

Config conf = new Config() ;

//conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0);

conf.setDebug(false);

if (args.length > 0) {

try {

StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

} catch (Exception e) {

e.printStackTrace();

}

}else {

LocalCluster localCluster = new LocalCluster();

localCluster.submitTopology("mytopology", conf, builder.createTopology());

}

}

}


想了解更多,见我的51CTO上的Storm视频教程http://edu.51cto.com/course/course_id-9041.html

,本节来自第18-19讲。

时间: 2024-10-25 03:42:22

Storm容错机制Acker详解和实战案例的相关文章

JavaSE数组详解与实战案例应用

1.数组声明: 第一种:数据类型 变量名[],例如:int x[]; 第二种:数据类型[] 变量名=new 数据类型[要在内存中开辟的空间数量,例如:int[] x=new int[3]; 第三种:数据类型[] 变量名=new 数据类型[]{元素1,元素2,元素3}; 例子1: public staticvoidmain(String[] arg){ //定义一个字符串数组,将各个元素(也就是月饼)放入其中 String[]names={"五仁月饼","香辣牛肉月饼"

Storm概念、原理详解及其应用(一)BaseStorm

本文借鉴官文,添加了一些解释和看法,其中有些理解,写的比较粗糙,有问题的地方希望大家指出.写这篇文章,是想把一些官文和资料中基础.重点拿出来,能总结出便于大家理解的话语.与大多数"wordcount"代码不同的是,并不会有如何运行第一storm代码等内容,只有在运行完代码后,发现需要明白:"知其然,并知其所以然". Storm是什么?为什么要用Storm?为什么不用Spark? 第一个问题,以下概念足以解释: Storm是基于数据流的实时处理系统,提供了大吞吐量的实

[转]hibernate缓存机制所有详解

以下文章来自http://www.blogjava.net/tbwshc/articles/380013.html Hibernate 所有缓存机制详解 hibernate提供的一级缓存 hibernate是一个线程对应一个session,一个线程可以看成一个用户.也就是说session级缓存(一级缓存)只能给一个线程用,别的线程用不了,一级缓存就是和线程绑定了. hibernate一级缓存生命周期很短,和session生命周期一样,一级缓存也称session级的缓存或事务级缓存.如果tb事务提

Storm批处理事务API详解

看此博文前,建议先查看 Storm批处理事务原理详解 为什么要进行批处理(Batch)? 逐个处理单个tuple,增加很多开销,如写库.输出结果频率过高 事务处理单个tuple效率比较低,因此storm中引入batch处理 批处理是一次性处理一批(batch)tuple,而事务则确保该批次要么全部处理成功,如果有处理失败的则全部不计,Storm会对失败的批次重新发送,且确保每个batch被且仅被处理一次 Spout有三种: 分别为: 1. ITransactionalSpout<T>,同Bas

Django文件上传机制用法详解(转)

Django文件上传机制用法详解 http://www.jbxue.com/article/24283.html 分享下Django文件上传机制的用法,包括基本上传文件的原理,以及如何处理上传文件的方法,需要的朋友参考下. 当Django处理上传一个文件时,文件数据被放在request.FILES中. 这个文档解释文件怎么样被存储在磁盘上或者内存中,怎样定制默认的行为. 一,基本文件上传考虑一个包含FileField的简单的表单: 复制代码 代码示例: from  django  import 

Java垃圾回收机制(GC)详解

Java垃圾回收机制(GC)详解 简介: 垃圾回收GC(Garbage Collection)是Java语言的核心技术之一,之前我们曾专门探讨过Java 7新增的垃圾回收器G1的新特性,但在JVM的内部运行机制上看,Java的垃圾回收原理与机制并未改变.垃圾收集的目的在于清除不再使用的对象.GC通过确定对象是否被活动对象引用来确定是否收集该对象.GC首先要判断该对象是否是时候可以收集.两种常用的方法是引用计数和对象引用遍历. 垃圾收集的算法分析: Java语言规范没有明确地说明JVM使用哪种垃圾

iptables详解加实战

iptables 组件是一种工具,也称为用户空间(userspace),它使插入.修改和除去信息包过滤表中的规则变得容易.分为四个表和五个链,其中表是按照对数据包的操作区分的,链是按照不同的Hook点来区分的,表和链实际上是netfilter的两个维度. 4个表:filter,nat,mangle,raw,默认表是filter(没有指定表的时候就是filter表).表的处理优先级:raw>mangle>nat>filter. filter:一般的过滤功能,如不-t指定表,则默认filte

Storm集群安装详解

Storm集群安装详解 storm有两种操作模式: 本地模式和远程模式. 本地模式:你可以在你的本地机器上开发测试你的topology, 一切都在你的本地机器上模拟出来; 远端模式:你提交的topology会在一个集群的机器上执行. 本文以Twitter Storm官方Wiki为基础,详细描述如何快速搭建一个Storm集群,其中,项目实践中遇到的问题及经验总结,在相应章节以“注意事项”的形式给出. 1.   Strom集群组件 Storm集群中包含两类节点:主控节点(Master Node)和工

android消息机制原理详解

android消息机制原理详解 因为之前使用的是CSDN默认的文本编辑器,而且也因为懒得学用MarkDown来写博客,所以排版上有一些问题.就上一篇写的设计模式之抽象工厂模式提出了这个问题(一个android群的群友提出来的,没有在评论里评论),所以以后的文章都用MarkDown来写了. 好了,言归正传,这篇文章我来介绍一下android消息机制的原理 Android消息机制概述 说到Android的消息机制,Android初级工程师(不包括那些初学者)肯定会想到Handler.是的,Andro