rabbitmq redis

RabbitMQ

RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。

对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列

用rabbitmq实现一个简单的生产者消费者模型

发送端代码

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(‘192.168.1.25‘))
channel = connection.channel()

channel.queue_declare(queue="hello")

channel.basic_publish(exchange=‘‘,
                     routing_key = ‘hello‘,
                     body=‘hello world‘,
)
print("Send hello world")
connection.close()

接收端代码

 1 import pika
 2
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(‘192.168.1.25‘))
 4 channel = connection.channel()
 5 channel.queue_declare(queue="hello")
 6
 7 def callback(ch,method,properties,body):
 8     print(ch,method,properties)
 9     print("received %s" %body)
10
11 channel.basic_consume(callback,
12                       queue=‘hello‘,
13                       no_ack=True)
14
15 print("waiting for messages to exit press ‘CTRL+C‘")
16 channel.start_consuming()

通过上述代码便可以实现一个简单的生产者消费者模型,但是现在的结果是:当开启多个消费者程序的时候,启动生产者发送消息,这个时候只有一个可以收到,并且再次启动,会下一个消费者收到,类似一个轮询的关系。

acknowledgment 消息不丢失(通过客户端设置实现)

通过no_ack = False参数设置,如果消费者遇到情况突然中断了没有收到,那么RabbitMQ会重新将任务添加到队列中

下面将接收端的代码进行更改:

#AUTHOR:FAN
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(‘192.168.1.25‘))
channel = connection.channel()
channel.queue_declare(queue="hello")

def callback(ch,method,properties,body):
    print(ch,method,properties)
    time.sleep(10)
    print("received %s" %body)

channel.basic_consume(callback,
                      queue=‘hello‘,
                      no_ack=False)

print("waiting for messages to exit press ‘CTRL+C‘")
channel.start_consuming()

标注的地方就是代码修改的地方,通过将no_ack更改为False,以及在callback回到函数这里让等待10s,这样启动接收端后,再启动发送算,在还没有打印数据的时候将客户端关闭,然后再启动,发现依然可以收到刚才发送端发送的数据。

但是这种方式只能实现客户端断开重新连接的时候数据不丢失,如果是rabbitmq挂了的情况如何解决?

durable消息不丢失(通过在服务端设置保证数据不丢失)

这个时候生产者和消费者的代码都需要改动

发送者代码

 1 import pika
 2
 3
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(‘192.168.1.25‘))
 5 channel = connection.channel()
 6
 7 channel.queue_declare(queue=‘fan‘,durable=True)
 8
 9 channel.basic_publish(exchange=‘‘,
10                       routing_key=‘fan‘,
11                       body=‘hello world‘,
12                       properties = pika.BasicProperties(
13                           delivery_mode=2
14                       ))
15
16 print("send ‘hello world‘")
17 connection.close()

接收者的代码

 1 import pika
 2 import time
 3
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(‘192.168.1.25‘))
 5 channel = connection.channel()
 6
 7 channel.queue_declare(queue=‘fan‘,durable=True)
 8
 9 def callback(ch,method,properies,body):
10     print("received %s" %body)
11     time.sleep(10)
12     print("is ok")
13     ch.basic_ack(delivery_tag=method.delivery_tag)
14
15 channel.basic_consume(callback,
16                       queue=‘fan‘,
17                       no_ack=False)
18
19 print("waitting for messages.To exit press CTRL+C")
20 channel.start_consuming()

这样即使在接收者接收数据过程中rabbitmq服务器出现问题了,在服务恢复之后,依然可以收到数据

发布订阅

发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

通过exchange type = fanout参数实现

代码例子:

发布者:

 1 #AUTHOR:FAN
 2
 3 import pika
 4 import sys
 5
 6 connection = pika.BlockingConnection(pika.ConnectionParameters(‘192.168.8.103‘))
 7 channel = connection.channel()
 8
 9 channel.exchange_declare(exchange="fan",
10                          type=‘fanout‘)
11
12 message = ‘ ‘.join(sys.argv[1:]) or "info :hello world"
13 channel.basic_publish(exchange = ‘fan‘,
14                       routing_key=‘‘,
15                       body=message)
16
17 print("send %s" %message)
18 connection.close()

订阅者:

#AUTHOR:FAN

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(‘192.168.8.103‘))

channel = connection.channel()

channel.exchange_declare(exchange="fan",
                         type=‘fanout‘)

#随机生成队列名字
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

#将exchange和队列绑定
channel.queue_bind(exchange=‘fan‘,
                   queue=queue_name)

print("waiting for fan ,To exit press CTRL+C")
def callback(ch,method,proerties,body):
    print("---",body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

关键字发送

通过参数:exchange type = direct实现

之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

代码例子如下:

消费者代码:

 1 #AUTHOR:FAN
 2 import pika
 3 import sys
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(‘192.168.1.25‘))
 5 channel = connection.channel()
 6 channel.exchange_declare(exchange=‘direct_logs_1‘,
 7                          type=‘direct‘)
 8 result = channel.queue_declare(exclusive=True)
 9 queue_name = result.method.queue
10
11 severities = sys.argv[1:]
12 if not severities:
13     sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
14     exit(1)
15 print(severities)
16 for severity in severities:
17     print(severity)
18     channel.queue_bind(exchange=‘direct_logs_1‘,
19     queue=queue_name,
20     routing_key=severity)
21 print("waiting for logs,To exit press CTRL+C")
22 def callback(ch,method,properties,body):
23     print("%s:%s" %(method.routing_key,body))
24
25 channel.basic_consume(callback,
26                       queue=queue_name,
27                       no_ack=True)
28 channel.start_consuming()

