RabbitMQ如何应对消费出现异常的情况

1,生产者

new_task.py

import pika

if __name__ == ‘__main__‘:
    connection=pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel=connection.channel()
    channel.queue_declare("Kadima")
    message="You are awsome!"
    for i in range(0,100):#循环100次发送消息
        channel.basic_publish(exchange="",routing_key=‘Kadima‘,body=message+" "+str(i))
    print "sending ",message

2,多个消费者

消费者1,work.py

#-*- coding: UTF-8 -*-
import time
import pika
import sys

__author__ = ‘Yue‘

var=0

def callback(ch, method, properties, body):
    # temp=var+1 #这里有趣的是不能写成var+=1或者var=var+1,要知道为什么,就需要清楚“Python全局变量和局部变量”
    #global var
    #var+=1
    #if var==20:
        #sys.exit()
    print "1 received %r" % (body,)
    time.sleep(body.count("."))
    print "Done"

if __name__ == ‘__main__‘:
    connection=pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel=connection.channel()
    channel.queue_declare("Kadima")
    channel.basic_consume(callback,queue="Kadima",no_ack=True)
    print ‘ [1] Waiting for messages‘
    channel.start_consuming()

work2.py

import time
import pika

__author__ = ‘Yue‘

def callback(ch, method, properties, body):
    print "2 received %r" % (body,)
    time.sleep(body.count("."))
    print "Done"

if __name__ == ‘__main__‘:
    connection=pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel=connection.channel()
    channel.queue_declare("Kadima")
    channel.basic_consume(callback,queue="Kadima",no_ack=True)
    print ‘ [2] Waiting for messages‘
    channel.start_consuming()

3,执行work,work2,new_task

我的启动顺序是work,work2,从执行结果可以看出,RabbitMQ是将task分别依次分发给按照时间顺序注册的work上的,

也就是,task1,task2,task3,task4,它会将task1,task3分发给work,另外两个分发给task3,task4

接下来,有趣的事情就要发生了:

当把work.py中的callback函数的注释内容打开后(起作用是让work处理19个task,便退出程序),MQ并没有将本该分发给work的task分发给work2,那到底去哪里了呢?我暂时假设为work退出时并没有告诉MQ他不干了(他出现异常啦),MQ还是会将task分发给work

4,那没有执行完的任务怎么办呢?

Message acknowledgment :ack默认是打开的

修改work代码如下

#-*- coding: UTF-8 -*-
import time
import pika
import sys

__author__ = ‘Yue‘

var=0

def callback(ch, method, properties, body):
    # temp=var+1 #这里有趣的是不能写成var+=1或者var=var+1,要知道为什么,就需要清楚“Python全局变量和局部变量”
    global var
    var+=1
    if var==20:
        sys.exit()
    print "1 received %r" % (body,)
    time.sleep(body.count("."))
    print "Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)

if __name__ == ‘__main__‘:
    connection=pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel=connection.channel()
    channel.queue_declare("Kadima")
    channel.basic_consume(callback,queue="Kadima",no_ack=True)
    print ‘ [1] Waiting for messages‘
    channel.start_consuming()

work只执行到20,但是work2并未从22开始全部执行,而是从37开始MQ,这老实说,我也有点搞不懂了,以后想清楚了再补充

时间: 2024-10-05 08:26:30

RabbitMQ如何应对消费出现异常的情况的相关文章

应用flexbox布局的页面在手机版chrome浏览器出现文字省略号显示异常的情况

描述: 手机端Chrome浏览器(安卓)测试文字省略号显示效果  出现不支持的情况 如图所示: 原因:文字部分的直接父元素 采用了flex布局:(flex:1);祖父元素采用了flexbox布局 解决方法: 给直接父元素 设置 overflow:hidden;

nginx+fastcgi php 使用file_get_contents、curl、fopen读取localhost本站点.php异常的情况

原文:http://www.oicto.com/nginx_fastcgi_php_file_get_contents/ 参考:http://os.51cto.com/art/201408/449205.htm 这两天一直在搞windows下nginx+fastcgi的file_get_contents请求.我想,很多同学都遇到当file_get_contents请求外网的http/https的php文件时毫无压力,比如echo file_get_contents(‘http://www.bai

如何应对eva存储崩溃的情况?

