rabbitmq的简单介绍

python操作rabbitmq的模块叫做pika

1个生产者对应1个消费者

如果生产者发送一条消息,那么消费者也只能接受到1条消息如果生产者发送两条消息,那么消费者就可以接受到2条消息

1个生产者对应2个消费者如果生产者发送1条消息,那么只有1个消费者能接受到消息如果生产者发送2条消息,那么每个消费者都接受到1条消息

如果生产者先启动了,生产者的代码已经执行完成,程序退出,这个时候消费者才启动,消费者也能接受到消息

work queue如果一个生产者对应多个消费者,那么生产者会依次把消息轮训到多个消费者上,实现一个负载均衡的效果

但是如果rabbitmq server重启后,则队列和消息就会丢失1、队列持久化durable=True2、消息持久化delivery_mode = 2

如果申明一个queue为持久化,那么就需要在服务端和客户端都需要设置

如果申明一个消息的持久化,加一个delivery_mode = 2

消息公平分发确保每个消费者同时只能有固定的数量的任务在处理,比如这个固定的任务是1或者2或者3之类的这个需要在消费者端设置,设置我这个消费者同时只能处理几个任务设置方法如下,这个1就是同时只能处理一个任务channel.basic_qos(prefetch_count=1)

发布订阅前面的一条消息只能被一个客户端消费,那么发布和订阅就是一条消息可以被多个客户端消息,这里就需要用到“Exchange”,Exchange在定义的时候是有类型的,以决定到底哪些queue符合条件,可以接受消息,一个exchange可以绑定多个queuefanout:所有绑定在这个exchange上的que都会接受到消息

direct:通过routingKey和exchange决定的哪个唯一的queue可以接收消息        可以根据关键字发送,即:队列绑定关键字,发送者将根据关键字发送消息        到指定的队列中

topic:所有符合routingKey(此时可以是一个表达式)的routingKey所绑定       的queue都可以接收到消息       表达式符合说明:#号代表一个或多个字符,*代表人和字符       例如       #a.a会匹配a.a,aa.a,aaa.a       *.a会匹配a.a,b.a,c.aheaders:通过headers来决定把消息发给哪些queue

1、先看下使用rabbitmq实现最简单的通信先看生产端的代码
import pika
#操作rabbitmq的模块,官方的模块叫做pika

# 1、先连接到rabbitmq
# 2、创建一个管道
# 3、在管道中跑队列

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
#生成一个阻塞的连接,连接上rabbitmaq

test_channel = connection.channel()
#创建一个管道

test_channel.queue_declare(queue="hello2")
#在管道中申明一个队列,队列的名字就叫做hello

# test_channel.queue_declare(queue="hello",durable=True)
#设置hello这个queue为持久化,这个持久化是重启rabbitmq server的服务器重启,这个queue也不会丢失,需要在客户端和服务端都需要做持久化durable=True

#一旦申明一个queue后,没有给他持久化,就不能修改为持久化了,只有删除queue或者
#重新申明一个queue,才能把queue设置为持久化

test_channel.basic_publish(exchange="",
                           routing_key="hello2",
                           body="hello,my first python rabbitmq message 2")

# test_channel.basic_publish(exchange="",
#                            routing_key="hello",
#                            body="hello,my first python rabbitmq message 2")

#rabbitmq不能直接发数据到queue中,必须要通过一个中转器(交换器),类似于路由器一样的东西,也就是这里说的exchange,交换器的作用就是控制消息往哪个队列中发消息
#routing_key就是往哪个队列中发数据
#body就是往这个队列中发的内容
#上面这3个是规定,必须要按照上面的格式去写

print("[x] send ‘hello,my first python rabbitmq message‘")
#打印一下,这里没有特殊意义

test_channel.close()
# 关闭这个rabbitmq

 

在看接收端的代码

import pika

client_connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
#首先也是要连接上rabbitmq,和生产者端是一样的

channel = client_connection.channel()
#生成一个管道

channel.queue_declare(queue="hello2")
#在管道申明一个队列,在服务端已经申明了一个管道,名称为hello,这里为什么又要创建一个管道
#这里主要为了避免代码报错,如果客户端先执行,发现没有hello这个队列,则会报错,所以这里申
# 明一个队列,如果没有这个管道,则创建一个管道,如果有,就什么也不做,直接忽略了。

# def callback(body):
def callback(ch,method,properties,body):
    print("[x] Receive %r" %body)
    print("----->sh",ch)
    print("----->method",method)
    print("----->properties",properties)

