storm的可靠性

消息确认机制:

在数据发送的过程中可能会数据丢失导致没能接收到,spout有个超时时间(默认是30S),如果30S过去了还是没有接收到数据,也认为是处理失败。

运行结果都是处理成功

参考代码StormTopologyAcker.java

package yehua.storm;

import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class StormTopologyAcker {

    public static class MySpout extends BaseRichSpout{
        private Map conf;
        private TopologyContext context;
        private SpoutOutputCollector collector;
        @Override
        public void open(Map conf, TopologyContext context,
                SpoutOutputCollector collector) {
            this.conf = conf;
            this.collector = collector;
            this.context = context;
        }

        int num = 0;
        @Override
        public void nextTuple() {
            num++;
            System.out.println("spout:"+num);
            int messageid = num;
            //开启消息确认机制,就是在发送数据的时候发送一个messageid,一般情况下,messageid可以理解为mysql数据里面的主键id字段
            //要保证messageid和tuple之间有一个唯一的对应关系,这个关系需要程序员自己维护
            this.collector.emit(new Values(num),messageid);
            Utils.sleep(1000);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("num"));
        }

        @Override
        public void ack(Object msgId) {
            System.out.println("处理成功!"+msgId);
        }

        @Override
        public void fail(Object msgId) {
            System.out.println("处理失败!"+msgId);
            //TODO  可以吧这个数据单独记录下来
        }

    }

    public static class MyBolt extends BaseRichBolt{

        private Map stormConf;
        private TopologyContext context;
        private OutputCollector collector;
        @Override
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.stormConf = stormConf;
            this.context = context;
            this.collector = collector;
        }

        int sum = 0;
        @Override
        public void execute(Tuple input) {
            try{
                Integer num = input.getIntegerByField("num");
                sum += num;
                System.out.println("sum="+sum);
                this.collector.ack(input);
            }catch(Exception e){
                this.collector.fail(input);
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }

    }

    public static void main(String[] args) {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        String spout_id = MySpout.class.getSimpleName();
        String bolt_id = MyBolt.class.getSimpleName();

        topologyBuilder.setSpout(spout_id, new MySpout());
        topologyBuilder.setBolt(bolt_id, new MyBolt()).shuffleGrouping(spout_id);

        Config config = new Config();
        config.setMaxSpoutPending(1000);//如果设置了这个参数,必须要保证开启了acker机制才有效
        String topology_name = StormTopologyAcker.class.getSimpleName();
        if(args.length==0){
            //在本地运行
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology(topology_name, config, topologyBuilder.createTopology());
        }else{
            //在集群运行
            try {
                StormSubmitter.submitTopology(topology_name, config, topologyBuilder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            } catch (AuthorizationException e) {
                e.printStackTrace();
            }
        }

    }

}
时间: 2024-08-07 22:54:39

storm的可靠性的相关文章

Storm消息可靠性的保障机制

参考[并发编程网]的Storm官方教程翻译 以WordCountToPology为例: // 构造Topology TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(SPOUT_ID,new SentenceSpout(), 2)// 指定 Spout ,2 指的是使用2个executor来运行spout .setNumTasks(4);//指定tasks的数量 // 指定 SentenceSpout 向Split

Storm系列三: Storm消息可靠性保障

Storm系列三: Storm消息可靠性保障 在上一篇 Storm系列二: Storm拓扑设计 中我们已经设计了一个稍微复杂一点的拓扑. 而本篇就是在上一篇的基础上再做出一定的调整. 在这里先大概提一下上一篇的业务逻辑, 我们会不断收到来自前端的消息,消息包含消息的发送时间,消息内容,结束标识, 消息的发送者, SessionId等其他信息, 我们需要做的事情是当接收到消息之后,根据SessionId判断是否属于同一消息, 如果是的话将内容拼接, 如果结束标识为 true, 表示会话已结束,则存

4. Storm可靠性

storm高可靠性: storm有一种机制可以保证从spout发出的每个tuple都会被完全处理 可靠性机制: 1.节点故障迁移 当一个节点上的worker出现问题是,会自动切到其他节点: 2.消息完整发送 一个消息(tuple)从spout发送出来,可能会导致成百上千的消息基于此消息被创建 "单词统计"的例子: storm任务从数据源每次读取一个完整的英文句子:将这个句子分解为独立的单词,最后,实时的输出每个单词以及它出现过的次数. 每个从spout发送出来的消息(每个英文句子)都会

apache Storm学习之三-消息可靠性

4.1 简介 storm可以确保spout发送出来的每个消息都会被完整的处理.本章将会描述storm体系是如何达到这个目标的,并将会详述开发者应该如何使用storm的这些机制来实现数据的可靠处理. 4.2 理解消息被完整处理 TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("sentences", new KestrelSpout("kestrel.backtype.com", 22

Storm中的可靠性

我们知道Storm有一个很重要的特性,那就是Storm API能够保证它的一个Tuple能够被完全处理,这一点尤为重要,其实storm中的可靠性是由spout和bolt组件共同完成的,下面就从spout和bolt两个方便给大家介绍一下storm中的可靠性,最后会给出一个实现了可靠性的例子. 1.Spout的可靠性保证 在Storm中,消息处理可靠性从Spout开始的.storm为了保证数据能正确的被处理, 对于spout产生的每一个tuple,storm都能够进行跟踪,这里面涉及到了ack/fa

Storm 官方文档翻译 --- 消息的可靠性保障

消息的可靠性保障 Storm 能够保证每一个由 Spout 发送的消息都能够得到完整地处理.本文详细解释了 Storm 如何实现这种保障机制,以及作为用户如何使用好 Storm 的可靠性机制. 消息的“完整性处理”是什么意思 一个从 spout 中发送出的 tuple 会产生上千个基于它创建的 tuples.例如,有这样一个 word-count 拓扑: TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("sent

Storm可靠性实例解析——ack机制

对于Storm,它有一个很重要的特性:“Guarantee no data loss” ——可靠性 很显然,要做到这个特性,必须要track每个data的去向和结果.Storm是如何做到的呢——acker机制. 先概括下acker所参与的工作流程: Spout创建一个新的Tuple时,会发一个消息通知acker去跟踪: Bolt在处理Tuple成功或失败后,也会发一个消息通知acker: acker会找到发射该Tuple的Spout,回调其ack或fail方法. 我们说RichBolt和Basi

storm之10:可靠性

storm blueprint: P20 从零开始学storm  : P40 可靠性:spout发送的消息会被拓扑树上的所有节点ack,否则会一直重发. 完整的可靠性示例请参考storm blueprint的chapter1 v4代码,或者P22. 关键步骤如下: (一)spout 1.创建一个map,用于记录已经发送的tuple的id与内容,此为待确认的tuple列表. private ConcurrentHashMap<UUID,Values> pending; 2.发送tuple时,加上

Storm入门学习随记

推荐慕课网视频:http://www.imooc.com/video/10055 ====Storm的起源. Storm是开源的.分布式.流式计算系统 什么是分布式呢?就是将一个任务拆解给多个计算机去执行,让许多机器共通完成同一个任务, 把这个多机的细节给屏蔽,对外提供同一个接口.同一个服务,这样的系统就是分布式系统. 在多年以前并没有非常范用的分布式系统,即使存在,也都是限定在指定的领域, 当然,也有人尝试从中提取出共通的部分,发明一个通用的分布式系统,但是都没有很好的结果. 后来,Googl