Python实现RabbitMQ中6种消息模型

RabbitMQ与Redis对比

? RabbitMQ是一种比较流行的消息中间件,之前我一直使用redis作为消息中间件,但是生产环境比较推荐RabbitMQ来替代Redis,所以我去查询了一些RabbitMQ的资料。相比于Redis,RabbitMQ优点很多,比如:

  1. 具有消息消费确认机制
  2. 队列,消息,都可以选择是否持久化,粒度更小、更灵活。
  3. 可以实现负载均衡

RabbitMQ应用场景

  1. 异步处理:比如用户注册时的确认邮件、短信等交由rabbitMQ进行异步处理
  2. 应用解耦:比如收发消息双方可以使用消息队列,具有一定的缓冲功能
  3. 流量削峰:一般应用于秒杀活动,可以控制用户人数,也可以降低流量
  4. 日志处理:将info、warning、error等不同的记录分开存储

RabbitMQ消息模型

? 这里使用Python的pika这个库来实现RabbitMQ中常见的6种消息模型。没有的可以先安装:

pip install pika


1.单生产单消费模型:即完成基本的一对一消息转发。

# 生产者代码
import pika

credentials = pika.PlainCredentials(‘chuan‘, ‘123‘)  # mq用户名和密码,没有则需要自己创建
# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘,
                                                               port=5672,
                                                               virtual_host=‘/‘,
                                                               credentials=credentials))

# 建立rabbit协议的通道
channel = connection.channel()
# 声明消息队列,消息将在这个队列传递,如不存在,则创建。durable指定队列是否持久化
channel.queue_declare(queue=‘python-test‘, durable=False)

# message不能直接发送给queue,需经exchange到达queue,此处使用以空字符串标识的默认的exchange
# 向队列插入数值 routing_key是队列名
channel.basic_publish(exchange=‘‘,
                      routing_key=‘python-test‘,
                      body=‘Hello world!2‘)
# 关闭与rabbitmq server的连接
connection.close()
# 消费者代码
import pika

credentials = pika.PlainCredentials(‘chuan‘, ‘123‘)
# BlockingConnection:同步模式
connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘,
                                                               port=5672,
                                                               virtual_host=‘/‘,
                                                            credentials=credentials))
channel = connection.channel()
# 申明消息队列。当不确定生产者和消费者哪个先启动时,可以两边重复声明消息队列。
channel.queue_declare(queue=‘python-test‘, durable=False)
# 定义一个回调函数来处理消息队列中的消息,这里是打印出来
def callback(ch, method, properties, body):
    # 手动发送确认消息
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print(body.decode())
    # 告诉生产者,消费者已收到消息

# 告诉rabbitmq,用callback来接收消息
# 默认情况下是要对消息进行确认的,以防止消息丢失。
# 此处将auto_ack明确指明为True,不对消息进行确认。
channel.basic_consume(‘python-test‘,
                      on_message_callback=callback)
                      # auto_ack=True)  # 自动发送确认消息
# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
channel.start_consuming()


2.消息分发模型:多个收听者监听一个队列。

# 生产者代码
import pika

credentials = pika.PlainCredentials(‘chuan‘, ‘123‘)  # mq用户名和密码
# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘,
                                                               port=5672,
                                                               virtual_host=‘/‘,
                                                               credentials=credentials))

# 建立rabbit协议的通道
channel = connection.channel()
# 声明消息队列,消息将在这个队列传递,如不存在,则创建。durable指定队列是否持久化。确保没有确认的消息不会丢失
channel.queue_declare(queue=‘rabbitmqtest‘, durable=True)

# message不能直接发送给queue,需经exchange到达queue,此处使用以空字符串标识的默认的exchange
# 向队列插入数值 routing_key是队列名
# basic_publish的properties参数指定message的属性。此处delivery_mode=2指明message为持久的
for i in range(10):
    channel.basic_publish(exchange=‘‘,
                          routing_key=‘python-test‘,
                          body=‘Hello world!%s‘ % i,
                          properties=pika.BasicProperties(delivery_mode=2))
# 关闭与rabbitmq server的连接
connection.close()
# 消费者代码,consume1与consume2
import pika
import time

