RabbitMQ(一) -- Work Queues

RabbitMQ(一) -- Work Queues

  RabbitMQ使用Work Queues的主要目的是为了避免资源使用密集的任务,它不同于定时任务处理的方式,而是把任务封装为消息添加到队列中。而消息队列正是共享于多个工作者中使用,它们可以随意pop出数据进行处理。

消息的持久化 Message durability

为了保证`rabbitmq`意外重启等原因造成的消息丢失,通过设置消息的durable来实现数据的持久化,但是需要生产者和消费者同时设置持久化才能生效。

需要注意的是,`rabbitmq`并不允许更改已经创建的消息队列的属性,假如之前已经创建过非持久化的hello消息队列,那么会返回一个错误信息。

设置消息队列的可持久化属性(第二个参数):

channel.queue_declare(queue=‘hello‘, durable=True)

在消息发送时,需要指定`delivery_mode`来实现消息持久化:

channel.basic_publish(exchange=‘‘, routing_key="task_queue", body=message, properties=pika.BasicProperties(delivery_mode = 2, # make message persistent))

平均分配 Fair dispatch

`rabbitmq`实现了消息均分的功能,通过设置`basic.qos`方法的`prefetch_count`来实现。它会告诉`rabbitmq`的生产者不要给一个消费者分配过多的任务,也就是说不要在消费者处理完成已经接收到的任务之前分配新的任务。

channel.basic_qos(prefetch_count=1)

其中prefetch_count为可以接受处理的任务个数,如果未达到上限rabbitmq会继续向消费者推送任务。

实例

生产者

#!/usr/bin/env python
# coding=utf-8

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))
channel = connection.channel()

channel.queue_declare(queue=‘task_queue‘, durable=True)

for i in range(100):
    message = str(i) + ‘ Hello World!‘
    channel.basic_publish(exchange=‘‘,
    routing_key=‘task_queue‘,
    body=message,
    properties=pika.BasicProperties(delivery_mode = 2, # make message persistent))
    print " [x] Sent %r" % (message,)
    time.sleep(1)
connection.close()

消费者

#!/usr/bin/env python
# coding=utf-8

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))
channel = connection.channel()

channel.queue_declare(queue=‘task_queue‘, durable=True)
print ‘ [*] Waiting for messages. To exit press CTRL+C‘

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep(2)
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue=‘task_queue‘)

channel.start_consuming()
时间: 2024-11-08 23:05:58

RabbitMQ(一) -- Work Queues的相关文章

RabbitMQ :VHost,Exchanges, Queues,Bindings and Channels

和RabbitMQ这个项目的缘分好奇怪,很长一段时间内是只关注源代码,真的是Erlang开源项目中的典范;现在要在项目中应用RabbitMQ,从新的视角切入,全新的感觉.仿佛旧情人换了新衣,虽是熟稔却有不曾领略的风情. RabbitMQ提供了一整套机制来处理消息的发送,接收,容错,管理,上一篇文章中我提到了一篇Rabbits and warrens的文章,是一篇非常棒的入门文章,但是里面忽略了不少细节,我沿着RabbitMQ in Action重新梳理了一遍,笔记于此,备忘. Exchanges

2. RabbitMQ 之Work Queues (工作队列)

在上一篇揭开RabbitMQ的神秘面纱一文中,我们编写了程序来发送和接收来自命名队列的消息. 本篇我们将会创建一个 Work Queue(工作队列) 来使用分发任务在多个任务中. 前提:本教程假定RabbitMQ 已在标准端口(15672)上的localhost上安装并运行.如果您使用不同的主机,端口或凭据,则需要调整连接设置. 1. Work Queue 工作队列 工作队列(又称:任务队列)背后的主要思想是避免立即执行资源密集型任务,并且必须等待它完成. 相反,我们安排任务稍后完成.我们将任务

【转】Spring websocket 使用

http://docs.spring.io/spring/docs/current/spring-framework-reference/html/websocket.html https://spring.io/guides/gs/messaging-stomp-websocket/ https://github.com/rstoyanchev/spring-websocket-portfolio 项目中用到了消息的实时推送,查资料后用到了Spring websocket,找了很多资料,还是感

分布式队列编程:模型、实战

介绍 作为一种基础的抽象数据结构,队列被广泛应用在各类编程中.大数据时代对跨进程.跨机器的通讯提出了更高的要求,和以往相比,分布式队列编程的运用几乎已无处不在.但是,这种常见的基础性的事物往往容易被忽视,使用者往往会忽视两点: 使用分布式队列的时候,没有意识到它是队列. 有具体需求的时候,忘记了分布式队列的存在. 文章首先从最基础的需求出发,详细剖析分布式队列编程模型的需求来源.定义.结构以及其变化多样性.通过这一部分的讲解,作者期望能在两方面帮助读者:一方面,提供一个系统性的思考方法,使读者能

Mirantis OpenStack HA

Mysql使用Galera做Active/Active集群,同时使用Pacemaker,因为Galera mysql用到了领导机选举机制quorum,所以控制节点至少三个 RabbitMQ使用mirrored queues,运行在Active/Active模式 有状态服务如neutron agents使用Pacemaker做Active/Passive部署 无状态服务前端加HAProxy,所以无状态服务并没有部署在计算节点上 Controller Node(至少3节点)的HA部署图如下,每个co

RabbitMQ学习(二).NET Client之Work Queues

2 Work queues Distributing tasks among workers Python | Java | Ruby | PHP| C# 转载请注明出处:jiq?钦's technical Blog Work Queues (using the .NET Client) 前面已经介绍过了如何编写程序去发送消息到命名队列,以及从命名队列接收消息. 在这个部分我们将创建一个工作队列(Work Queue),用于将耗时任务(time-consuming tasks)分发给多个工作者(

RabbitMQ --- Work Queues(工作队列)

目录 RabbitMQ --- Hello Mr.Tua 前言 Work Queues 即工作队列,它表示一个 Producer 对应多个 Consumer,包括两种分发模式:轮循分发(Round-robin)和公平分发(Fair dispatch).旨在为了避免立即执行任务时出现占用很多资源和时间却又必须等待完成的现象. 原理分析: Producer 把工作任务转化为消息发送给队列,当后台有一个 Consumer 进程在运行时,它会不间断地从队列中取出消息来执行:当后台有多个 Consumer

RabbitMQ (消息队列)专题学习03 Work Queues(工作队列)

一.概述 工作队列(Work queues) (使用Java客户端) 在前面的专题学习中,我们使用Java语言实现了一个简单的从名为"hello"的队列中发送和接收消息的程序,在这部内容中我们将创建一个工作队列被用来分配定时消息任务,而且通过多个接收者(工作者)实现. 工作队列(又名任务队列),主要的思想是为了避免立即做一个资源密集型的任务(多消息同时密集发送),不必等待它完成,当运行许多工作者的让任务都在她们之间共享. 它在web应用中是非常有用的,因为在很短的时间内http请求窗口

RabbitMQ官方中文入门教程(PHP版) 第二部分:工作队列(Work queues)

工作队列 在第一篇教程中,我们已经写了一个从已知队列中发送和获取消息的程序.在这篇教程中,我们将创建一个工作队列(Work Queue),它会发送一些耗时的任务给多个工作者(Works ). 工作队列(又称:任务队列——Task Queues)是为了避免等待一些占用大量资源.时间的操作.当我们把任务(Task)当作消息发送到队列中,一个运行在后台的工作者(worker)进程就会取出任务然后处理.当你运行多个工作者(workers),任务就会在它们之间共享. 这个概念在网络应用中是非常有用的,它可