python2.0_s12_day10_rabbitMQ使用介绍

RabbitMQ    RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。

    MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。    RabbitMQ安装
        Linux
        安装配置epel源
           $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm

        安装erlang,因为RabiitMQ用erlang语言写的
           $ yum -y install erlang

        安装RabbitMQ
           $ yum -y install rabbitmq-server
       注意:service rabbitmq-server start/stop
        MAC
        安装 http://www.rabbitmq.com/install-standalone-mac.html
    安装API
        pip install pika #pika是官方提供的,当然还有其他的
        or
        easy_install pika
        or
        源码

        https://pypi.python.org/pypi/pika
   一、实现最简单的队列通信    send端
 1         #!/usr/bin/evn python3.5
 2         #__author__:"ted.zhou"
 3         ‘‘‘
 4         zibbitMQ最简单的队列通信代码范例
 5         ‘‘‘
 6         import pika
 7
 8         connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) # 连接一个rabbitMQ,返回连接成功后的实例
 9         channel = connection.channel()      # 创建一个管道,用于传输各种队列.--连接成功后,还不能直接使用,需要在这个连接的实例中创建一个管道.
10
11         # 声明一个queue
12         # channel.queue_declare(queue=‘hello‘) # 在这个管道里声明一个队列 ,队列的名称为"hello"
13         ‘‘‘
14         使用pika连接并创建队列需要三步
15         1.使用pika.BlockingConnection() 创建一个连接
16         2.创建一个管道
17         3.声明一个队列
18         ‘‘‘
19
20         # 紧接着就可以通过这个管道发送内容了,在发送时,必须有三个参数
21         # exchege = ‘‘ 这个在发布订阅模式时,会用到,具体高级用法会提到,这里默认给‘‘,这样它内部还是会调用一个默认类型.
22         # routing_key = ‘hello‘,这里的routing_key 是选择通过哪个队列发送
23         # body = ‘Hello World!‘ 要发送的内容
24         channel.basic_publish(exchange=‘‘,
25                               routing_key=‘hello‘,   # 接收端不是这个参数,而是queue
26                               body=‘Hello World!‘)
27
28         print(" [x] Sent ‘Hello World!‘") # 生产者端打印发送信息,表示代码已经执行到这里
29         connection.close() # 关闭这个连接
    receive端
 1  #!/usr/bin/env python3.5
 2         __author__ = "ted.zhou"
 3         ‘‘‘
 4         python使用zabbitMQ实现最简单的队列通信之接收端代码范例
 5         ‘‘‘
 6
 7         import pika
 8
 9         # 使用pika模块,连接到指定的rabbitMQ服务器
