python运维开发之第十一天(RabbitMQ,redis)

一、RabbitMQ

python的Queue与RabbitMQ之间的理解:

python的进程或线程Queue只能python自己用。RabbitMQ队列多个应用之间共享队列,互相通信。

1、简单的实现生产者与消费者

  生产者

  (1)建立socket连接;(2)声明一个管道;(3)声明队列(queue);(4)通过管道发消息;(5)routing_key(queue名字);(6)body(内容)

  消费者

  (1)建立连接;(2)声明管道;(3)声明队列;(4)消费者声明队列(防止生产者后启动,消费者报错);(5)消费消息;(6)callback如果收到消息就调用函数处理消息 queue队列名字;

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : Willpower-chen
# @blog: http://www.cnblogs.com/willpower-chen/

import pika
#建立socket连接
connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘))
#声明一个管道
channel = connection.channel()
#声明一个队列
channel.queue_declare(queue=‘hello‘)
#通过管道发消息,routing_key 队列queue名字 ,body发送内容
channel.basic_publish(exchange=‘‘,
                      routing_key=‘hello‘,
                      body=‘Hello World! 1 2‘)
print("[x] send ‘Hello World! 1 2 ‘")
connection.close()

producer

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : Willpower-chen
# @blog: http://www.cnblogs.com/willpower-chen/

import pika,time
#建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘))
#声明一个管道
channel = connection.channel()
#声明队列,防止生产者(发送端)没开启,消费者端报错
channel.queue_declare(queue=‘hello‘)
#ch管道的内存对象地址,如果收到消息就调用函数callback,处理消息
def callbak(ch,method,properties,body):
    print("[x] Received %r " % body)
    # time.sleep(30)
#消费消息
channel.basic_consume(callbak,
                      queue=‘hello‘,
                      no_ack=True #消息有没处理,都不给生产者发确认消息
                      )
print(‘[*] Waitting for messages TO exit press ctrl+c‘)
channel.start_consuming() #开始

consumer

2、消费者对生产者,可以1对多,而且默认是轮询机制

no_ack=True如果注释掉的话,消费者端不给服务器端确认收到消息,服务器端就不会把要发的消息从队列里清除

如下图注释了no_ack,加了一个时间,

开启三个消费者,一个生产者,生产者只send一次数据,挨个停止consumer,会发现同一条消息会被重新发给下一个consumer,直到producer收到consumer的确认收到的消息

3、队列查询

清除队列消息

4、消息持久化

(1)durable只是队列持久化

channel.queue_declare(queue=‘hello‘,durable=True)

生产者和消费者都需要添加durable=True

(2)要实现消息持久化,还需要

5、消息(1对多)实现权重功能

消费者端添加在消费消息之前

channel.basic_qos(prefetch_count=1)

6、广播消息fanout(纯广播)订阅发布

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : Willpower-chen
# @blog: http://www.cnblogs.com/willpower-chen/

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host=‘localhost‘))
channel = connection.channel()

channel.exchange_declare(exchange=‘logs‘,
                         type=‘fanout‘)
#message = ‘ ‘.join(sys.argv[1:]) or "info: Hello World!"
message = "info: Hello World!2"

channel.basic_publish(exchange=‘logs‘,
                      routing_key=‘‘,
                      body=message)
print(" [x] Sent %r" % message)

connection.close()

fanout_producer

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : Willpower-chen
# @blog: http://www.cnblogs.com/willpower-chen/
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host=‘localhost‘))
channel = connection.channel()

channel.exchange_declare(exchange=‘logs‘,
                         type=‘fanout‘)
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
print("random queuename",queue_name)

channel.queue_bind(exchange=‘logs‘,
                   queue=queue_name)

print(‘ [*] Waiting for logs. To exit press CTRL+C‘)

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

fanout_consumer

7、direct广播模式(有选择性的发送接收消息)

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host=‘localhost‘))
channel = connection.channel()

channel.exchange_declare(exchange=‘direct_logs‘,
                         type=‘direct‘)

severity = sys.argv[1] if len(sys.argv) > 1 else ‘info‘
message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘
channel.basic_publish(exchange=‘direct_logs‘,
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

direct_producer

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : Willpower-chen
# @blog: http://www.cnblogs.com/willpower-chen/
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host=‘localhost‘))
channel = connection.channel()

channel.exchange_declare(exchange=‘direct_logs‘,
                         type=‘direct‘)

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange=‘direct_logs‘,
                       queue=queue_name,
                       routing_key=severity)

print(severities)
print(‘ [*] Waiting for logs. To exit press CTRL+C‘)

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

direct_consumer

8、更细致的消息判断 type = topic

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : Willpower-chen
# @blog: http://www.cnblogs.com/willpower-chen/

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host=‘localhost‘))
channel = connection.channel()

channel.exchange_declare(exchange=‘topic_logs‘,
                         type=‘topic‘)

routing_key = sys.argv[1] if len(sys.argv) > 1 else ‘anonymous.info‘
message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘
channel.basic_publish(exchange=‘topic_logs‘,
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

topic_producer

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : Willpower-chen
# @blog: http://www.cnblogs.com/willpower-chen/

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host=‘localhost‘))
channel = connection.channel()

