python学习-day11

一、Rabbitmq

RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。RabbitMQ使用的是AMQP协议,它是一种二进制协议。默认启动端口 5672。在 RabbitMQ 中,如下图结构:

  • 左侧 P 代表 生产者,也就是往 RabbitMQ 发消息的程序。生产者需要完成的任务:
  • 1 创建RabbitMQ连接
    2 获取信道
    3 声明交换器
    4 创建消息
    5 发布消息
    6 关闭信道
    7 关闭RabbitMQ连接 
  • 中间即是 RabbitMQ,其中包括了 交换机 和 队列。
  • 右侧 C 代表 消费者,也就是往 RabbitMQ 拿消息的程序。消费者需要完成的任务:

    1 创建RabbitMQ连接
    2 获取信道
    3 声明交换器
    4 声明队列
    5 队列和交换器绑定
    6 消费信息
    7 关闭信道
    8 关闭RabbitMQ连接
  • Exchange: 接受客户端发送的消息,并根据Binding将消息路由给服务器中的队列,Exchange分为direct, fanout, topic三种。
  • Binding: 连接Exchange和Queue,包含路由规则。
  • Queue: 消息队列,存储还未被消费的消息。
  • Message: Header+Body
  • Channel: 通道,执行AMQP的命令;一个连接可创建多个通道以节省资源

1. dircted exchange

路由键exchange,该交换机收到消息后会把消息发送到指定routing-key的queue中。rabbitmq内部默认有一个特殊的dircted exchange,该交换机的name是空字符串,所有queue都默认binding 到该交换机上。所有binding到该交换机上的queue,routing-key都和queue的name一样。

生产者:

 1 import pika
 2 credentials = pika.PlainCredentials(‘admin‘, ‘123456‘)
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(
 4     ‘192.168.170.134‘,5672,‘/‘,credentials))
 5 channel = connection.channel()
 6
 7 # 声明queue
 8 channel.queue_declare(queue=‘hello‘)
 9
10 # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
11 channel.basic_publish(exchange=‘‘,
12                       routing_key=‘hello‘,
13                       body=‘Hello World!‘)
14 print(" [x] Sent ‘Hello World!‘")
15 connection.close()

消费者:

 1 import pika
 2 credentials = pika.PlainCredentials(‘admin‘, ‘123456‘)
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(
 4     ‘192.168.170.134‘,5672,‘/‘,credentials))
 5 channel = connection.channel()
 6 def callback(ch, method, properties, body):
 7     print(" [x] Received %r" % body)
 8
 9
10 channel.basic_consume(callback,
11                       queue=‘hello‘,
12                       no_ack=True)
13
14 print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
15 channel.start_consuming()

队列绑定关键字,发送者将数据关键字发送到消息Exchange,Exchange根据关键字判定应该将数据发送至指定队列。

生产者:

 1 import pika,sys
 2
 3 credentials = pika.PlainCredentials(‘admin‘, ‘123456‘)
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5     ‘192.168.170.134‘,5672,‘/‘,credentials))
 6 channel = connection.channel()
 7 channel.exchange_declare(exchange=‘direct_logs‘,
 8                          type=‘direct‘)
 9
10 severity = sys.argv[1] if len(sys.argv) > 1 else ‘info‘
11 message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘
12 channel.basic_publish(exchange=‘direct_logs‘,
13                       routing_key=severity,
14                       body=message)
15 print(" [x] Sent %r:%r" % (severity, message))
16 connection.close()

消费者:

 1 import pika,sys
 2
 3 credentials = pika.PlainCredentials(‘admin‘, ‘123456‘)
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5     ‘192.168.170.134‘,5672,‘/‘,credentials))
 6 channel = connection.channel()
 7
 8 channel.exchange_declare(exchange=‘direct_logs‘,
 9                          type=‘direct‘)
10
11 result = channel.queue_declare(exclusive=True)
12 queue_name = result.method.queue
13
14 severities = sys.argv[1:]
15 if not severities:
16     sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
17     sys.exit(1)
18
19 for severity in severities:
20     channel.queue_bind(exchange=‘direct_logs‘,
21                        queue=queue_name,
22                        routing_key=severity)
23
24 print(‘ [*] Waiting for logs. To exit press CTRL+C‘)
25
26
27 def callback(ch, method, properties, body):
28     print(" [x] %r:%r" % (method.routing_key, body))
29
30
31 channel.basic_consume(callback,
32                       queue=queue_name,
33                       no_ack=True)
34
35 channel.start_consuming()