10         connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘))
11
12         # 连接创建成功后,实例化一个管道
13         channel = connection.channel()
14
15         # 然后在管道中声明一个队列,表示我这个管道里可以跑 ‘hello‘这个队列,
16         # 我们在发送端声明了一个‘hello‘ 的queue,这里为什么还要声明一次 ,因为当接收端先启动的时候,此时不声明,下面代码在接收时会报错.
17         # 当然发送端如果先启动了,这里声明也不会报错.
18
19
20         channel.queue_declare(queue=‘hello‘)
21
22         # 紧接着我们就要进行接收队列里的消息,但是接收之前我们要知道这个消息我们收来做哪些操作呢,只接过来没啥意义.
23         # 所以定义一个callback函数
24
25         # 这里注意,接收端定义的callback函数,一定要带三个参数
26         # 1.ch 2.method 3.properties 4.其后才是信息主题body
27         # 前面3个参数是做什么的,暂时用不到,后面高级的用法会举例
28         def callback(ch,method,properties,body):
29             print("[x] Received %r" %body)
30
31         # 紧接着定义接收,定义完接收并不是直接就接收了,这个和发送端的basic_publish()方法不太一样,basic_publish()是直接就发送了,而接收basic_consume()方法定义后,还需要调用一个start方法
32         # 定义管道的接收方法.
33         # 参数介绍: queue 指定 接收的队列名称 , no_ack=True 是定义此接收方法是否要确认执行完成,如果为True,
34         # 说明不需要验证执行状态,也就是说当一个callback需要处理6分钟,当5分钟时程序卡死了,此消息也就没了,如果为False,5分钟卡死后,消息在队列中依然存在
35         channel.basic_consume(callback,
36                               queue=‘hello‘,   # 发送端不是这个参数,而是routing_key
37                               no_ack=True)
38         print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
39         channel.start_consuming() # 开启接收,没有就阻塞
    晋级:    二、队列持久化&消息持久化:        我们上面的例子,在发送端管道cannel中声明了‘hello‘队列.        为了避免当接收端先启动的情况下,因为发送端还未运行程序导致rabbitMQ服务中没有‘hello‘队列,导致接收端程序报错,所以在接收端中的管道也声明了‘hello‘队列        无论是发送端还是接收端在管道cannel中声明了‘hello‘队列,在rabbitMQ服务器中,你都可以通过命令查看此队列的信息:            MacBook-Pro:~ tedzhou$ sudo rabbitmqctl  list_queues            Listing queues ...            hello  0        那么问题来了,当发送端发送了很多信息在‘hello‘队列中,接收端还没启动呢,这时候所有的信息都存在hello队列,如下这种情况:            MacBook-Pro:~ tedzhou$ sudo rabbitmqctl  list_queues            Listing queues ...            hello  7        如果此时rabbitMQ服务器挂了,或者重启了,会有两个问题:1.这个‘hello‘队列还存在吗? 2.‘hello‘队列中的信息还存在吗?        我们做下测试:
        停止rabbitMQ服务
            MacBook-Pro:~ tedzhou$ sudo rabbitmqctl stop
            Stopping and halting node ‘[email protected]‘ ...
        启动rabbitMQ服务
            MacBook-Pro:~ tedzhou$ sudo rabbitmq-server
        查看rabbitMQ的队列
            MacBook-Pro:~ tedzhou$ sudo rabbitmqctl  list_queues
            Listing queues ...
        结果证明了:
            1.队列没有了
            2.消息更没有了
        整成业务中,我们肯定希望这些队列和消息能够保留下来.所以我们要解决两个问题.            1.持久化队列            2.持久化消息    1.队列持久化代码范例        要在声明队列的时候,加上队列持久化参数        channel.queue_declare(queue=‘hello‘, durable=True)    2.消息持久化代码范例        要在发送消息的代码部分,加上消息持久化的属性,delivery_mode=2就是说这个消息持久化消息,直到消费掉.(老实说delivery_mode有30多种,常用的就这一种)        channel.basic_publish(exchange=‘‘,                      routing_key="task_queue",                      body=message,                      properties=pika.BasicProperties(                         delivery_mode = 2, # make message persistent                      ))    发送端要在声明队列和发送消息中更改代码
 1         #!/usr/bin/evn python3.5
 2         #__author__:"ted.zhou"
 3         ‘‘‘
 4         zibbitMQ最简单的队列通信代码范例
 5         ‘‘‘
 6         import pika
 7
 8         connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) # 连接一个rabbitMQ,返回连接成功后的实例
 9         channel = connection.channel()      # 创建一个管道,用于传输各种队列.--连接成功后,还不能直接使用,需要在这个连接的实例中创建一个管道.
10
11         # 声明一个queue
12         # channel.queue_declare(queue=‘hello‘) # 在这个管道里声明一个队列 ,队列的名称为"hello"
13         channel.queue_declare(queue=‘hello‘,durable=True) # durable=True 设置此队列持久化属性为True
14         ‘‘‘
15         使用pika连接并创建队列需要三步
16         1.使用pika.BlockingConnection() 创建一个连接
17         2.创建一个管道
18         3.声明一个队列
19         ‘‘‘
20
21         # 紧接着就可以通过这个管道发送内容了,在发送时,必须有三个参数
22         # exchege = ‘‘ 这个在发布订阅模式时,会用到,具体高级用法会提到,这里默认给‘‘,这样它内部还是会调用一个默认类型.
23         # routing_key = ‘hello‘,这里的routing_key 是选择通过哪个队列发送
24         # body = ‘Hello World!‘ 要发送的内容
25         channel.basic_publish(exchange=‘‘,
26                               routing_key=‘hello‘,   # 接收端不是这个参数,而是queue
27                               body=‘Hello World!‘,
28                               properties=pika.BasicProperties(          # 消息持久化加入的参数
29                                       delivery_mode = 2,)
30                               )
31
32         print(" [x] Sent ‘Hello World!‘") # 生产者端打印发送信息,表示代码已经执行到这里
33         connection.close() # 关闭这个连接
    接收端1.需要在声明队列中设置持久化属性,2.它要在callback中获得接收到的数据de
 1          #!/usr/bin/env python3.5
 2         __author__ = "ted.zhou"
 3         ‘‘‘
 4         python使用zabbitMQ实现最简单的队列通信之接收端代码范例
 5         ‘‘‘
 6
 7         import pika
 8
 9         # 使用pika模块,连接到指定的rabbitMQ服务器
10         connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘))
11
12         # 连接创建成功后,实例化一个管道
13         channel = connection.channel()
14
15         # 然后在管道中声明一个队列,表示我这个管道里可以跑 ‘hello‘这个队列,
16         # 我们在发送端声明了一个‘hello‘ 的queue,这里为什么还要声明一次 ,因为当接收端先启动的时候,此时不声明,下面代码在接收时会报错.
17         # 当然发送端如果先启动了,这里声明也不会报错.
18
19
20         #channel.queue_declare(queue=‘hello‘)
21         channel.queue_declare(queue=‘hello‘,durable=True) #durable=True 设置此队列持久化属性为True
22
23
24         # 紧接着我们就要进行接收队列里的消息,但是接收之前我们要知道这个消息我们收来做哪些操作呢,只接过来没啥意义.
25         # 所以定义一个callback函数
26
27         # 这里注意,接收端定义的callback函数,一定要带三个参数
28         # 1.ch 2.method 3.properties 4.其后才是信息主题body
29         # 前面3个参数是做什么的,暂时用不到,后面高级的用法会举例
30         def callback(ch,method,properties,body):
31             print("[x] Received %r" %body)
32             time.sleep(body.count(b‘.‘))
33             print(" [x] Done")
34             ch.basic_ack(delivery_tag = method.delivery_tag)    # 获得delivery_tag,具体啥一起,老师没说,就说咱加上!
35
36         # 紧接着定义接收,定义完接收并不是直接就接收了,这个和发送端的basic_publish()方法不太一样,basic_publish()是直接就发送了,而接收basic_consume()方法定义后,还需要调用一个start方法
37         # 定义管道的接收方法.
38         # 参数介绍: queue 指定 接收的队列名称 , no_ack=True 是定义此接收方法是否要确认执行完成,如果为True,
39         # 说明不需要验证执行状态,也就是说当一个callback需要处理6分钟,当5分钟时程序卡死了,此消息也就没了,如果为False,5分钟卡死后,消息在队列中依然存在
40         channel.basic_consume(callback,
41                               queue=‘hello‘)   # 发送端不是这个参数,而是routing_key
42
43         print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
44         channel.start_consuming() # 开启接收,没有就阻塞
    我们通过查看rabbitMQ里的队列情况,来验证下是否持久化成功.
 1 首先只运行发送端程序,运行6遍.
 2         查看队列:
 3             MacBook-Pro:~ tedzhou$ sudo rabbitmqctl  list_queues
 4             Listing queues ...
 5             hello  6
 6         停掉服务:
 7             MacBook-Pro:~ tedzhou$ sudo rabbitmqctl  stop
 8             Stopping and halting node ‘[email protected]‘ ...
 9         开启服务:
10             MacBook-Pro:~ tedzhou$sudo rabbitmq-server &
11         再次查看队列:
12             MacBook-Pro:~ tedzhou$ sudo rabbitmqctl  list_queues
13             Listing queues ...
14             hello  6
    验证结果: 持久化 队列&消息成功.

    用法晋级2    三.Work Queues        在这种模式下,RabbitMQ会默认把p发的消息依次分发给各个消费者(c),跟负载均衡差不多

    消息生产者代码
 1  import pika
 2
 3         connection = pika.BlockingConnection(pika.ConnectionParameters(
 4                        ‘localhost‘))
 5         channel = connection.channel()
 6
 7         #声明queue
 8         channel.queue_declare(queue=‘task_queue‘)
 9
10         #n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
11         import sys
12
13         message = ‘ ‘.join(sys.argv[1:]) or "Hello World!"
14         channel.basic_publish(exchange=‘‘,
15                               routing_key=‘task_queue‘,
16                               body=message,
17                               properties=pika.BasicProperties(
18                               delivery_mode = 2, # make message persistent
19                               ))
20         print(" [x] Sent %r" % message)
21         connection.close()
    消费者代码:
 1  import pika,time
 2
 3         connection = pika.BlockingConnection(pika.ConnectionParameters(
 4                        ‘localhost‘))
 5         channel = connection.channel()
 6
 7
 8
 9         def callback(ch, method, properties, body):