一.故障描述用户是持有一台HP EVA 4100的存储,23块容量为300G的光纤硬盘,上层映射给一台装有windows系统的服务器上,开始发现有三块硬盘亮黄灯,这个时候存储还能正常使用,之后用户就开始联系运维商更换硬盘,在更换硬盘的过程中,又出现一块硬盘亮黄灯离线,这个时候存储就整个崩溃无法使用了,用户对4块硬盘进行检测了下,发现4块硬盘都出现磁头和盘片损坏的情况,只能从剩余的19块硬盘上来进行恢复数据.二.备份数据考虑到数据的安全性以及可还原性,在做数据恢复之前需要对所有源数据做备份,以防万

tp5 ThinkPhp5 自定义异常处理类

在项目的开发过程中异常抛出尤为重要不仅能够做出友好提示帮助掩盖我们伟大的程序员们尴尬的瞬间,还能做到提示开发人员代码白编写的错误,下面进行自定义异常抛出类,纯属个人理解,希望大家指正 首先在框架中我们可以自定义目录结构用来做异常类的存储位置例如文件目录为以下红框中 定义目录结构后阐述一下我理解的异常类的工作流程,见名知意既然是异常抛出那么只有在代码出现问题的时候或者是逻辑出现异常的时候会进行抛出,那么我认为这就会出现两种情况, 第一种是用户传输的参数或者用户操作的流程有问题这种情况下并不是我们的

C#多线程技术提高RabbitMQ消费吞吐率(二)

一.课程介绍 本次分享课程属于<C#高级编程实战技能开发宝典课程系列>中的第二部分,阿笨后续会计划将实际项目中的一些比较实用的关于C#高级编程的技巧分享出来给大家进行学习,不断的收集.整理和完善此系列课程!本次高级系列课程适合人群如下: 1.有一定的NET开发基础并对RabbitMQ技术有一定了解和认识. 2.喜欢阿笨的干货分享课程的童鞋们. 希望大家在选择阿笨的 C#高级编程实战技能开发宝典课程系列的时候,根据自身的情况进行选择,由于本次课程不是零基础教学课程系列,所以说.NET基础差的到了

RabbitMQ消费方式汇总

在学习本章节前,请先学习之前的章节:Java访问RabbitMQ:https://www.cnblogs.com/duanjt/p/10057330.htmlRabbitMQ消息发布时的权衡:https://www.cnblogs.com/duanjt/p/10075308.html 一.推送Consume 前面我们使用到的都是这种模式,注册一个消费者后,RabbitMQ会在消息可用时,自动将消息进行推送给消费者.这种方式效率最高最及时.核心代码如下: // 接收消息,第二个参数表示是否自动应答

如何分析账户数据异常情况,从而提高转化

百度竞价开户前面翱翔竞价软件的小编讲解过数据异常的情况如何进行解决的分析方法.主要的方式就是通过软件来进行调整的,而用软件就不要求你有多少的技术,只要你有一定的思维方式就行了.那么如果从技术上分析,如何进行呢?下面翱翔竞价软件就来给大家讲解一下. 从技术上分析的话,这就要求我们对excel这样的工具有熟练的应用了.平时我们统计账户的数据汇总之类的工作都需要用到这个工具.基本上我们需要做到实时了解账户中各项数据状态,可以全面掌控.这是一项非常有难度的工作,需要多练习才行. 如果说你的账户中关键词创

Java异常-可能会出现异常丢失的情况&amp;finally

finally的两种特性: 对于没有垃圾回收和析构函数自动调用机制的语言来说,Java中的finally非常重要.它能使程序员保证: 1.无论异常是否被抛出,finally子句总能被执行.这个特性我们可以用来解决以下问题:Java的异常不允许我们回到异常抛出的地点时,该如何应对?把try块放在循环里,建立了一个"程序继续执行之前必须要达到"的条件.还可以加入一个static类型的计数器之类的装置,使循环在放弃之前能尝试一定的次数,这将使程序的健壮性更上一个台阶. 2.无论try块发生了

更加强健的线程模型,解决线程卡死,退出异常情况

线程模型 1 package net.sz; 2 3 import java.util.Random; 4 import java.util.concurrent.ConcurrentLinkedQueue; 5 import org.apache.log4j.Logger; 6 7 /** 8 * 9 * <br> 10 * author 失足程序员<br> 11 * mail [email protected]<br> 12 * phone 13882122019&