channel.exchange_declare(exchange=‘topic_logs‘,
                         type=‘topic‘)

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange=‘topic_logs‘,
                       queue=queue_name,
                       routing_key=binding_key)

print(‘ [*] Waiting for logs. To exit press CTRL+C‘)

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

topic_consumer


 
 
时间: 2024-08-28 14:41:04

python运维开发之第十一天(RabbitMQ,redis)的相关文章

python运维开发笔记4

1.函数如何被调用,通过return返回值来调用 2.生成器和return区别 yield 生成器返回对象,可以迭代 可以执行 glob模块 类似shell中的正则匹配 shlex模块  Popen 将命令参数直接分词 cmd = "ps ax -o pid,ppid,cmd" shlex.split(cmd) ['ps','ax','-o','pid,ppid,cmd'] ['mysql','-u','root','-p123','-e','show processlist'] p

python运维开发笔记5

diff -Nur Diff和patch是Linux标配的工具.在Windows上,也有移植的版本可以使用. 使用中注意: 1.保证文件名完全相同,避免只有大小写不同的文件存在. 2.Diff只能比较文本文件,二进制文件只能判断是否相同,不能记录差异.Diff根据文件的前几个字节判断文件是不是文本文件. 使用方法: 可以对单个文件或者整个目录树进行处理. 以最实用的方式举例: diff –Nur dir_old dir_new > dir.patch patch –p0 < dir.patch

Python运维开发基础

Python基础知识分为以下几块 1.Python概述 2.基础语法 3.数据结构 4.Python进阶 5.实训案例 一.Python概述 1.Python简介 2.Hello World 3.搭建开发环境 4.习题 Python简介 尽管我是学计算机出身的,但是我对Python的认识是在毕业后.Python是我喜欢的语言,简洁,优美,容易使用.重要的一点是他是开源的项目. 官方网站 https://www.python.org 学习网站 http://www.okpython.com 对于简

Python运维开发基础01-语法基础【转】

开篇导语 整个Python运维开发教学采用的是最新的3.5.2版,当遇到2.x和3.x版本的不同点时,会采取演示的方式,让同学们了解. 教学预计分为四大部分,Python开发基础,Python开发进阶,Python网页编程,Python项目实战 Python开发基础分为语法基础篇,文件基础篇,函数基础篇,模块基础篇 语法基础篇中,我着重希望训练同学的是作为开发应该具备的一种逻辑思路. 文件基础篇中,我们需要练习的是如何将数据永久性的存储在硬盘上,提供读,写. 函数基础篇中,我重点是要引导同学们构

重磅|0元学 Python运维开发,别再错过了

51reboot 运维开发又双叒叕的搞活动了,鉴于之前 51reboot 的活动反馈,每次活动结束后(或者已经结束了很长时间)还有人在问活动的事情.这一次小编先声明一下真的不想在此次活动结束后再听到类似下面的话了 我之前不知道有活动 (这个锅小编我背了) 这个活动还有吗? 我想了解一下这个活动 跟我说可以给我按活动价算吗? .......... 针对以上类似的询问小编在此统一回复大家: 好了,接下来进入今天的重中之重 本次活动:Python 运维开发——18天训练营 本课程为:网络班+面授班(北

python运维开发常用模块(一)psutil

1.模块简介 psutil是一个跨平台库(http://code.google.com/p/psutil/),能够轻 松实现获取系统运行的进程和系统利用率(包括CPU.内存.磁盘.网 络等)信息.它主要应用于系统监控,分析和限制系统资源及进程的管 理.它实现了同等命令行工具提供的功能,如ps.top.lsof.netstat. ifconfig.who.df.kill.free.nice.ionice.iostat.iotop.uptime. pidof.tty.taskset.pmap等.目前

python运维开发常用模块(四)文件对比模块difflib

1.difflib介绍 difflib作为 Python的标准库模块,无需安装,作用是对比文本之间的差异,且支持 输出可读性比较强的HTML文档,与Linux下的diff命令相似.我们可以 使用difflib对比代码.配置文件的差别,在版本控制方面是非常有用. Python 2.3或更高版本默认自带difflib模块,无需额外安装. 示例1:两个字符串的差异对比 [[email protected] part2]$ cat simple1.py #!/usr/bin/python #_*_cod

python运维开发(二十一)----文件上传和验证码+session

内容目录: 文件上传 HTML Form表单提交 ajax提交 原生ajax提交 jQuery Ajax提交 验证码+session 文件和图片的上传功能

python运维开发(七)----面向对象(上)

内容目录: 面向对象应用场景 类和对象的创建 类中的__init__构造方法 self理解 面向对象的三大特性:封装.继承.多态 概述 面向过程:根据业务逻辑从上到下写垒代码 函数式:将某功能代码封装到函数中,日后便无需重复编写,仅调用函数即可 面向对象:对函数进行分类和封装,让开发“更快更好更强...” 面向过程编程最易被初学者接受,其往往用一长段代码来实现指定功能,开发过程中最常见的操作就是粘贴复制,即:将之前实现的代码块复制到现需功能处. 面向对象应用场景 当某一些函数具有相同参数时,可以