Python自动化开发学习的第十周----RabbitMQ

RabbitMQ   消息队列   消息的传递

安装   http://www.rabbitmq.com/install-standalone-mac.html

如果是在windows上安装还要安装erlang语言

安装python RabbitMQ

pip install pika
or
easy_install pika
or
源码

https://pypi.python.org/pypi/pika

实现最简单的队列通信

http://www.rabbitmq.com/getstarted.html

producer(生产者)

import  pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) #建立一个socket

channel = connection.channel()#建立一个管道

channel.queue_declare(queue="hello")#声明queue

channel.basic_publish(exchange="",routing_key="hello",body="Hello world")
#n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.

print("produce send to consume")

connection.close()#关闭

consumer(消费者)

import pika

connetion = pika.BlockingConnection(pika.ConnectionParameters("localhost"))

channel = connetion.channel()

channel.queue_declare(queue="hello")
#You may ask why we declare the queue again ? we have already declared it in our previous code.
# We could avoid that if we were sure that the queue already exists. For example if send.py program
#was run before. But we‘re not yet sure which program to run first. In such cases it‘s a good
# practice to repeat declaring the queue in both programs.

def callback(ch,method,properties,body):
    print(ch,method,properties)
    print(body)

channel.basic_consume(callback,queue="hello",no_ack=True)

print("waiting  for messages. To exit press ctrl+c")

channel.start_consuming()

消息分发轮询

这是一个一对多的情况,一个生产者对应多个消费者

在这种模式下,RabbitMQ会默认把p发的消息依次分发给各个消费者(c),跟负载均衡差不多

producer

import  pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) #建立一个socket

channel = connection.channel()#建立一个管道

channel.queue_declare(queue="hello")#声明queue

channel.basic_publish(exchange="",routing_key="hello",body="Hello world")
#n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.

print("produce send to consume")

connection.close()#关闭

consumer

import pika

connetion = pika.BlockingConnection(pika.ConnectionParameters("localhost"))

channel = connetion.channel()

channel.queue_declare(queue="hello")
#You may ask why we declare the queue again ? we have already declared it in our previous code.
# We could avoid that if we were sure that the queue already exists. For example if send.py program
#was run before. But we‘re not yet sure which program to run first. In such cases it‘s a good
# practice to repeat declaring the queue in both programs.

def callback(ch,method,properties,body):
    print(ch,method,properties)
    print(body)
    ch.basic_ack(delivery_tag = method.delivery_tag)  #手动确认消息完毕

channel.basic_consume(callback,queue="hello")  #这里主要把no_ack去掉

print("waiting  for messages. To exit press ctrl+c")

channel.start_consuming()

先启动生产者,然后启动3个消费者,生产者多发几条消息,你会发先消息依次被消费者接收。

如果生产者在发送数据时,突然消费者断开,怎么保障数据的不丢失?

去除消费者中的no_ack,如果生产者正在发送,突然消费者断开,那么第一个消费者没接收完,转到第2个消费者接收,再断开,转到第3个消费者,以此类推。。

消息持久化

生产者如果在发送数据时突然断开,就会导致消息和消息队列丢失,怎么才能保障生产者在断开时,消息和消息队列不丢失呢?

channel.queue_declare(queue=‘hello‘, durable=True) #保证消息队列不丢失
channel.basic_publish(exchange=‘‘,
                      routing_key="hello",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent  保证消息不丢失
                      )) 

消息的公平分发

如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。

消息持久化+消息的公平分发的完整代码

producer

import  pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) #建立一个socket

channel = connection.channel()#建立一个管道

channel.queue_declare(queue="hello1",
                      durable=True
                      )#声明queue

channel.basic_publish(exchange="",
                      routing_key="hello1",
                      body="Hello world",
                      properties=pika.BasicProperties
                      (delivery_mode = 2,) # make message persistent
                      )

print("produce send to consume")

connection.close()#关闭

consumer

import pika

connetion = pika.BlockingConnection(pika.ConnectionParameters("localhost"))

channel = connetion.channel()

channel.queue_declare(queue="hello1",
                      durable=True)

def callback(ch,method,properties,body):
    print(ch,method,properties)
    print(body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1) #消息公平分发主要是添加prefech_count=1

channel.basic_consume(
                    callback,
                    queue="hello1"
                    )

print("waiting  for messages. To exit press ctrl+c")

channel.start_consuming()

消息发布和订阅

之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了,

An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.

Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息

fanout: 所有bind到此exchange的queue都可以接收消息

direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息

topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息

   表达式符号说明:#代表一个或多个字符,*代表任何字符
      例:#.a会匹配a.a,aa.a,aaa.a等
          *.a会匹配a.a,b.a,c.a等
     注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout 