运行结果:

2. fanout exchange

发布/订阅exchange ,发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,二发布者发布消息时,会将消息放置在所有相关队列中。任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上。

生产者:

 1 import pika,sys
 2
 3 credentials = pika.PlainCredentials(‘admin‘, ‘123456‘)
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5     ‘192.168.170.134‘,5672,‘/‘,credentials))
 6 channel = connection.channel()
 7 # 声明queue
 8 channel.exchange_declare(exchange=‘logs‘,
 9                          type=‘fanout‘)
10
11 message = ‘ ‘.join(sys.argv[1:]) or "info: Hello World!"
12 channel.basic_publish(exchange=‘logs‘,
13                       routing_key=‘‘,
14                       body=message)
15 print(" [x] Sent %r" % message)
16 connection.close()

消费者:

 1 import pika
 2
 3 credentials = pika.PlainCredentials(‘admin‘, ‘123456‘)
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5     ‘192.168.170.134‘,5672,‘/‘,credentials))
 6 channel = connection.channel()
 7
 8 result = channel.queue_declare(exclusive=True)  # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
 9 queue_name = result.method.queue
10
11 channel.queue_bind(exchange=‘logs‘,
12                    queue=queue_name)
13
14 print(‘ [*] Waiting for logs. To exit press CTRL+C‘)
15
16
17 def callback(ch, method, properties, body):
18     print(" [x] %r" % body)
19
20
21 channel.basic_consume(callback,
22                       queue=queue_name,
23                       no_ack=True)
24
25 channel.start_consuming()

3. topic exchange

在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入"路由值"和"关键字"进行匹配,匹配成功,则将数据发送到指定队列。

  • # :表示可以匹配0个或多个单词;
  • * :表示只能匹配一个单词。

生产者:

 1 import pika,sys
 2
 3 credentials = pika.PlainCredentials(‘admin‘, ‘123456‘)
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5     ‘192.168.170.134‘,5672,‘/‘,credentials))
 6 channel = connection.channel()
 7
 8 channel.exchange_declare(exchange=‘topic_logs‘,
 9                          type=‘topic‘)
10
11 routing_key = sys.argv[1] if len(sys.argv) > 1 else ‘anonymous.info‘
12 message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘
13 channel.basic_publish(exchange=‘topic_logs‘,
14                       routing_key=routing_key,
15                       body=message)
16 print(" [x] Sent %r:%r" % (routing_key, message))
17 connection.close()

消费者:

 1 import pika,sys
 2
 3 credentials = pika.PlainCredentials(‘admin‘, ‘123456‘)
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5     ‘192.168.170.134‘,5672,‘/‘,credentials))
 6 channel = connection.channel()
 7
 8 channel.exchange_declare(exchange=‘topic_logs‘,
 9                          type=‘topic‘)
10
11 result = channel.queue_declare(exclusive=True)
12 queue_name = result.method.queue
13
14 binding_keys = sys.argv[1:]
15 if not binding_keys:
16     sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
17     sys.exit(1)
18
19 for binding_key in binding_keys:
20     channel.queue_bind(exchange=‘topic_logs‘,
21                        queue=queue_name,
22                        routing_key=binding_key)
23
24 print(‘ [*] Waiting for logs. To exit press CTRL+C‘)
25
26
27 def callback(ch, method, properties, body):
28     print(" [x] %r:%r" % (method.routing_key, body))
29
30
31 channel.basic_consume(callback,
32                       queue=queue_name,
33                       no_ack=True)
34
35 channel.start_consuming()

二、基于rabbitmq的RPC

基于rabbitmq的rpc实现流程:

(1)首先客户端发送一个reply_to和corrention_id的请求,发布到RPC队列中;

(2)服务器端处理这个请求,并把处理结果发布到一个回调Queue,此Queue的名称应当与reply_to的名称一致

