RabbitMQ的任务分发

direct分发机制会根据分发关键字(routing_key),将task分发到指定的queue,work只需要监听相应的queue即可,在代码中,需要设置相应的routing_key

fanout机制相反,他会将task分发给所有的queue

fanout模式:

emit_log.py

# -*- coding: UTF-8 -*-
import pika

if __name__ == ‘__main__‘:

    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    channel.exchange_declare(exchange="logs2",type="direct")
    message = "You are awsome!"
    for i in range(0, 100):  # 循环100次发送消息
        if i%2==0:
            channel.basic_publish(exchange="logs2", routing_key=‘even‘, body=message + " " + str(i),)
        else:
            channel.basic_publish(exchange="logs2", routing_key=‘old‘, body=message + " " + str(i),)

    print "sending ", message

    #两个receive_log 都将接收到task

receive_log.py

pika

__author__ = callback(, , , body):
    body

__name__ == :
    connection=pika.BlockingConnection(pika.ConnectionParameters())
    channel=connection.channel()
    channel.exchange_declare(=,=)
    result=channel.queue_declare(=)
    queue_name=result..queue
    ,queue_name
    channel.queue_bind(=,=queue_name)
    channel.basic_consume(callback,=queue_name,=)
    channel.start_consuming()

receive_log2.py

pika

__author__ = callback(, , , body):
    body

__name__ == :
    connection=pika.BlockingConnection(pika.ConnectionParameters())
    channel=connection.channel()
    channel.exchange_declare(=,=)
    result=channel.queue_declare(=)
    queue_name=result.method.queue
    ,queue_name
    channel.queue_bind(=,=queue_name)
    channel.basic_consume(callback,=queue_name,=)
    channel.start_consuming()

可以看出两个work均接受到所有的消息

direct模式:

work的代码只需要将上述代码中的type改为type="direct",并绑定不同的exchange即可,

pika

__author__ = __name__ == :

    connection = pika.BlockingConnection(pika.ConnectionParameters())
    channel = connection.channel()
    channel.exchange_declare(=,=)
    message = i (, ):  i%==:
            channel.basic_publish(=, =, =message + + (i),)
        :
            channel.basic_publish(=, =, =message + + (i),)

    , message

receive_even_log.py

pika

__author__ = callback(, , , body):
    body

__name__ == :
    connection=pika.BlockingConnection(pika.ConnectionParameters())
    channel=connection.channel()
    channel.exchange_declare(=,=)
    result=channel.queue_declare(=)
    queue_name=result..queue
    ,queue_name
    channel.queue_bind(=,=queue_name,=)
    channel.basic_consume(callback,=queue_name,=)
    channel.start_consuming()

receive_old_log.py

pika

__author__ = callback(, , , body):
    body

__name__ == :
    connection=pika.BlockingConnection(pika.ConnectionParameters())
    channel=connection.channel()
    channel.exchange_declare(=,=)
    result=channel.queue_declare(=)
    queue_name=result.method.queue
    ,queue_name
    channel.queue_bind(=,=queue_name,=)
    channel.basic_consume(callback,=queue_name,=)
    channel.start_consuming()

从结果中看出:task只分发给了相应的queue

时间: 2024-10-20 22:47:15

RabbitMQ的任务分发的相关文章

Java使用RabbitMQ之订阅分发(Topic)

使用RabbitMQ进行消息发布和订阅,生产者将消息发送给转发器(exchange),转发器根据路由键匹配已绑定的消息队列并转发消息,主题模式支持路由键的通配. 生产者代码: 1 package org.study.exchange3.topic3; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import org.junit.Test; 6 import org.study

Java使用RabbitMQ之公平分发

发送消息: 1 package org.study.workfair; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import org.junit.Test; 6 import org.study.utils.ConnectionUtils; 7 8 import java.io.IOException; 9 import java.util.concurrent.Time

rabbitmq 公平分发和消息接收确认(转载)