#回调函数的这些参数是规定的,必须要这样定义,该函数就是在消费者收到消息后调用的函数

channel.basic_consume(callback,
                      queue="hello2",
                      no_ack=True)

#开始接受数据
#callable,回调函数,自己的定义的函数,如果收到消息,则调用callable这个函数
# 从队列hello中接受消息
#no_ack如果是True,就是不需要确认;如果no_ack为False,则如果接受端的数据接受完了,则会给服务端发一个确认消息

print("[*] waiting for message")
channel.start_consuming()
#这个时候开始接受数据,如果没有数据,则会阻塞,如果有数据,则接受数据,变为阻塞

 

2、在看如何实现序列化持久化

先看生产端的代码

import pika
#操作rabbitmq的模块,官方的模块叫做pika

# 1、先连接到rabbitmq
# 2、创建一个管道
# 3、在管道中跑队列

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
#生成一个阻塞的连接,连接上rabbitmaq

test_channel = connection.channel()
#创建一个管道

test_channel.queue_declare(queue="test_duralbe_queue",durable=True)
#durable=True的作用是申明一个持久化的序列test_duralbe_queue,如果重启rabbitmq server后,这个test_duralbe_queue还会一直保留,下次不需要在申明这个序列

#这里需要注意,重启rabbitmq server后序列还在,但是消息已经没有了。

#一旦申明一个queue后,没有给他持久化,就不能修改为持久化了,只有删除queue或者
#重新申明一个queue,才能把queue设置为持久化

test_channel.basic_publish(exchange="",
                           routing_key="test_duralbe_queue",
                           body="hello,my first python rabbitmq message 2")

# test_channel.basic_publish(exchange="",
#                            routing_key="hello",
#                            body="hello,my first python rabbitmq message 2")

#rabbitmq不能直接发数据到queue中,必须要通过一个中转器(交换器),类似于路由器一样的东西,也就是这里说的exchange,交换器的作用就是控制消息往哪个队列中发消息
#routing_key就是往哪个队列中发数据
#body就是往这个队列中发的内容
#上面这3个是规定,必须要按照上面的格式去写

print("[x] send ‘hello,my first python rabbitmq message‘")
#打印一下,这里没有特殊意义

test_channel.close()
# 关闭这个rabbitmq

 

在看接收端的代码

import pika

client_connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
#首先也是要连接上rabbitmq,和生产者端是一样的

channel = client_connection.channel()
#生成一个管道

channel.queue_declare(queue="test_duralbe_queue",durable=True)
#durable=True的作用是申明一个持久化的序列test_duralbe_queue,如果重启rabbitmq server后,这个test_duralbe_queue还会一直保留,下次不需要在申明这个序列

# def callback(body):
def callback(ch,method,properties,body):
    print("[x] Receive %r" %body)
    print("----->sh",ch)
    print("----->method",method)
    print("----->properties",properties)

#回调函数的这些参数是规定的,必须要这样定义,该函数就是在消费者收到消息后调用的函数

channel.basic_consume(callback,
                      queue="test_duralbe_queue",
                      no_ack=True)

#开始接受数据
#callable,回调函数,自己的定义的函数,如果收到消息,则调用callable这个函数
# 从队列hello中接受消息
#no_ack如果是True,就是不需要确认;如果no_ack为False,则如果接受端的数据接受完了,则会给服务端发一个确认消息

print("[*] waiting for message")
channel.start_consuming()
#这个时候开始接受数据,如果没有数据,则会阻塞,如果有数据,则接受数据,变为阻塞

 

3、在看如何实现消息持久化

先看生产端的代码

import pika
#操作rabbitmq的模块,官方的模块叫做pika

# 1、先连接到rabbitmq
# 2、创建一个管道
# 3、在管道中跑队列

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
#生成一个阻塞的连接,连接上rabbitmaq

test_channel = connection.channel()
#创建一个管道

test_channel.queue_declare(queue="test_duralbe_queue",durable=True)
#durable=True的作用是申明一个持久化的序列test_duralbe_queue,如果重启rabbitmq server后,这个test_duralbe_queue还会一直保留,下次不需要在申明这个序列

#这里需要注意,重启rabbitmq server后序列还在,但是消息已经没有了。

#一旦申明一个queue后,没有给他持久化,就不能修改为持久化了,只有删除queue或者
#重新申明一个queue,才能把queue设置为持久化