credentials = pika.PlainCredentials(‘chuan‘, ‘123‘)
# BlockingConnection:同步模式
connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘,
                                                               port=5672,
                                                               virtual_host=‘/‘,
                                                               credentials=credentials))
channel = connection.channel()
# 申明消息队列。当不确定生产者和消费者哪个先启动时,可以两边重复声明消息队列。
channel.queue_declare(queue=‘rabbitmqtest‘, durable=True)
# 定义一个回调函数来处理消息队列中的消息,这里是打印出来
def callback(ch, method, properties, body):
    # 手动发送确认消息
    time.sleep(10)
    print(body.decode())
    # 告诉生产者,消费者已收到消息
    ch.basic_ack(delivery_tag=method.delivery_tag)

# 如果该消费者的channel上未确认的消息数达到了prefetch_count数,则不向该消费者发送消息
channel.basic_qos(prefetch_count=1)
# 告诉rabbitmq,用callback来接收消息
# 默认情况下是要对消息进行确认的,以防止消息丢失。
# 此处将no_ack明确指明为True,不对消息进行确认。
channel.basic_consume(‘python-test‘,
                      on_message_callback=callback)
                      # auto_ack=True)  # 自动发送确认消息
# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
channel.start_consuming()


3.fanout消息订阅模式:生产者将消息发送到Exchange,Exchange再转发到与之绑定的Queue中,每个消费者再到自己的Queue中取消息。

# 生产者代码
import pika

credentials = pika.PlainCredentials(‘chuan‘, ‘123‘)  # mq用户名和密码
# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘,
                                                               port=5672,
                                                               virtual_host=‘/‘,
                                                               credentials=credentials))
# 建立rabbit协议的通道
channel = connection.channel()
# fanout: 所有绑定到此exchange的queue都可以接收消息(实时广播)
# direct: 通过routingKey和exchange决定的那一组的queue可以接收消息(有选择接受)
# topic: 所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息(更细致的过滤)
channel.exchange_declare(‘logs‘, exchange_type=‘fanout‘)

#因为是fanout广播类型的exchange,这里无需指定routing_key
for i in range(10):
    channel.basic_publish(exchange=‘logs‘,
                          routing_key=‘‘,
                          body=‘Hello world!%s‘ % i)

# 关闭与rabbitmq server的连接
connection.close()
import pika

credentials = pika.PlainCredentials(‘chuan‘, ‘123‘)
# BlockingConnection:同步模式
connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘,
                                                               port=5672,
                                                               virtual_host=‘/‘,
                                                               credentials=credentials))
channel = connection.channel()

#作为好的习惯,在producer和consumer中分别声明一次以保证所要使用的exchange存在
channel.exchange_declare(exchange=‘logs‘,
                         exchange_type=‘fanout‘)

# 随机生成一个新的空的queue,将exclusive置为True,这样在consumer从RabbitMQ断开后会删除该queue
# 是排他的。
result = channel.queue_declare(‘‘, exclusive=True)

# 用于获取临时queue的name
queue_name = result.method.queue

# exchange与queue之间的关系成为binding
# binding告诉exchange将message发送该哪些queue
channel.queue_bind(exchange=‘logs‘,
                   queue=queue_name)

# 定义一个回调函数来处理消息队列中的消息,这里是打印出来
def callback(ch, method, properties, body):
    # 手动发送确认消息
    print(body.decode())
    # 告诉生产者,消费者已收到消息
    #ch.basic_ack(delivery_tag=method.delivery_tag)

# 如果该消费者的channel上未确认的消息数达到了prefetch_count数,则不向该消费者发送消息
channel.basic_qos(prefetch_count=1)
# 告诉rabbitmq,用callback来接收消息
# 默认情况下是要对消息进行确认的,以防止消息丢失。
# 此处将no_ack明确指明为True,不对消息进行确认。
channel.basic_consume(queue=queue_name,
                      on_message_callback=callback,
                      auto_ack=True)  # 自动发送确认消息
# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
channel.start_consuming()


4.direct路由模式:此时生产者发送消息时需要指定RoutingKey,即路由Key,Exchange接收到消息时转发到与RoutingKey相匹配的队列中。

# 生产者代码,测试命令可以使用:python produce.py error 404error
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))
channel = connection.channel()

