RabbitMQ之工作队列

工作队列

工作队列(又称:任务队列Task Queues)是为了避免等待一些占用大量资源、时间的操作,当我们把任务Task当做消息发送队列中,一个运行在后台的工作者worker进程就会取出任务然后处理。

当有多个works,任务在它们之间共享

创建任务

创建任务的new_task.py

#!/usr/bin/env python
#-*- coding:utf8 -*-
import sys
import pika
import logging

logging.basicConfig(format=‘%(levlename)s:%(message)s‘,level=logging.CRITICAL)

def send():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))
    channel = connection.channel()

    #创建一个队列,并且设置队列可以持久化,durable=True
    channel.queue_declare(queue=‘task_queue‘,durable=True)

    #将输入参数按照.号串联起来,后续在消费的时候每个点都sleep一秒钟
    if len(sys.argv) == 1:
        message = "Hello World!"
    else:
        message = ‘.‘.join(sys.argv[1])

    #向队列task_queue发送消息,routing_key指定,与queue_declare中对应
    #发送消息为message,对应参数body
    #设置消息持久化
    channel.basic_publish(exchange=‘‘,
            routing_key = ‘task_queue‘,
            body = message,
            properties = pika.BasicProperties(delivery_mode=2),
            )
    print " [x] Send %r" % (message,)

    connection.close()

if __name__ == ‘__main__‘:
    send()

需要重点说明:

1、消息队列持久化,设置channel.queue_declare中durable参数为True,这样在RabbitMQ-server重启之后,消息不会丢失

2、消息持久化,设置delivery_mode等于2

消息持久化

将消息持久化并不能完全保证不会丢失,以上代码只是告诉RabbitMQ要把消息存放到磁盘上,但是从RabbitMQ收到消息到保存之间还存在很小的时间间隔。RabbitMQ并不是所有的消息都使用fsync(2),可能保存到缓存中,并不一定会写到磁盘中。并不能保证真正的持久化,但可以应付简单的队列工作。

如果需要持久化,需要修改代码支持事务。

执行任务

执行任务,通过work.py来操作

#!/usr/bin/env python
#-*-coding:utf8 -*-

import pika
import time

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)

    #每遇到一个点,就sleep一秒钟,模拟长时间任务
    sleep_time = body.count(‘.‘)*100
    print "slepp_time=%d" % sleep_time

    time.sleep(body.count(‘.‘)*100)

    print " [x] Done"

    #任务执行完成之后返回确认包
    #这样对于没有返回确认包的消息就不会丢失
    ch.basic_ack(delivery_tag= method.delivery_tag)

def work():
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=‘localhost‘))
    channel = connection.channel()

    channel.queue_declare(queue=‘task_queue‘,durable=True)

    print "[*] Wating for messages.To exit press CTRL+C"

    #保证消息的公平分发,设置prefetch告诉同一时刻,不要发送超过1条消息给一个工作者
    #直到它已经处理上一条消息并且做出响应
    channel.basic_qos(prefetch_count=1)

    #开始消费消息
    channel.basic_consume(callback,queue=‘task_queue‘)

    #循环消费
    #channel.start_consuming()

if __name__ == ‘__main__‘:
    try:
        work()
    except KeyboardInterrupt,e:
        print "Exit"

重点说明

1、使用工作队列的一个好处就是能够并行的处理队列。如果任务堆积,只需要添加更多的工作者work即可

2、对于多个work,RabbitMQ会按照顺序把消息发送给每个消费者,这种方式为轮询(round-robin)

3、消息响应:如果一个work挂掉,上面代码实现将这个消息发送给其他work,而不是丢弃。

因此需要消息响应机制,每个work处理完成任务的时候,会发送一个ack,告诉RabbitMQ-server已经收到并处理某条消息,然后RabbitMQ-server释放并删除这条消息。

4、消息ack没有超时的概念,这样在处理一个非常耗时的消息任务时候就不会出现问题

5、消息ack默认是开启的,通过no_ack=True标识关闭,在回调函数中basic_ack中

6、如果忘记调用basic_ack的话,这样消息在程序退出后重新发送,会导致RabbitMQ-server中消息堆积,占用越来越多的内存。通过如下命令进行确认:

[email protected]:~/code/rabbitmq/ch2$ rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
task_queue	3	0
...done.

存在三个堆积的任务

7、关于队列大小:如果所有的工作者都在处理任务,队列就会被填满。需要留意这个问题,要么添加更多的工作者,要么使用其他策略,例如设置队列大小等。

参考链接

http://adamlu.net/rabbitmq/tutorial-two-python

RabbitMQ之工作队列

时间: 2024-10-08 08:13:38

RabbitMQ之工作队列的相关文章

【译】RabbitMQ:工作队列(Work Queue)

在第一篇我们写了两个程序通过一个命名的队列分别发送和接收消息.在这一篇,我们将创建一个工作队列在多个工作线程间分发耗时的工作任务. 工作队列的核心思想是避免立刻处理资源密集型任务导致必须等待其执行完成.相反的,我们安排这些任务在稍晚的时间完成.我们将一个任务封装为一个消息并把它发送到队列中.一个后台的工作线程将从队列中取出任务并最终执行.当你运行多个工作线程,这些任务将在这些工作线程间共享. 这个概念对于在一个HTTP请求中处理复杂任务的Web应用尤其有用. 准备工作 在前一篇中,我们发送了一条

