RabbitMQ--work queues(二)

封装一个task到一个message,并发送到queue。consumer会去除task并执行这个task。

这里我们简化了操作,发送消息到队列中,consumer取出消息计算里面‘.‘号有几个就sleep几秒。

task.py

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=‘localhost‘))
channel = connection.channel()

channel.queue_declare(queue=‘task_queue‘, durable=True)

message = ‘ ‘.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange=‘‘,
                      routing_key=‘task_queue‘,
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()

work.py

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=‘localhost‘))
channel = connection.channel()

channel.queue_declare(queue=‘task_queue‘, durable=True)
print(‘ [*] Waiting for messages. To exit press CTRL+C‘)

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b‘.‘))
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue=‘task_queue‘)

channel.start_consuming()

代码解释

channel.queue_declare(queue=‘task_queue‘, durable=True)
告诉rabbitmq永不丢失queue,即使rabbitmq server挂掉。
def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count(‘.‘) )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      queue=‘hello‘)

在先前例子中,如果consumer突然死掉,回丢失掉正在处理的信息。如何避免呢?如果consumer死掉,怎么将这条信息发送的其他consumer呢?就是使用上面代码
channel.basic_qos(prefetch_count=1)

This tells RabbitMQ not to give more than one message to a worker at a time. Or, in other words, don‘t dispatch a new message to a worker until it has processed and acknowledged the previous one.Instead, it will dispatch it to the next worker that is not still busy.
直到消费完这个消息再给consumer派送新的消息,如果没消费完,将消息发送给另一个consumer.
时间: 2024-08-27 04:44:59

RabbitMQ--work queues(二)的相关文章

PHP之RABBITMQ安装篇(二)-WINDOWS下安装

PHP之RABBITMQ安装篇(二)-WINDOWS下安装 AMQP扩展安装 在PHP上安装RabbitMQ之前,先安装PHP的扩展amqp,在安装amqp之前,先查看自己的PHP版本 首先根据PHP的版本选择amqp的版本,再次要下载稳定版本的amqp.我的PHP是5.6.25的,所以选择1.4.0版本的amqp.下载地址:https://pecl.php.net/package/amqp 然后根据PHP的版本,线程安全是否激活,多少位的,来选择下载哪个版本:我的PHP是5.6版本的,线程安全

RabbitMQ Work Queues(工作队列)

RabbitMQ Work Queues(工作队列) 工作队列模式为一个生产者对应多个消费者,但是只有一个消费者获得消息,即一个队列被多个消费者监听,但一条消息只能被其中的一个消费者获取 代码如下: 生产者代码: public class WorkSend { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { //获取连接

RabbitMQ学习(二).NET Client之Work Queues

2 Work queues Distributing tasks among workers Python | Java | Ruby | PHP| C# 转载请注明出处:jiq?钦's technical Blog Work Queues (using the .NET Client) 前面已经介绍过了如何编写程序去发送消息到命名队列,以及从命名队列接收消息. 在这个部分我们将创建一个工作队列(Work Queue),用于将耗时任务(time-consuming tasks)分发给多个工作者(

RabbitMQ指南之二:工作队列(Work Queues)

在上一章的指南中,我们写了一个命名队列:生产者往该命名队列发送消息.消费从从该命名队列中消费消息.在本章中,我们将创建一个工作队列,用于在多个工作者之间分配耗时的任务.工作队列(即任务队列)的主要思想是避免立即执行那些需要等他们执行完成的资源密集型任务.相反,我们将任务安排在稍后完成.我们将任务封装为消息并将其发送到队列,后台运行的工作进程将取出任务并执行完成.如果你启动了多个工作者,这些任务将在多个工作者之间分享. 这个概念也即我们说的异步,在项目中,有时候一个简单的Web请求,后台要做一系统

RabbitMQ官方教程二 Work Queues(GOLANG语言实现)

在第一个教程中,我们编写了程序来发送和接收来自命名队列的消息. 在这一部分中,我们将创建一个工作队列,该队列将用于在多个worker之间分配耗时的任务. 工作队列(又称任务队列)的主要思路是避免立即执行资源密集型任务(比如耗时较长的邮件发送.文件处理等),而不得不等待它完成. 相反,我们安排任务在以后完成(异步完成). 我们将任务封装为消息并将其发送到队列. 在后台运行的工作进程将获取任务并最终执行作业. 当您运行许多worker时,他们将共享任务. 这个概念在Web应用程序中特别有用,因为在W

RabbitMQ --- Work Queues(工作队列)

目录 RabbitMQ --- Hello Mr.Tua 前言 Work Queues 即工作队列,它表示一个 Producer 对应多个 Consumer,包括两种分发模式:轮循分发(Round-robin)和公平分发(Fair dispatch).旨在为了避免立即执行任务时出现占用很多资源和时间却又必须等待完成的现象. 原理分析: Producer 把工作任务转化为消息发送给队列,当后台有一个 Consumer 进程在运行时,它会不间断地从队列中取出消息来执行:当后台有多个 Consumer

RabbitMQ系列之二:work queue

server端代码: 1 package com.example.workqueue; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.client.Channel; 6 import com.rabbitmq.client.Connection; 7 import com.rabbitmq.client.ConnectionFactory; 8 import com.rabbitmq.client.MessagePropertie

RabbitMQ学习(二)工作队列

1.工作队列(Work Queue)又叫任务队列(Task Queue)指将任务分发个多个消费者. 2.实际操作: 这里使用一个生产者产生多条数据提供给3个消费者 生产者代码: public class Producter { //队列名称 private final static String QUEUE_NAME = "Work_Queue"; public static void main(String[] args) throws IOException, TimeoutExc

RabbitMQ指南(C#)(二)工作队列

上一节我们实现了向指定的队列发送和接收消息.这一节,我们主要讲工作队列,用于在多个消费者之间分配置实时任务. 工作队列方式主要是为了防止在执行一个耗费资源的任务时,要等待其结束才能处理其它事情.我们将任务的执行延迟,将其封装成一个消息,然后发送给一个列队.后台再运行一个程序从队列里取出消息,然后执行任务.如果有多个消费者,还可以分享任务. 对于Web应用程序来说,这样就可以使用Http的短请求来处理复杂的业务. 准备 我们发送一个字符串来代表复杂的任务,然后用Thread.Sleep()来模拟耗

rabbitMQ学习笔记(二) 简单的发送与接收消息 HelloWorld

首先要下载rabbitmq的javaClient库,然后加入到项目中,下载地址为:http://www.rabbitmq.com/releases/rabbitmq-java-client/v3.1.5/rabbitmq-java-client-bin-3.1.5.zip 1.发送消息 发送消息首先要获取与rabbitmq-server的连接,然后从渠道(chann)中指定的queue发送消息 , 不能定义两个queue名字相同,但属性不同 示例: Sender01.java 1 package