# 声明一个名为direct_logs的direct类型的exchange
# direct类型的exchange
channel.exchange_declare(exchange=‘direct_logs‘,
                         exchange_type=‘direct‘)

# 从命令行获取basic_publish的配置参数
severity = sys.argv[1] if len(sys.argv) > 1 else ‘info‘
message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘

# 向名为direct_logs的exchage按照设置的routing_key发送message
channel.basic_publish(exchange=‘direct_logs‘,
                      routing_key=severity,
                      body=message)

print(" [x] Sent %r:%r" % (severity, message))
connection.close()
# 消费者代码,测试可以使用:python consume.py error
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))
channel = connection.channel()

# 声明一个名为direct_logs类型为direct的exchange
# 同时在producer和consumer中声明exchage或queue是个好习惯,以保证其存在
channel.exchange_declare(exchange=‘direct_logs‘,
                         exchange_type=‘direct‘)

result = channel.queue_declare(‘‘, exclusive=True)
queue_name = result.method.queue

# 从命令行获取参数:routing_key
severities = sys.argv[1:]
if not severities:
    print(sys.stderr, "Usage: %s [info] [warning] [error]" % (sys.argv[0],))
    sys.exit(1)

for severity in severities:
    # exchange和queue之间的binding可接受routing_key参数
    # fanout类型的exchange直接忽略该参数。direct类型的exchange精确匹配该关键字进行message路由
    # 一个消费者可以绑定多个routing_key
    # Exchange就是根据这个RoutingKey和当前Exchange所有绑定的BindingKey做匹配,
    # 如果满足要求,就往BindingKey所绑定的Queue发送消息
    channel.queue_bind(exchange=‘direct_logs‘,
                       queue=queue_name,
                       routing_key=severity)

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body,))

channel.basic_consume(queue=queue_name,
                      on_message_callback=callback,
                      auto_ack=True)

channel.start_consuming()
5.topic匹配模式:更细致的分组,允许在RoutingKey中使用匹配符。

*:匹配一个单词
#:匹配0个或多个单词

# 生产者代码,基本不变,只需将exchange_type改为topic(测试:python produce.py rabbitmq.red
# red color is my favorite
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))
channel = connection.channel()

# 声明一个名为direct_logs的direct类型的exchange
# direct类型的exchange
channel.exchange_declare(exchange=‘topic_logs‘,
                         exchange_type=‘topic‘)

# 从命令行获取basic_publish的配置参数
severity = sys.argv[1] if len(sys.argv) > 1 else ‘info‘
message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘

# 向名为direct_logs的exchange按照设置的routing_key发送message
channel.basic_publish(exchange=‘topic_logs‘,
                      routing_key=severity,
                      body=message)

print(" [x] Sent %r:%r" % (severity, message))
connection.close()
# 消费者代码,(测试:python consume.py *.red)
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))
channel = connection.channel()

# 声明一个名为direct_logs类型为direct的exchange
# 同时在producer和consumer中声明exchage或queue是个好习惯,以保证其存在
channel.exchange_declare(exchange=‘topic_logs‘,
                         exchange_type=‘topic‘)

result = channel.queue_declare(‘‘, exclusive=True)
queue_name = result.method.queue

# 从命令行获取参数:routing_key
severities = sys.argv[1:]
if not severities:
    print(sys.stderr, "Usage: %s [info] [warning] [error]" % (sys.argv[0],))
    sys.exit(1)

for severity in severities:
    # exchange和queue之间的binding可接受routing_key参数
    # fanout类型的exchange直接忽略该参数。direct类型的exchange精确匹配该关键字进行message路由
    # 一个消费者可以绑定多个routing_key
    # Exchange就是根据这个RoutingKey和当前Exchange所有绑定的BindingKey做匹配,
    # 如果满足要求,就往BindingKey所绑定的Queue发送消息
    channel.queue_bind(exchange=‘topic_logs‘,
                       queue=queue_name,
                       routing_key=severity)

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body,))

channel.basic_consume(queue=queue_name,
                      on_message_callback=callback,
                      auto_ack=True)

channel.start_consuming()


6.RPC远程过程调用:客户端与服务器之间是完全解耦的,即两端既是消息的发送者也是接受者。

# 生产者代码
import pika
import uuid