RabbitMQ 笔记-工作队列

工作队列的主要思想是不用等待资源密集型的任务处理完成, 为了确保消息或者任务不会丢失,rabbitmq 支持消息确信 ACK.ACK机制是消费者端从rabbitmq收到消息并处理完成后,反馈给rabbitmq,rabbitmq收到反馈信息后将消息从队列中删除 如果rabbitmq向消费者改善消息时,消费者服务器挂了,消息也不会超时,即使一个消息需要非常长的时间处理,也不会导致消息超时,永远不会从rabbitmq中删除, 忘记通过basicAck返回确认信息是个严重的错误 rabbitmq不允许重

RabbitMQ入门(二)工作队列

??在文章RabbitMQ入门(一)之Hello World,我们编写程序通过指定的队列来发送和接受消息.在本文中,我们将会创建工作队列(Work Queue),通过多个workers来分配耗时任务. ??工作队列(Work Queue,也被成为Task Queue,任务队列)的中心思想是,避免立即执行一个资源消耗巨大且必须等待其完成的任务.相反地,我们调度好队列可以安排该任务稍后执行.我们将一个任务(task)封装成一个消息,将它发送至队列.一个在后台运行的work进程将会抛出该任务,并最终执

RabbitMQ系列 第三篇:工作队列Work Queue

在上篇中我们实现了程序来从一个已经命名的队列里发送和接收消息.本篇博文中我们将要创建工作队列用来在多个执行角色间,使用定时器来分散执行任务. 工作队列的主要思想就是避开立刻处理某个资源消耗交大的任务并且需要等待它执行完成.取而代之的是我们可以将它加入计划列表,并在后边执行这些任务.我们将任务分装成一个消息,并发送到队列中.后台的工作程序在接收到消息后将会立刻执行任务.当运行多个执行器时,任务将会在他们之间共享. 这个概念在web应用程序中是比较实用的,对于一些在一个短的http请求里无法完成的复

RabbitMQ (二)工作队列 -摘自网络

这篇中我们将会创建一个工作队列用来在工作者(consumer)间分发耗时任务.工作队列的主要任务是:避免立刻执行资源密集型任务,然后必须等待其完成.相反地,我们进行任务调度:我们把任务封装为消息发送给队列.工作进行在后台运行并不断的从队列中取出任务然后执行.当你运行了多个工作进程时,任务队列中的任务将会被工作进程共享执行. 这样的概念在web应用中极其有用,当在很短的HTTP请求间需要执行复杂的任务. 1. 准备 我们使用Thread.sleep来模拟耗时的任务.我们在发送到队列的消息的末尾添加

RabbitMQ实例教程:用Java搞定工作队列

在上一节中,我们学会了使用编程的方式发送和接收一个命名好的队列.本节中我们将会使用工作队列在多个工作者之间分发任务. 工作队列的核心思想是避免立即处理高密集度必须等待完成的任务.它采用了安排任务的方式,将一个任务封装成一个消息把它放进队列.在后台运行的工作进程到时候会将它弹出并执行,这样任务队列中的任务就会被工作进程共享执行. 工作队列适用于Web应用中在一个短的HTTP请求中处理复杂任务的场景. 在上节中,我们发送了一个"Hello World!"字符串消息.现在发送多个字符串消息表

RabbitMQ --- Work Queues(工作队列)

目录 RabbitMQ --- Hello Mr.Tua 前言 Work Queues 即工作队列,它表示一个 Producer 对应多个 Consumer,包括两种分发模式:轮循分发(Round-robin)和公平分发(Fair dispatch).旨在为了避免立即执行任务时出现占用很多资源和时间却又必须等待完成的现象. 原理分析: Producer 把工作任务转化为消息发送给队列,当后台有一个 Consumer 进程在运行时,它会不间断地从队列中取出消息来执行:当后台有多个 Consumer

rabbitmq消息队列学习——"工作队列"

二."工作队列" 在第一节中我们发送接收消息直接从队列中进行.这节中我们会创建一个工作队列来分发处理多个工作者中的耗时性任务. 工作队列主要是为了避免进行一些必须同步等待的资源密集型的任务.实际上我们将这些任务时序话稍后分发完成.我们将某个任务封装成消息然后发送至队列,后台运行的工作进程将这些消息取出然后执行这些任务.当你运行多个工作进程的时候,这些任务也会在它们之间共享. 前期准备 上一节的练习中我们发送的是简单包含"Hello World!"的消息,这节我们还发

RabbitMQ学习(二)工作队列

1.工作队列(Work Queue)又叫任务队列(Task Queue)指将任务分发个多个消费者. 2.实际操作: 这里使用一个生产者产生多条数据提供给3个消费者 生产者代码: public class Producter { //队列名称 private final static String QUEUE_NAME = "Work_Queue"; public static void main(String[] args) throws IOException, TimeoutExc