(3)客户端从回调Queue中得到先前correlation_id设定的值的处理结果。如果碰到和先前不一样的corrention_id的值,将会忽略而不是抛出异常。

对于上面所提到的回调Queue中的消费处理使用的是BasicProperties类。

服务端:

 1 import pika
 2
 3 cre_publiser = pika.PlainCredentials(‘admin‘, ‘123456‘)
 4 conn_para = pika.ConnectionParameters(‘192.168.170.134‘,5672,‘/‘,cre_publiser)
 5 connection = pika.BlockingConnection(conn_para)
 6
 7 # 建立会话
 8 channel = connection.channel()
 9
10 # 声明RPC请求队列
11 channel.queue_declare(queue=‘rpc_queue‘)
12
13 # 数据处理方法
14 def fib(n):
15     if n == 0:
16         return 0
17     elif n == 1:
18         return 1
19     else:
20         return fib(n-1) + fib(n-2)
21
22 # 对RPC请求队列中的请求进行处理
23 def on_request(ch, method, props, body):
24     n = int(body)
25
26     print(" [.] fib(%s)" % n)
27
28     # 调用数据处理方法
29     response = fib(n)
30
31     # 将处理结果(响应)发送到回调队列
32     ch.basic_publish(exchange=‘‘,
33                      routing_key=props.reply_to,
34                      properties=pika.BasicProperties(correlation_id=props.correlation_id),
35                      body=str(response))
36     ch.basic_ack(delivery_tag = method.delivery_tag)
37
38 # 负载均衡,同一时刻发送给该服务器的请求不超过一个
39 channel.basic_qos(prefetch_count=1)
40
41 channel.basic_consume(on_request,
42                       queue=‘rpc_queue‘)
43
44 print(" [x] Awaiting RPC requests")
45 channel.start_consuming()

客户端:

 1 import pika
 2 import uuid
 3 class FibonacciRpcClient(object):
 4     def __init__(self):
 5         self.cre_publiser = pika.PlainCredentials(‘admin‘, ‘123456‘)
 6         self.conn_para = pika.ConnectionParameters(‘192.168.170.134‘,5672,‘/‘,self.cre_publiser)
 7         self.connection = pika.BlockingConnection(self.conn_para)
 8
 9         self.channel = self.connection.channel()
10
11         result = self.channel.queue_declare(exclusive=True)
12         self.callback_queue = result.method.queue
13
14         self.channel.basic_consume(self.on_response,
15                                    no_ack=True,
16                                    queue=self.callback_queue)
17
18     def on_response(self, ch, method, props, body):
19         if self.corr_id == props.correlation_id:
20             self.response = body
21
22     def call(self, n):
23         self.response = None
24         self.corr_id = str(uuid.uuid4())
25         self.channel.basic_publish(exchange=‘‘,
26                                    routing_key=‘rpc_queue‘,
27                                    properties=pika.BasicProperties(
28                                    reply_to=self.callback_queue,
29                                    correlation_id=self.corr_id,
30                                    ),
31                                    body=str(n))
32         while self.response is None:
33             self.connection.process_data_events()
34         return int(self.response)
35
36
37 fibonacci_rpc = FibonacciRpcClient()
38
39 print(" [x] Requesting fib(6)")
40 response = fibonacci_rpc.call(6)
41 print(" [.] Got %r" % response)

时间: 2024-11-10 05:42:19

python学习-day11的相关文章

python学习day11 面向对象编程 类和实例

class Student(object): #class后面紧接着是类名,即Student,类名通常是大写开头的单词,紧接着是(object),表示该类是从哪个类继承下来的.通常,如果没有合适的继承类,就使用object类,这是所有类最终都会继承的类. def __init__(self,name,score): #通过定义一个特殊的__init__方法,在创建实例的时候,就把类的name,score等属性绑上去,__init__方法的第一个参数永远是self,表示创建的实例本身,因此,在__

python学习day11

一.命名空间和作用域 A.命名空间分为三种: 1.全局命名空间 2.局部命名空间 3.内置命名空间 三种命名空间的加载值顺序 1.全局命名空间--在运行程序时运行,不在函数内部,从上到下按顺序加载 2.局部命名空间--程序运行中,调用函数才加载 3.内置命名空间--运行程序之前已加载 三种命名空间的取值顺序 在局部调用:局部->全局->内置 在全局调用:全局->局部 B.作用域 作用域就是作用范围,按照生效范围可以分为全局作用域和局部作用域 全局作用域:包含内置名称空间.全局名称空间,在