headers: 通过headers 来决定把消息发给哪些queue

fanout 广播式接收消息

fanout_produce

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.exchange_declare(exchange="logs",exchange_type="fanout")

channel.basic_publish(exchange="logs",
                      routing_key="",
                      body="hello world!555"
                      )

print("[x] Sent hello world")

connection.close()

fanout_consume

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.exchange_declare(exchange="logs",exchange_type="fanout")

result = channel.queue_declare(exclusive=True)
#不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
queuename = result.method.queue

channel.queue_bind(exchange="logs",queue=queuename)

print(‘ [*] Waiting for logs. To exit press CTRL+C‘)

def callback(ch,method,properties,body):
    print(body)

channel.basic_consume(
                    callback,
                    queue=queuename,
                    no_ack=True
                    )

channel.start_consuming()

direct有选择的接收消息

RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

direct_producer

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host=‘localhost‘))
channel = connection.channel()

channel.exchange_declare(exchange=‘direct_logs‘,
                         exchange_type="direct")

severity = sys.argv[1] if len(sys.argv) > 1 else ‘info‘
message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘
channel.basic_publish(exchange=‘direct_logs‘,
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

direct_consumer

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host=‘localhost‘))
channel = connection.channel()

channel.exchange_declare(exchange=‘direct_logs‘,
                         exchange_type="direct")

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange=‘direct_logs‘,
                       queue=queue_name,
                       routing_key=severity)

print(‘ [*] Waiting for logs. To exit press CTRL+C‘)

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

topic更加细致的接收消息

To receive all the logs run:

python receive_logs_topic.py "#"

To receive all logs from the facility "kern":

python receive_logs_topic.py "kern.*"

Or if you want to hear only about "critical" logs:

python receive_logs_topic.py "*.critical"

You can create multiple bindings:

python receive_logs_topic.py "kern.*" "*.critical"

And to emit a log with a routing key "kern.critical" type:

python emit_log_topic.py "kern.critical" "A critical kernel error"

topic_producer

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host=‘localhost‘))
channel = connection.channel()

channel.exchange_declare(exchange=‘topic_logs‘,
                         exchange_type="topic")

severity = sys.argv[1] if len(sys.argv) > 1 else ‘info‘
message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘
channel.basic_publish(exchange=‘topic_logs‘,
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

topic_consumer

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host=‘localhost‘))
channel = connection.channel()

channel.exchange_declare(exchange=‘topic_logs‘,
                         exchange_type="topic")

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange=‘topic_logs‘,
                       queue=queue_name,
                       routing_key=severity)

print(‘ [*] Waiting for logs. To exit press CTRL+C‘)

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

Remote procedure call (RPC) 远程过程调用

RPC是指远程过程调用,也就是说两台服务器A,B,一个应用部署在A服务器上,想要调用B服务器上应用提供的函数/方法,由于不在一个内存空间,不能直接调用,需要通过网络来表达调用的语义和传达调用的数据。

为什么RPC呢?就是无法在一个进程内,甚至一个计算机内通过本地调用的方式完成的需求,比如不同的系统间的通讯,甚至不同的组织间的通讯。由于计算能力需要横向扩展,需要在多台机器组成的集群上部署应用,

RPC的协议有很多,比如最早的CORBA,Java RMI,Web Service的RPC风格,Hessian,Thrift,甚至Rest API。

RPC的处理流程:

  1. 当客户端启动时,创建一个匿名的回调队列。
  2. 客户端为RPC请求设置2个属性:replyTo,设置回调队列名字;correlationId,标记request。
  3. 请求被发送到rpc_queue队列中。
  4. RPC服务器端监听rpc_queue队列中的请求,当请求到来时,服务器端会处理并且把带有结果的消息发送给客户端。接收的队列就是replyTo设定的回调队列。
  5. 客户端监听回调队列,当有消息时,检查correlationId属性,如果与request中匹配,那就是结果了。

rpc_server

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host=‘localhost‘))

channel = connection.channel()

channel.queue_declare(queue=‘rpc_queue‘)

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)

def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange=‘‘,
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=                                                          props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue=‘rpc_queue‘)

print(" [x] Awaiting RPC requests")
channel.start_consuming() 

rpc_client

import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host=‘localhost‘))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange=‘‘,
                                   routing_key=‘rpc_queue‘,
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.corr_id,
                                   ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(10)")
response = fibonacci_rpc.call(10)
print(" [.] Got %r" % response)

原文地址:https://www.cnblogs.com/garrett0220/p/8116792.html

时间: 2024-07-29 04:56:39

Python自动化开发学习的第十周----RabbitMQ的相关文章