10             print(" [x] Received %r" % body)
11             time.sleep(body.count(b‘.‘))
12             print(" [x] Done")
13             ch.basic_ack(delivery_tag = method.delivery_tag)
14
15
16         channel.basic_consume(callback,
17                               queue=‘task_queue‘,
18                               )
19
20         print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
21         channel.start_consuming()
    当你多次运行一个生产者的代码,而运行3个消费者的代码,你会发现消息会轮询3个消费者程序,也就是消费者会依次接收到代码,这个就像简单的负载均衡.    那么问题来了,加入运行消费者程序的3台机器的配置不一样,好的1台,消费一条消息需要1分钟, 性能差的机器要10分钟,那么前面说到的负载均衡就会导致,差的严重影响效率.    我们在LVS这类负载均衡是可以设置权重,同样消费者在接收消息时也可以设置相应的功能,但不是权重,它比权重更人性化,它可以保证一个消费者程序,同时只能保证1个信息在消费,当然也可以设置同一时刻保证在消费2个信息

    具体实现代码如下:        生产者代码不变:
 1             #!/usr/bin/env python
 2             import pika
 3             import sys
 4
 5             connection = pika.BlockingConnection(pika.ConnectionParameters(
 6                     host=‘localhost‘))
 7             channel = connection.channel()
 8
 9             channel.queue_declare(queue=‘task_queue‘, durable=True)
10
11             message = ‘ ‘.join(sys.argv[1:]) or "Hello World!"
12             channel.basic_publish(exchange=‘‘,
13                                   routing_key=‘task_queue‘,
14                                   body=message,
15                                   properties=pika.BasicProperties(
16                                      delivery_mode = 2, # make message persistent
17                                   ))
18             print(" [x] Sent %r" % message)
19             connection.close()
        消费者代码加入channel.basic_qos(prefetch_count=1),代码如下:
 1             #!/usr/bin/env python
 2             import pika
 3             import time
 4
 5             connection = pika.BlockingConnection(pika.ConnectionParameters(
 6                     host=‘localhost‘))
 7             channel = connection.channel()
 8
 9             channel.queue_declare(queue=‘task_queue‘, durable=True)
10             print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
11
12             def callback(ch, method, properties, body):
13                 print(" [x] Received %r" % body)
14                 time.sleep(body.count(b‘.‘))
15                 print(" [x] Done")
16                 ch.basic_ack(delivery_tag = method.delivery_tag)
17
18             channel.basic_qos(prefetch_count=1)  #表示同意时刻保证客户端程序只处理一个消息
19             channel.basic_consume(callback,
20                                   queue=‘task_queue‘)
21
22             channel.start_consuming()
    rabbitMQ高级用法    四、Publish\Subscribe(消息发布\订阅)         之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了,        Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息

        fanout: 所有bind到此exchange的queue都可以接收消息        direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息        topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息           表达式符号说明:#代表一个或多个字符,*代表任何字符              例:#.a会匹配a.a,aa.a,aaa.a等                 *.a会匹配a.a,b.a,c.a等             注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout 

        headers: 通过headers 来决定把消息发给哪些queue

				
时间: 2024-10-12 00:04:01

python2.0_s12_day10_rabbitMQ使用介绍的相关文章

python2.0_day16_django_url_view_models_template介绍

本节内容 Django流程介绍 Django url Django view Django models Django template Django form Django admin Django流程 Django流程介绍略 直接拷贝Alex博客内容 创建Django工程 django-admin startproject day16_site 创建工程目录 cd day16_site/ python3.5 manage.py startapp app01 进入目录创建app01程序目录简介

<转>Python3.x和Python2.x的区别介绍

