storm 消息确认机制及可靠性

worker进程死掉

在一个节点 kill work进程 比方 kill 2509  对work没有影响 由于会在其它节点又一次启动进程运行topology任务

supervisor进程死掉

supervisor进程kill掉 对work进程没有影响  由于他们是互相独立的!

nimbus进程死掉(存在HA的问题)

nimbus假设死掉 整个任务挂掉 存在单点故障问题!(hadoop2有ha!。!!。!

storm没有ha高可用)

节点宕机(和supervisor是一样的)

ack/fail消息确认机制

spout发送过来的数据  blot要确认数据是否收到及反馈给spout 以下给一个样例:

import java.util.Map;

import backtype.storm.Config;

import backtype.storm.StormSubmitter;

import backtype.storm.generated.AlreadyAliveException;

import backtype.storm.generated.InvalidTopologyException;

import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.task.OutputCollector;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.topology.base.BaseRichBolt;

import backtype.storm.topology.base.BaseRichSpout;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

import backtype.storm.utils.Utils;

public class ClusterStormTopologyAck {

public static class DataSourceSpout extends BaseRichSpout{

private Map conf;

private TopologyContext context;

private SpoutOutputCollector collector;

/**

* 在本实例执行的时候被调用一次

*/

public void open(Map conf, TopologyContext context,

SpoutOutputCollector collector) {

this.conf = conf;

this.context = context;

this.collector = collector;

}

/**

* 死循环调用 心跳

*/

int i=0;

public void nextTuple() {

System.err.println("spout :"+i);

//values 就是value的list列表

//(new Values(i++),i-1);发送的值及key一一相应

this.collector.emit(new Values(i++),i-1);

Utils.sleep(1000);

}

/**

* 声明字段名称

*/

public void declareOutputFields(OutputFieldsDeclarer declarer) {

//fields就是field的列表

declarer.declare(new Fields("num"));

}

@Override

public void ack(Object msgId) {

System.out.println("运行ACK:"+msgId);

}

@Override

public void fail(Object msgId) {

System.out.println("运行FAIL:"+msgId);

//TODO--

//this.collector.emit(tuple);

}

}

public static class SumBolt extends BaseRichBolt{

private Map stormConf;

private TopologyContext context;

private OutputCollector collector;

/**

* 仅仅会被调用一次

*/

public void prepare(Map stormConf, TopologyContext context,

OutputCollector collector) {

this.stormConf = stormConf;

this.context = context;

this.collector = collector;

}

/**

* 死循环,循环的获取上一级发送过来的数据(spout/bolt)

*/

int sum = 0;

public void execute(Tuple input) {

//input.getInteger(0);

Integer count = input.getIntegerByField("num");

try{

//--------

this.collector.ack(input);

}catch(Exception e){

this.collector.fail(input);

}

/*if(count>10 && count<20){

this.collector.fail(input);

}{

this.collector.ack(input);

}*/

}

public void declareOutputFields(OutputFieldsDeclarer declarer) {

}

}

public static void main(String[] args) {

TopologyBuilder builder = new TopologyBuilder();

String SPOUT_NAME = DataSourceSpout.class.getSimpleName();

String BOLT_NAME = SumBolt.class.getSimpleName();

builder.setSpout(SPOUT_NAME, new DataSourceSpout());

builder.setBolt(BOLT_NAME, new SumBolt()).shuffleGrouping(SPOUT_NAME);

Config config = new Config();

try {

StormSubmitter.submitTopology(ClusterStormTopologyAck.class.getSimpleName(), config, builder.createTopology());

} catch (AlreadyAliveException e) {

e.printStackTrace();

} catch (InvalidTopologyException e) {

e.printStackTrace();

}

}

}

时间: 2024-09-30 14:44:56

storm 消息确认机制及可靠性的相关文章

RabbitMQ 消息确认机制

消息确认机制 在之前异常处理部分就已经写了,对于consumer的异常退出导致消息丢失,可以时候consumer的消息确认机制.重复的就不说了,这里说一些不一样的. consumer的消息确认机制 当一个消费者收到一个快递,但是这个包裹是破损的,这时候一般会有以下选择 拒收快递,让快递员把快递寄回. (如果有多个consumer可能这条消息会到其它的consumer中,如果只有一个,那么下次获取还是可以拿到) 签收快递,然后偷偷的扔了(钱多任性) 拒收快递,联系商家再给我补发一个 下面是具体的方