test_channel.basic_publish(exchange="",
                           routing_key="test_duralbe_queue",
                           body="hello,my first python rabbitmq message 3",
                           properties=pika.BasicProperties(delivery_mode=2))

#properties=pika.BasicProperties(delivery_mode=2这个意思消息持久化,如果先执行生产者,执行完成后,生产者的代码退出,rabbitmq server重启后,
#直接启动消费者,消费者还可以接收到生产者发的消息,这个就是消息持久化

#rabbitmq不能直接发数据到queue中,必须要通过一个中转器(交换器),类似于路由器一样的东西,也就是这里说的exchange,交换器的作用就是控制消息往哪个队列中发消息
#routing_key就是往哪个队列中发数据
#body就是往这个队列中发的内容
#上面这3个是规定,必须要按照上面的格式去写

print("[x] send ‘hello,my first python rabbitmq message‘")
#打印一下,这里没有特殊意义

test_channel.close()
# 关闭这个rabbitmq

 

在看接收端的代码

import pika

client_connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
#首先也是要连接上rabbitmq,和生产者端是一样的

channel = client_connection.channel()
#生成一个管道

channel.queue_declare(queue="test_duralbe_queue",durable=True)
#durable=True的作用是申明一个持久化的序列test_duralbe_queue,如果重启rabbitmq server后,这个test_duralbe_queue还会一直保留,下次不需要在申明这个序列

# def callback(body):
def callback(ch,method,properties,body):
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print("[x] Receive %r" %body)
    print("----->sh",ch)
    print("----->method",method)
    print("----->properties",properties)

#如果要实现消息持久化,需要在消费者端加这么一个行
# ch.basic_ack(delivery_tag=method.delivery_tag)

#回调函数的这些参数是规定的,必须要这样定义,该函数就是在消费者收到消息后调用的函数

channel.basic_consume(callback,
                      queue="test_duralbe_queue",
                      no_ack=True)

#开始接受数据
#callable,回调函数,自己的定义的函数,如果收到消息,则调用callable这个函数
# 从队列hello中接受消息
#no_ack如果是True,就是不需要确认;如果no_ack为False,则如果接受端的数据接受完了,则会给服务端发一个确认消息

print("[*] waiting for message")
channel.start_consuming()
#这个时候开始接受数据,如果没有数据,则会阻塞,如果有数据,则接受数据,变为阻塞

 

4、再看如何用rabbitmaq实现

先看生产端的代码

import pika
#操作rabbitmq的模块,官方的模块叫做pika

# 1、先连接到rabbitmq
# 2、创建一个管道
# 3、在管道中跑队列

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
#生成一个阻塞的连接,连接上rabbitmaq

test_channel = connection.channel()
#创建一个管道

test_channel.queue_declare(queue="test_duralbe_queue",durable=True)
#durable=True的作用是申明一个持久化的序列test_duralbe_queue,如果重启rabbitmq server后,这个test_duralbe_queue还会一直保留,下次不需要在申明这个序列

#这里需要注意,重启rabbitmq server后序列还在,但是消息已经没有了。

#一旦申明一个queue后,没有给他持久化,就不能修改为持久化了,只有删除queue或者
#重新申明一个queue,才能把queue设置为持久化

test_channel.basic_publish(exchange="",
                           routing_key="test_duralbe_queue",
                           body="hello,my first python rabbitmq message 3",
                           properties=pika.BasicProperties(delivery_mode=2))

#properties=pika.BasicProperties(delivery_mode=2这个意思消息持久化,如果先执行生产者,执行完成后,生产者的代码退出,rabbitmq server重启后,
#直接启动消费者,消费者还可以接收到生产者发的消息,这个就是消息持久化

#rabbitmq不能直接发数据到queue中,必须要通过一个中转器(交换器),类似于路由器一样的东西,也就是这里说的exchange,交换器的作用就是控制消息往哪个队列中发消息
#routing_key就是往哪个队列中发数据
#body就是往这个队列中发的内容
#上面这3个是规定,必须要按照上面的格式去写

print("[x] send ‘hello,my first python rabbitmq message‘")
#打印一下,这里没有特殊意义

test_channel.close()
# 关闭这个rabbitmq

  

在看接收端的代码

import pika

client_connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
#首先也是要连接上rabbitmq,和生产者端是一样的

channel = client_connection.channel()
#生成一个管道