Python自动化开发学习的第三周---python基础学习

本周内容 1.函数的基本语法和特性 2.参数与局部变量 3.返回值 4.递归函数 5.匿名函数 6.函数式编程介绍 7.高阶函数 8.内置函数 ---------分割线------------ 1.函数的基本语法和特性 函数是什么? 定义: 函数是指将一组语句的集合通过一个名字(函数名)封装起来,要想执行这个函数,只需调用其函数名即可 特性: 减少重复代码 使程序变的可扩展 使程序变得易维护 语法定义 1 def sayhi():#函数名 2 print("Hello, I'm nobody!&

Python自动化开发学习的第一周作业---三级菜单

作业需求: (1)运行程序输出第一级菜单(2)选择一级菜单某项,输出二级菜单,同理输出三级菜单(3)让用户选择是否要退出(4)有返回上一级菜单的功能 1 data = { 2 "天津":{ 3 "南开区":{ 4 "南开大学":{ 5 "历史系":{}, 6 "文学系":{}, 7 "英语系":{}, 8 }, 9 "科技大学":{ 10 "计算机系&quo

Python自动化开发学习12-MariaDB

关系型数据库 主流的关系型数据库大概有下面这些: Oracle : 甲骨文公司的企业级的数据库 SQL Server : 微软的 MySQL : 免费的数据库,现在也属于Oracle的旗下产品 MariaDB : 开源的数据库,MySQL的一个分支 PostgreSQL : 也是开源的 SQLite : 一款轻量级的数据库 DB2 : IBM的 RDBMS 术语 RDBMS(Relational Database Management System)即关系数据库管理系统,在开始之前,先了解下RD

Python自动化开发学习5

模块 在模块中,我们可以定义变量.函数,还有类(这个还没学到).总之应该就是所有的代码.先建一个文件,取名为module,py,内容如下: # 一下是module.py的内容 string = "This is module,py" def say_hi():     print("Hi") def test():     return "test in module.py" 在上面的模块中我们定义了1个变量和2个函数,现在我们要在另外一个文件中

Python自动化开发学习16-前端内容综合进阶

css补充 这里再补充几个css的知识点,下面会用到 最小宽度-min-width 设置元素的最小宽度.举例说明,比如设置一个百分比的宽度,那么元素的宽度的像素值是会随着页面大小而变化的.如果设置一个最小的像素宽度,那么当变化到最小值之后,不会继续变小.在下面的例子中,会出现滚动条,保证元素的宽度: <body> <div style="height: 80px;width: 100%;background-color: blue;min-width: 800px;"

Python自动化开发学习19-Django

接下来,我们把Django分为视图(View).路由系统(URL).ORM(Model).模板(Templates )这4块进行学习. 视图 提交数据 上节课已经用过 request.POST.get() 获取提交的数据了,现在来看看有多选框的情况,多选的话应该要提交多个数据.先写一个有单选.多选.下拉列表的html: <body> <form action="/choice/" method="post"> <p> 性别: &l

Python自动化开发学习22-Django上

session 上节已经讲了使用Cookie来做用户认证,但是Cookie的问题缺点:敏感信息不适合放在cookie里,敏感信息只能放在服务器端优势:把部分用户数据分散的存放在每个客户端,减轻服务端的压力Cookie是保存在用户浏览器端的键值对,Session是保存在服务器端的键值对.Session依赖Cookie,Cookie保存随机字符串,凭借这个随机字符串获取到服务器端Session里的内容.用Session来优化用户登录:用户登录后,生成一个随机字符串,通过Cookie发送给客户端保存.

python自动化开发学习【第六天】

import  re   # 正则只能匹配字符串 通配符: .      可以匹配任何字符(除了换行符),只能匹配一个字符 \      反斜杠,转义字符 ^     开头符号需要转义,匹配字符串的开始,也可以是否定运算符不许转义 $      匹配字符串的结束 \w    匹配字母或数字或下划线或汉字 \W   匹配非字母数字 \s     匹配任意的空白符 ,等价于[\t\n\r\f] \S    匹配任意非空字符 \d    匹配数字 \D    匹配任意服数字 \A    匹配字符串开始

python自动化开发学习-9 socket网络编程

一. 简介 python提供了两个级别访问的网络服务: 低级别的网络服务支持节本的socket,它提供了标准的BSD sockets API,可以访问底层操作系统socket接口的全部方法. 高级别的网络服务模块socketserver,它提供了服务器中心类,可以简化网络服务器的开发. socket介绍 socket通常也称作"套接字",用于描述IP地址和端口,是一个通信链的句柄,应用程序通常通过"套接字"向网络发出请求或者应答网络请求. socket起源于Unix