(转)RabbitMQ消息队列(九):Publisher的消息确认机制

在前面的文章中提到了queue和consumer之间的消息确认机制:通过设置ack.那么Publisher能不到知道他post的Message有没有到达queue,甚至更近一步,是否被某个Consumer处理呢?毕竟对于一些非常重要的数据,可能Publisher需要确认某个消息已经被正确处理. 在我们的系统中,我们没有是实现这种确认,也就是说,不管Message是否被Consume了,Publisher不会去care.他只是将自己的状态publish给上层,由上层的逻辑去处理.如果Message

RabbitMQ消息队列(九):Publisher的消息确认机制

在前面的文章中提到了queue和consumer之间的消息确认机制:通过设置ack.那么Publisher能不到知道他post的Message有没有到达queue,甚至更近一步,是否被某个Consumer处理呢?毕竟对于一些非常重要的数据,可能Publisher需要确认某个消息已经被正确处理. 在我们的系统中,我们没有是实现这种确认,也就是说,不管Message是否被Consume了,Publisher不会去care.他只是将自己的状态publish给上层,由上层的逻辑去处理.如果Message

RabbitMQ之消息确认机制(事务+Confirm)

概述 在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?如果不进行特殊配置的话,默认情况下发布操作是不会返回任何信息给生产者的,也就是默认情况下我们的生产者是不知道消息有没有正确到达broker的,如果在消息到达broker之前已经丢失的话,持久化操作也解决不了这个问题,因为消息根本就没到达代理服务器,你怎么进行持久化,那么这个问题该怎

activemq的消息确认机制ACK

一.简介 消息消费者有没有接收到消息,需要有一种机制让消息提供者知道,这个机制就是消息确认机制. ACK(Acknowledgement)即确认字符,在数据通信中,接收站发给发送站的一种传输类控制字符.表示发来的数据已确认接收无误. 二.ACK_MODE有几类 我们在开发JMS应用程序的时候,会经常使用到上述ACK_MODE,其中"INDIVIDUAL_ACKNOWLEDGE "只有ActiveMQ支持,当然开发者也可以使用它. ACK_MODE描述了Consumer与broker确认

RabbitMQ (十二) 消息确认机制 - 发布者确认

消费者确认解决的问题是确认消息是否被消费者"成功消费". 它有个前提条件,那就是生产者发布的消息已经"成功"发送出去了. 因此还需要一个机制来告诉生产者,你发送的消息真的"成功"发送了. 在标准的AMQP 0-9-1,保证消息不会丢失的唯一方法是使用事务:在通道上开启事务,发布消息,提交事务.但是事务是非常重量级的,它使得RabbitMQ的吞吐量降低250倍.为了解决这个问题,RabbitMQ 引入了 发布者确认(Publisher Confir

activemq 消息阻塞优化和消息确认机制优化

一.消息阻塞优化 1.activemq消费者在从待消费队列中获取消息是会先进行预读取,默认是1000条(prefetch=1000).这样很容易造成消息积压. 2.可以通过设置prefetch的默认值来调整预读取条数,java代码如下 //设置预读取为1ActiveMQPrefetchPolicy p = new ActiveMQPrefetchPolicy();p.setQueuePrefetch(1);//创建一个链接工厂connectionFactory = new ActiveMQCon

rabbitMQ基础知识--消息确认机制

一:确认种类 RabbitMQ的消息确认有两种. 一种是消息发送确认.这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递.发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列. 第二种是消费接收确认.这种是确认消费者是否成功消费了队列中的消息. 具体建议参考:https://www.cnblogs.com/nizuimeiabc1/p/9397326.html 这里我们重点研究下接收确认的情况. 为了保证消息从队列可靠的到达消费者,RabbitMQ提供了

RabbitMQ消息确认机制—消息发送确认和 消息接收确认

/** * RabbitMQ消息确认机制 * 关于rabbit的生产和消费方的一些实用的操作: * producer的confirm和consumer的ack,这两者使用的模式都是用来保证数据完整性,防止数据丢失 */ /** * producer的confirm模式 * 业务场景描述: * 促销系统在做活动前,需要给用户的手机发送一条活动内容短信希望用户来参加, * 因为用户量有点大,所以通过往短信mq中插入数据方式,让短信服务来消费mq发短信: * 此时插入mq消息的服务为了保证给所有用户发