python学习:程序控制结构·作业20141219

Python学习:程序控制结构 20141219 编程环境: windows 7 x64 python 2.7.6 题目: 1 编写程序,完成下列题目(1分) 题目内容: 如果列出10以内自然数中3或5的倍数,则包括3,5,6,9.那么这些数字的和为23.要求计算得出任意正整数n以内中3或5的倍数的自然数之和. 输入格式: 一个正整数n. 输出格式: n以内中3或5的倍数的自然数之和. 输入样例: 10 输出样例: 23 时间限制:500ms内存限制:32000kb n = int(raw_in

python学习第二天

python学习的第二天就是个灾难啊,这天被打击了,自己写的作业被否认了,不说了,写博客还是个好习惯的,要坚持下去,就不知道能坚持到什么时候.呵呵!!! 这天教的知识和第一天的知识相差不大,区别在于比第一天讲的更细了(我们是两个老师教的,风格是不一样的),这次也写那些比较细的知识点. python的简介 (1)你的程序一定要有个主文件. (2)对于python,一切事物都是对象,对象基于类创建.#似懂非懂,不过有那么点似懂. 知识点 #__divmod__ 会把两个数字相除的商和余数以元组的方式

[Python 学习] 二、在Linux平台上使用Python

这一节,主要介绍在Linux平台上如何使用Python 1. Python安装. 现在大部分的发行版本都是自带Python的,所以可以不用安装.如果要安装的话,可以使用对应的系统安装指令. Fedora系统:先以root登入,运行 yum install python Ubuntu系统:在root组的用户, 运行 sudo apt-get install python 2. 使用的Python的脚本 Linux是一个以文件为单位的系统,那么我们使用的Python是哪一个文件呢? 这个可以通过指令

python学习之最简单的用户注册及登录验证小程序

文章都是从我的个人博客上粘贴过来的哦,更多内容请点击 http://www.iwangzheng.com 正如很多同学所知道的,楼主开始学习python了,前进的道路曲曲折折,有荆棘也有陷阱,从最简单的小程序写起,每天练习,将python进行到底. 有一点比较别扭的就是python的换行之后空四个空格,ruby都是两个,并且python在方法和循环语句的第一句都要加冒号 mysql> show create table user; mysql> alter table user add sal

python学习--创建模块

昨天做了python客户端和服务器端通信,并把接收到的信息写到数据库,因为对数据库进行操作是个经常调用的行为,所以我想把调用数据库的操作写成一个module来给其它python程序调用,所以将昨天的服务器端程序拆分为两个文件: 1.主程序python.py #!/usr/bin/env python import socket import json import connmysql s = socket.socket(socket.AF_INET,socket.SOCK_STREAM,0) h

OpenCV之Python学习笔记

OpenCV之Python学习笔记 直都在用Python+OpenCV做一些算法的原型.本来想留下发布一些文章的,可是整理一下就有点无奈了,都是写零散不成系统的小片段.现在看 到一本国外的新书<OpenCV Computer Vision with Python>,于是就看一遍,顺便把自己掌握的东西整合一下,写成学习笔记了.更需要的朋友参考. 阅读须知: 本文不是纯粹的译文,只是比较贴近原文的笔记:         请设法购买到出版社出版的书,支持正版. 从书名就能看出来本书是介绍在Pytho

Python学习day5作业-ATM和购物商城

Python学习day5作业 Python学习day5作业 ATM和购物商城 作业需求 ATM: 指定最大透支额度 可取款 定期还款(每月指定日期还款,如15号) 可存款 定期出账单 支持多用户登陆,用户间转帐 支持多用户 管理员可添加账户.指定用户额度.冻结用户等 购物车: 商品信息- 数量.单价.名称 用户信息- 帐号.密码.余额 用户可充值 购物历史信息 允许用户多次购买,每次可购买多件 余额不足时进行提醒 用户退出时 ,输出当次购物信息 用户下次登陆时可查看购物历史 商品列表分级显示 1