channel.queue_declare(queue="test_duralbe_queue",durable=True)
#durable=True的作用是申明一个持久化的序列test_duralbe_queue,如果重启rabbitmq server后,这个test_duralbe_queue还会一直保留,下次不需要在申明这个序列

# def callback(body):
def callback(ch,method,properties,body):
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print("[x] Receive %r" %body)
    print("----->sh",ch)
    print("----->method",method)
    print("----->properties",properties)

channel.basic_qos(prefetch_count=1)

#如果要实现消息持久化,需要在消费者端加这么一个行
# ch.basic_ack(delivery_tag=method.delivery_tag)

#回调函数的这些参数是规定的,必须要这样定义,该函数就是在消费者收到消息后调用的函数

channel.basic_consume(callback,
                      queue="test_duralbe_queue",
                      no_ack=True)

#开始接受数据
#callable,回调函数,自己的定义的函数,如果收到消息,则调用callable这个函数
# 从队列hello中接受消息
#no_ack如果是True,就是不需要确认;如果no_ack为False,则如果接受端的数据接受完了,则会给服务端发一个确认消息

print("[*] waiting for message")
channel.start_consuming()
#这个时候开始接受数据,如果没有数据,则会阻塞,如果有数据,则接受数据,变为阻塞

 

5、在看如何使用rabbitmq实现全部广播

先看生产端的代码

import pika
import sys

# 如果是fanout,则不需要申明queue

client_connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
#首先也是要连接上rabbitmq,和生产者端是一样的

channel = client_connection.channel()

channel.exchange_declare(exchange="logs",
                         exchange_type="fanout")

message = "".join(sys.argv[1:]) or "info:hello world!"

channel.basic_publish(exchange="logs",
                      routing_key="",
                      body=message)

print("[x] send %r" % message)

client_connection.close()

  

再看接收端的代码

import pika

client_connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
#首先也是要连接上rabbitmq,和生产者端是一样的
channel = client_connection.channel()

channel.exchange_declare(exchange="logs",
                         exchange_type="fanout")

result = channel.queue_declare(exclusive=True)
#不指定queue的名字,rabbitmq自动随机分配一个名字,并且在这个queue会在消费者
#断开后自动被删除

queue_name = result.method.queue

channel.queue_bind(exchange="logs",
                   queue=queue_name)
#把queue绑定到exchange上,他才能接受消息