生产者代码

 1 import pika
 2 import sys
 3
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(‘192.168.1.25‘))
 5 channel = connection.channel()
 6
 7 channel.exchange_declare(exchange=‘direct_logs_1‘,
 8                          type=‘direct‘)
 9
10 print(sys.argv)
11 severity = sys.argv[1] if len(sys.argv) >1 else "error"
12 message = ‘ ‘.join(sys.argv[2:]) or ‘hello world‘
13 channel.basic_publish(exchange=‘direct_logs_1‘,
14                       routing_key = severity,
15                       body = message)
16 print("send %s:%s" %(severity,message))
17 connection.close()

模糊匹配

通过参数exchange type = topic实现

在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

# 表示可以匹配 0 个 或 多个 单词

*  表示只能匹配 一个 单词

--------------------还没有整理完

时间: 2024-10-14 08:06:43

rabbitmq redis的相关文章

使用python操作RabbitMQ,Redis,Memcache,SQLAlchemy 其二

一.概念 1.Memcached Memcached 是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负载.它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态.数据库驱动网站的速度.Memcached基于一个存储键/值对的hashmap.其守护进程(daemon )是用C写的,但是客户端可以用任何语言来编写,并通过memcached协议与守护进程通信. 2.RabbitMQ RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统.他遵循Mozilla

Python操作rabbitmq redis memcache SQLalchemy

Memcached Memcached 是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负载.它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态.数据库驱动网站的速度.Memcached基于一个存储键/值对的hashmap.其守护进程(daemon )是用C写的,但是客户端可以用任何语言来编写,并通过memcached协议与守护进程通信. Memcached安装和基本使用 Memcached安装: 1 2 3 4 5 6 7 8 wget http://memc

消息队列介绍、RabbitMQ&Redis的重点介绍与简单应用

消息队列介绍.RabbitMQ.Redis 一.什么是消息队列 这个概念我们百度Google能查到一大堆文章,所以我就通俗的讲下消息队列的基本思路. 还记得原来写过Queue的文章,不管是线程queue还是进程queue他都是一种消息队列.他都是基于生产者消费者模型来处理消息. Python中的进程queue,是用于父进程与子进程,或者同属于一个父进程下的多个子进程之间进行信息交互.注意这种queue只能在同一个python程序下才能用,如果两个python程序,或者Python和别的什么程序,

SpringBoot+RabbitMQ+Redis实现商品秒杀

业务分析 一般而言,商品秒杀大概可以拆分成以下几步: 用户校验 校验是否多次抢单,保证每个商品每个用户只能秒杀一次 下单 订单信息进入消息队列,等待消费 减少库存 消费订单消息,减少商品库存,增加订单记录 付款 十五分钟内完成支付,修改支付状态 创建表 goods_info 商品库存表 列 说明 id 主键(uuid) goods_name 商品名称 goods_stock 商品库存 package com.jason.seckill.order.entity; /** * 商品库存 */ pu

那些年被我坑过的Python——第十章Broker(rabbitMQ/redis)

基于RabbitMQ的direct任务驱动异步RPC程序实现: RPC_dispatcher指令分发器: 1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 __Author__ = "Zhang Xuyao" 4 5 import pika 6 import uuid 7 import time 8 import threading 9 10 11 class RpcDispatcher(object): 12 def __init_

RabbitMQ(转)

add by zhj: 如果用Python,那可以用celery,它是一个分布式任务队列,它的broker可以选择Rabbitmq/Redis/Mongodb等, celery通过Kombu这个library来调用Rabbitmq的接口.我们可以认为Kombu是把Rabbitmq进行了封装,使其更符合Python的 风格,当然,Kombu也对其它非AMQ协议的消息进行了封装.可以使用rabbitmqctl list_bindings查看queue与exchange的绑定 情况. 英文原文:htt

python3+celery+redis实现异步任务

一.原理 Celery是基于Python开发的一个分布式任务队列框架,支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度.它是Python写的库,但是它实现的通讯协议也可以使用ruby,php,javascript等调用.异步任务除了消息队列的后台执行的方式,还是一种则是定时计划任务. Celery 是一个强大的分布式任务队列,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行.我们通常使用它来实现异步任务(async task)和定时任务(crontab).它的架构组成

老男孩教育2016年linux运维在线教学课程大纲

老男孩教育2016年linux全科班大纲  linux运维全科班在线课程地址(直播加录播) http://edu.51cto.com/px/train/41   linux运维就业班在线课程地址(直播加录播) http://edu.51cto.com/px/train/40 课程详情: 本课程包含老男孩教育linux运维就业班及高级架构师班全部内容,更有python自动化基础课程. linux运维就业班课程: 核心课程至少经过8年锤炼,历经近30期讲解次数,体系完整,内容完善重点分明:包括有li

异步任务利器Celery(一)介绍

django项目开发中遇到过一些问题,发送请求后服务器要进行一系列耗时非常长的操作,用户要等待很久的时间.可不可以立刻对用户返回响应,然后在后台运行那些操作呢? crontab定时任务很难达到这样的要求 ,异步任务是很好的解决方法,有一个使用python写的非常好用的异步任务工具Celery. broker.worker和backend Celery的架构由三部分组成,消息中间件(broker),任务执行单元(worker)和任务执行结果存储(result backends)组成. 应用程序调用