# 在一个类中封装了connection建立、queue声明、consumer配置、回调函数等
class FibonacciRpcClient(object):
    def __init__(self):
        # 建立到RabbitMQ Server的connection
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))

        self.channel = self.connection.channel()

        # 声明一个临时的回调队列
        result = self.channel.queue_declare(‘‘, exclusive=True)
        self._queue = result.method.queue

        # 此处client既是producer又是consumer,因此要配置consume参数
        # 这里的指明从client自己创建的临时队列中接收消息
        # 并使用on_response函数处理消息
        # 不对消息进行确认
        self.channel.basic_consume(queue=self._queue,
                                   on_message_callback=self.on_response,
                                   auto_ack=True)
        self.response = None
        self.corr_id = None

    # 定义回调函数
    # 比较类的corr_id属性与props中corr_id属性的值
    # 若相同则response属性为接收到的message
    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        # 初始化response和corr_id属性
        self.corr_id = str(uuid.uuid4())

        # 使用默认exchange向server中定义的rpc_queue发送消息
        # 在properties中指定replay_to属性和correlation_id属性用于告知远程server
        # correlation_id属性用于匹配request和response
        self.channel.basic_publish(exchange=‘‘,
                                   routing_key=‘rpc_queue‘,
                                   properties=pika.BasicProperties(
                                       reply_to=self._queue,
                                       correlation_id=self.corr_id,
                                   ),
                                   # message需为字符串
                                   body=str(n))

        while self.response is None:
            self.connection.process_data_events()

        return int(self.response)

# 生成类的实例
fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
# 调用实例的call方法
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

# 消费者代码,这里以生成斐波那契数列为例
import pika

# 建立到达RabbitMQ Server的connection
connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))
channel = connection.channel()

# 声明一个名为rpc_queue的queue
channel.queue_declare(queue=‘rpc_queue‘)

# 计算指定数字的斐波那契数
def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)