原文地址:http://www.jianshu.com/p/f63820fe2638 当生产者投递消息到broker,rabbitmq把消息分发到消费者. 如果设置了autoAck=true 消费者会自动确认收到信息.这时broker会立即将消息删除,这种情况下如果消费者出现异常(连接中断)该消息就会丢失.为了保证消息能够被正确的消费,rabbitmq支持消息确认. String basicConsume(String queue, boolean autoAck, Consumer callb

RabbitMQ指南(Java)

原文地址:http://www.rabbitmq.com/getstarted.html 翻译得不好,欢迎指出. 一.Hello World 1.基本概念介绍 RabbitMQ是一个消息代理(或者说消息队列),它的主要意图很明显,就是接收和转发消息.你可以把它想象成一个邮局:当你把一封邮件放入邮箱,邮递员会帮你把邮件送到收件人的手上.在这里,RabbitMQ就好比一个邮箱.邮局或者邮递员. RabbitMQ和邮局的主要区别在于,RabbitMQ不是处理邮件,而是接收.存储和将消息以二进制的方式转

循环分发消息(Round-robin dispatching)

在上节中,我们发送了一个"Hello World!"字符串消息.现在发送多个字符串消息表示复杂任务.我们现在像图片重置大小,渲染PDF文件这样的真实任务,但我们使用 Thread.sleep() 假装正在我们忙.我们将字符串中的点的数量作为其复杂性:每个点都占1秒钟"工作".例如,一个包含"..."这样的假任务就会需要三秒钟. NewTask.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19

RabbitMQ实例教程:用Java搞定工作队列

在上一节中,我们学会了使用编程的方式发送和接收一个命名好的队列.本节中我们将会使用工作队列在多个工作者之间分发任务. 工作队列的核心思想是避免立即处理高密集度必须等待完成的任务.它采用了安排任务的方式,将一个任务封装成一个消息把它放进队列.在后台运行的工作进程到时候会将它弹出并执行,这样任务队列中的任务就会被工作进程共享执行. 工作队列适用于Web应用中在一个短的HTTP请求中处理复杂任务的场景. 在上节中,我们发送了一个"Hello World!"字符串消息.现在发送多个字符串消息表

rabbitmq消息队列学习——"工作队列"

二."工作队列" 在第一节中我们发送接收消息直接从队列中进行.这节中我们会创建一个工作队列来分发处理多个工作者中的耗时性任务. 工作队列主要是为了避免进行一些必须同步等待的资源密集型的任务.实际上我们将这些任务时序话稍后分发完成.我们将某个任务封装成消息然后发送至队列,后台运行的工作进程将这些消息取出然后执行这些任务.当你运行多个工作进程的时候,这些任务也会在它们之间共享. 前期准备 上一节的练习中我们发送的是简单包含"Hello World!"的消息,这节我们还发

RabbitMQ 关键词

RabbitMQ是流行的开源消息队列系统,用erlang语言开发.RabbitMQ是AMQP(高级消息队列协议)的标准实现. RabbitMQ中间件分为服务端(RabbitMQ Server)和客户端(RabbitMQ Client),服务端可以理解为是一个消息的代理消费者,客户端又分为消息生产者(Producer)和消息消费者(Consumer). 1.消息生产者(Producer):主要生产消息并将消息基于TCP协议,通过建立Connection和Channel,将消息传输给RabbitMQ

RabbitMQ消息队列应用

RabbitMQ消息队列应用 消息通信组件Net分布式系统的核心中间件之一,应用与系统高并发,各个组件之间解耦的依赖的场景.本框架采用消息队列中间件主要应用于两方面:一是解决部分高并发的业务处理:二是通过消息队列传输系统日志.目前业界使用较多的消息队列组件有RabbitMQ.ActiveMQ.MSMQ.kafka.zeroMQ等,本文对系统架构之MQ Component诠释,并采用RabbitMQ作为消息队列中间件. 图1- 消息队列组件示意图 一.RabbitMQ介绍 RabbitMQ是一款基