1.性能Py3.0运行 pystone benchmark的速度比Py2.5慢30%.Guido认为Py3.0有极大的优化空间,在字符串和整形操作上可以取得很好的优化结果.Py3.1性能比Py2.5慢15%,还有很大的提升空间. 2.编码Py3.X源码文件默认使用utf-8编码,这就使得以下代码是合法的:>>> 中国 = 'china'>>>print(中国)china 3. 语法1)去除了<>,全部改用!=2)去除``,全部改用repr()3)关键词加入a

python2.0_s12_day12_html介绍

html 就像一个裸体的人css 就像是人穿的衣服js 就像是人做的动作一.网页文件HTML的构成 1.对应规则的选择,就如同我们写python时#!/usr/bin/env python3.5 这么一段代码一样,在HTML我们要先声明选择哪个对应规则 <!DOCTYPE html> 2.<html></html> 整个结构的声明 3.在<html></html>体内声明脑子<head> </head> 4.在<ht

IPython介绍

1. IPython介绍 ipython是一个python的交互式shell,比默认的python shell好用得多,支持变量自动补全,自动缩进,支持bash shell命令,内置了许多很有用的功能和函数.学习ipython将会让我们以一种更高的效率来使用python.同时它也是利用Python进行科学计算和交互可视化的一个最佳的平台. IPython提供了两个主要的组件: 1.一个强大的python交互式shell 2.供Jupyter notebooks使用的一个Jupyter内核(IPy

linux下的python基本介绍

[python]linux下的python安装及初步学习 linux下python的安装 尽管我的unbuntu 10.11版本已经默认更新了python的安装,但这里还是重新介绍一下如何在linux下安装   python.在网上也有一些安装教程. 摘来如下 1.下载源代码 http://www.python.org/ftp/python/2.5.2/Python-2.5.2.tar.bz2 2. 安装 $ tar –jxvf Python-2.5.2.tar.bz2 $ cd Python-

如何在CentOS6上安装Python2.7和Python3.3

原文来自http://toomuchdata.com/2014/02/16/how-to-install-python-on-centos/,个人觉得对在linux安装新版本Python是很有参考意义,因而转载,原文是英文的,本人简单翻译下,大家看懂即可,有不妥的地方请留言. 如何在CentOS 6上同时安装Python 2.7和Python 3.3 本文将介绍如何在CentOS 6上安装Python 2.7和3.3.下面以Python 2.7.6和Python 3.3.5为例进行说明,但本人实

virtualenv介绍

背景说明 本项目是基于<深入理解flask>一书,主要是用来记录学习历程和交流心得,所以写得不好请大神勿喷. 准备工作 一.virtualenv介绍 也许 Virtualenv 是你在开发中最愿意使用的,如果你在生产机器上有 shell 权限的时候,你也会愿意用上 virtualenv. virtualenv 解决了什么问题?如果你像我一样喜欢 Python 的话,有很多机会在基于 Flask 的 web 应用外的其它项目上使用 Python. 然而项目越多,越有可能在不同版本的 python

python基础1 介绍、数据类型、流程控制

一.Python介绍 python的创始人为吉多·范罗苏姆(Guido van Rossum).1989年的圣诞节期间,吉多·范罗苏姆为了在阿姆斯特丹打发时间,决心开发一个新的脚本解释程序,作为ABC语言的一种继承. 最新的TIOBE排行榜,Python赶超PHP占据第4,成为除c\c++\java之外的全球第4大最流行的编程语言! Python是什么样的语言? 按照编译型.解释型列举出最常用的几种语言的分类 编译型.解释型各自优缺点: 编译型 优点:编译器一般会有预编译的过程对代码进行优化.因

烂泥:python2.7和python3.5源码安装

本文由ilanniweb提供友情赞助,首发于烂泥行天下 想要获得更多的文章,可以关注我的微信ilanniweb 前几天在centos6.6安装ansible时,一直提示python版本不对,导致不能安装.只能手工进行手工安装python,为了以后不再添这个坑,特记录相关的安装过程. 本文以python2.7和python3.5安装为例. 一.安装python2.7 在安装python之前,我们先来安装在进行python编译时,所需要的的各种依赖软件包,如下: yum -y install xz