# 回调函数,从queue接收到message后调用该函数进行处理
def on_request(ch, method, props, body):
    # 由message获取要计算斐波那契数的数字
    n = int(body)
    print(" [.] fib(%s)" % n)
    # 调用fib函数获得计算结果
    response = fib(n)

    # exchage为空字符串则将message发送个到routing_key指定的queue
    # 这里queue为回调函数参数props中reply_ro指定的queue
    # 要发送的message为计算所得的斐波那契数
    # properties中correlation_id指定为回调函数参数props中co的rrelation_id
    # 最后对消息进行确认
    ch.basic_publish(exchange=‘‘,
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

# 只有consumer已经处理并确认了上一条message时queue才分派新的message给它
channel.basic_qos(prefetch_count=1)

# 设置consumeer参数,即从哪个queue获取消息使用哪个函数进行处理,是否对消息进行确认
channel.basic_consume(queue=‘rpc_queue‘, on_message_callback=on_request)

print(" [x] Awaiting RPC requests")

# 开始接收并处理消息
channel.start_consuming()

原文地址:https://www.cnblogs.com/kcxg/p/12625389.html

时间: 2025-01-02 16:42:17

Python实现RabbitMQ中6种消息模型的相关文章

rabbitmq五种消息模型整理

目录 0. 配置项目 1. 基本消息模型 1.1 生产者发送消息 1.2 消费者获取消息(自动ACK) 1.3 消息确认机制(ACK) 1.4 消费者获取消息(手动ACK) 1.5 自动ACK存在的问题 1.6 演示手动ACK 2. work消息模型 2.1 生产者 2.2 消费者1 2.3 消费者2 2.4 能者多劳 3. 订阅模型分类 4. 订阅模型-Fanout 4.1 生产者 4.2 消费者1 4.3 消费者2 4.4 测试 5. 订阅模型-Direct 5.1 生产者 5.2 消费者1

JMS两种消息模型

前段时间学习EJB,接触到了JMS(Java消息服务),JMS支持两种消息模型:Point-to-Point(P2P)和Publish/Subscribe(Pub/Sub),即点对点和发布订阅模型. 个人觉得这两个模型挺容易理解的,因为生活中的例子还挺多的. 1,  P2P模型 有以下概念:消息队列(Queue).发送者(Sender).接收者(Receiver).每个消息都被发送到一个特定的队列,接收者从队列获取消息.队列保留着消息,直到它们被消费或超时. (1) 每个消息只有一个消费者(Co

OpenCV Using Python——RGB颜色空间中的统计肤色模型

RGB颜色空间中的统计肤色模型 1. 统计肤色模型简介 前几篇文章提出的参数肤色模型,由于参数值固定,所以来自测试图像集的分割误差有可能很大(不过在光照足够的情况下已经够用了).对于光线比较昏暗的场合,一般人们不太会训练这样的数据来提取参数,同时光照不足时颜色容易跟背景混在一起,所以做出来效果也不好.在光照昏暗的场合,直接在自然光和复杂背景下分割肤色到现在为止依然是比较困难的事.这里讨论的仅仅是单个像素是否为肤色,如果要求在大面积肤色的背景下分割出手或脸来,这种不依赖上下文的方法就完蛋了. 那么

python图工具中基于随机块模型动态网络社团检测

原文链接:http://tecdat.cn/?p=7602 这是“政治博客圈和2004年美国大选”中的政治博客网络图,但是边缘束是使用随机块模型确定的(注:下图与图相同(即,布局和数据相同)). Tiago论文中的5-我只是在上面放了一个黑色背景 . 边缘配色方案与Adamic和Glance的原始论文中的相同,即每个节点对应一个博客URL,颜色反映政治取向,红色代表保守派,蓝色代表自由派.橙色边从自由派博客到保守派博客,紫色边从保守派到自由派(参见Adamic和Glance中的图1). 颜色方案

python collection模块中几种数据结构(Counter、OrderedDict、namedtup)

collection模块中有几种数据结构我们可能用得到. Counter是字典的子类,负责计数的一个字典,支持 + 加法 - 减法 & 求公共元素 | 求并集 print('Counter类型的应用') c = Counter("dengjingdong") #c = Counter({'n': 3, 'g': 3, 'd': 2, 'i': 1, 'o': 1, 'e': 1, 'j': 1}) print("原始数据:",c) print("最

理解RabbitMQ中的AMQP-0-9-1模型

前提 之前有个打算在学习RabbitMQ之前,把AMQP详细阅读一次,挑出里面的重点内容.后来找了下RabbitMQ的官方文档,发现了有一篇文档专门介绍了RabbitMQ中实现的AMQP模型部分,于是直接基于此文档和个人理解写下这篇文章. AMQP协议 AMQP全称是Advanced Message Queuing Protocol,它是一个(分布式)消息传递协议,使用和符合此协议的客户端能够基于使用和符合此协议的消息传递中间件代理(Broker,也就是经纪人,个人感觉叫代理合口一些)进行通信.

消息模型:主题和队列的区别

一.消息队列的演进 1.初始阶段 最初的消息队列,就是一个严格意义上的队列.队列是一种数据结构,先进先出,在消息入队出队过程中,保证这些消息严格有序.早期的消息队列就是按照“队列”的数据结构设计的. 队列模型: 生产者(Producer)发消息就是入队操作,消费者(Consumer)收消息就是出队也就是删除操作,服务端存放消息的容器自然就称为“队列”. 如果有多个生产者往同一个队列里面发送消息,这个队列中可以消费到的消息,就是这些生产者生产的所有消息的合集.消息的顺序就是这些生产者发送消息的自然

Kafka消息模型

一.消息传递模型 传统的消息队列最少提供两种消息模型,一种P2P,一种PUB/SUB,而Kafka并没有这么做,巧妙的,它提供了一个消费者组的概念,一个消息可以被多个消费者组消费,但是只能被一个消费者组里的一个消费者消费,这样当只有一个消费者组时就等同与P2P模型,当存在多个消费者组时就是PUB/SUB模型. Kafka 的 consumer 是以pull的形式获取消息数据的. pruducer push消息到kafka cluster ,consumer从集群中pull消息,如下图.该博客主要

【RabbitMQ】如何进行消息可靠投递【上篇】

说明 前几天,突然发生线上报警,钉钉连发了好几条消息,一看是RabbitMQ相关的消息,心头一紧,难道翻车了? [橙色报警]?应用[xxx]在[08-15?16:36:04]发生[错误日志异常],alertId=[xxx].由[org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:start:620]触发. 应用xxx?可能原因如下 服务名为: ?异常为:org.springframework.amqp.rabbit.lis