print("waiting for message")
def callback(ch,method,properties,body):
    print("receive message %r" %body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()

 

6、在看使用rabbitmap实现指定的广播

先看发送端的代码

import pika
import sys

#这个的意思是,把队列绑定在一个管道中,不同的客户端把这个队列申明为不同的key,在生产端
#可以指定这个消息发送到队列中的哪些key中

#根据关键字把消息发送到指定的队列中

client_connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
#首先也是要连接上rabbitmq,和生产者端是一样的

channel = client_connection.channel()
#创建一个管道

channel.exchange_declare(exchange="direct_logs",
                         exchange_type="direct")

#申明管道的类型

severity = sys.argv[1] if len(sys.argv) > 1 else "info"
# 指定往哪个队列中发,如果没有指定,则往info里发

message = "".join(sys.argv[1:]) or "hello world"
channel.basic_publish(exchange="direct_logs",
                      routing_key=severity,
                      #往队列中的那个routing_key中发
                      body=message)

print("[x] send %r" % message)

client_connection.close()

  

在看接收端的代码

import pika
import sys

client_connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
#首先也是要连接上rabbitmq,和生产者端是一样的
channel = client_connection.channel()

channel.exchange_declare(exchange="direct_logs",
                         exchange_type="direct")

result = channel.queue_declare(exclusive=True)
#不指定queue的名字,rabbitmq自动随机分配一个名字,并且在这个queue会在消费者
#断开后自动被删除

queue_name = result.method.queue

severities = sys.argv[1:]

if not severities:
    sys.stderr.write("Usage: %s [info] [warning ] [error]\n" %sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange="direct_logs",
                       queue=queue_name,
                       routing_key=severity)
#把队列申明为指定的路由key,客户端申明为路由key为1,那么只有服务端发送到队列中路由key为1
# 的客户端,才能收到消息

print("waiting for message")
def callback(ch,method,properties,body):
    print("receive message %r" %body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()

  

				
时间: 2024-07-28 14:49:08

rabbitmq的简单介绍的相关文章

rabbitmq简单介绍

引言 你是否遇到过两个(多个)系统间需要通过定时任务来同步某些数据?你是否在为异构系统的不同进程间相互调用.通讯的问题而苦恼.挣扎?如果是,那么恭喜你,消息服务让你可以很轻松地解决这些问题.消息服务擅长于解决多系统.异构系统间的数据交换(消息通知/通讯)问题,你也可以把它用于系统间服务的相互调用(RPC).本文将要介绍的RabbitMQ就是当前最主流的消息中间件之一. RabbitMQ简介 AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层

rabbitmq 基础概念介绍

[引言] 你是否遇到过两个(多个)系统间需要通过定时任务来同步某些数据?你是否在为异构系统的不同进程间相互调用.通讯的问题而苦恼.挣扎?如果是,那么恭喜你,消息服务让你可以很轻松地解决这些问题. 消息服务擅长于解决多系统.异构系统间的数据交换(消息通知/通讯)问题,你也可以把它用于系统间服务的相互调用(RPC).本文将要介绍的RabbitMQ就是当前最主流的消息中间件之一. [RabbitMQ简介] AMQP,即Advanced Message Queuing Protocol,高级消息队列协议

RabbitMQ简单介绍及安装使用

一.RabbitMQ简单介绍 二.安装配置1.安装环境 CentOS7 server1 190.168.3.250安装包依赖[[email protected] ~]# yum -y install gcc gcc-c++ m4 ncurses-devel openssl-devel2.安装RabbitMQ 按顺序安装:3.配置[[email protected] ~]# vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.14/ebin/rabbit.a

Rabbitmq 简单介绍,安装和go客户端使用

Rabbitmq 简单介绍,安装和go客户端使用 1,消息队列介绍 1.1 什么是消息队列? 消息队列(英语:Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自用户.消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的数据,包含发生的时间,输入设备的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列互交.消息会保存在队列中,直到接收者取回它.消息队列,一般我们会简称他为MQ(Message

RabbitMQ简单介绍+Windows环境安装

文章目录 1.RabbitMQ简介2.RabbitMQ与其他MQ有什么不同3.RabbitMQ环境安装3.1 安装erlang3.2 安装rabbitmq-server4. RabbitMQ管理平台介绍 1.RabbitMQ简介 RabbitMQ 是一个由 erlang 开发的 AMQP(Advanced Message Queue )的开源实现.AMQP 的出现其实也是应了广大人民群众的需求,虽然在同步消息通讯的世界里有很多公开标准(如 COBAR的 IIOP ,或者是 SOAP 等),但是在

运维神器Chef简单介绍和安装笔记

首先大概解释一下Chef Chef有三个重要的概念:(如上图所示) 它们的合作关系大致是这样的, Workstation把资源或者说是一些要被运行的命令上传到Chef-Server上, Nodes自动通过Chef-Server拿到属于自己的执行任务到本地执行,这样可达到一个将军指挥千军万马的效果:smirk:. Chef Server 存放所有通过Workstation上传的资源,和用户等公共数据(用PostgreSQL). 可以干脆叫它为资源服务器,大家都可以与它通讯(用RabbitMQ ),

RabbitMQ安装&简单使用

.什么是RabbitMQ.详见 http://www.rabbitmq.com/. 作用就是提高系统的并发性,将一些不需要及时响应客户端且占用较多资源的操作,放入队列,再由另外一个线程,去异步处理这些队列,可极大的提高系统的并发能力. 2.安装 RabbitMQ服务:http://www.rabbitmq.com/download.html.    (安装完RabbitMQ服务后,会在Windows服务中看到.如果没有Erlang运行环境,在安装过程中会提醒先安装Erlang环境.http://

python的列表,元组和字典简单介绍

引 入 java                                   python 存取多个值:数组或list集合 ------------------------> 列表,元组 key-value格式:    Map        ------------------------>    字典 自己学习发现,java跟python这两门面向对象语言在数据类型的定义上,很多思想都是互通的,这里不说java,简单介绍一下python的列表,元组和字典. 一.列表 List: 最通

javascript的return语句简单介绍

javascript的return语句简单介绍:return语句在js中非常的重要,不仅仅具有返回函数值的功能,还具有一些特殊的用法,有个清晰的把握是非常有必要的.下面就结合实例简单介绍一下return语句的作用.一.用来返回控制和函数结果:通常情况,return语句对于一个函数是很有必要的,因为往往需要函数在一系列的代码执行后会得到一个期望的返回值,而此值就是通过return语句返回,并且将控制权返回给主调函数.语法格式: return 表达式 代码实例如下: function add(){