利用RabbitMQ实现RPC(python)

RPC——远程过程调用,通过网络调用运行在另一台计算机上的程序的函数\方法,是构建分布式程序的一种方式。RabbitMQ是一个消息队列系统,可以在程序之间收发消息。利用RabbitMQ可以实现RPC。本文所有操作都是在CentOS7.3上进行的,示例代码语言为Python。

RabbiMQ以及pika模块安装


yum install rabbitmq-server python-pika -y

systemctl    start rabbitmq-server

RPC的基本实现

RPC的服务端代码如下:


#!/usr/bin/env   python

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fun(n):

return 2*n

def on_request(ch, method, props, body):

n = int(body)

response = fun(n)

ch.basic_publish(exchange='',

routing_key=props.reply_to,

properties=pika.BasicProperties(correlation_id = props.correlation_id),

body=str(response))

ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)

channel.basic_consume(on_request, queue='rpc_queue')

print(" [x] Awaiting RPC requests")

channel.start_consuming()

以上代码中,首先与RabbitMQ服务建立连接,然后定义了一个函数fun(),fun()功能很简单,输入一个数然后返回该数的两倍,这个函数就是我们要远程调用的函数。on_request()是一个回调函数,它作为参数传递给了basic_consume(),当basic_consume()在队列中消费1条消息时,on_request()就会被调用,on_request()从消息内容body中获取数字,并传给fun()进行计算,并将返回值作为消息内容发给调用方指定的接收队列,队列名称保存在变量props.reply_to中。

RPC的客户端代码如下:


#!/usr/bin/env   python

import pika

import uuid

class RpcClient(object):

def __init__(self):

self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

self.channel = self.connection.channel()

result = self.channel.queue_declare(exclusive=True)

self.callback_queue = result.method.queue

self.channel.basic_consume(self.on_response, no_ack=True,

queue=self.callback_queue)

def on_response(self, ch, method, props, body):

if self.corr_id == props.correlation_id:

self.response = body

def call(self,n):

self.response = None

self.corr_id = str(uuid.uuid4())

self.channel.basic_publish(exchange='',

routing_key='rpc_queue',

properties=pika.BasicProperties(

reply_to = self.callback_queue,

correlation_id = self.corr_id,

),

body=str(n))

while self.response is None:

self.connection.process_data_events()

return str(self.response)

rpc = RpcClient()

print(" [x] Requesting")

response = rpc.call(2)

print(" [.] Got %r" % response)

代码开始也是连接RabbitMQ,然后开始消费消息队列callback_queue中的消息,该队列的名字通过Request的属性reply_to传递给服务端,就是在上面介绍服务端代码时提到过的props.reply_to,作用是告诉服务端把结果发到这个队列。 basic_consume()的回调函数变成了on_response(),这个函数从callback_queue的消息内容中获取返回结果。

函数call实际发起请求,把数字n发给服务端程序,当response不为空时,返回response值。

下面看运行效果,先启动服务端:

在另一个窗口中运行客户端:

成功调用了服务端的fun()并得到了正确结果(fun(2)结果为4)。

总结:RPC的实现过程可以用下图来表示(图片来自RabbitMQ官网):

当客户端启动时,它将创建一个callback queue用于接收服务端的返回消息Reply,名称由RabbitMQ自动生成,如上图中的amq.gen-Xa2..。同一个客户端可能会发出多个Request,这些Request的Reply都由callback queue接收,为了互相区分,就引入了correlation_id属性,每个请求的correlation_id值唯一。这样,客户端发起的Request就带由2个关键属性:reply_to告诉服务端向哪个队列返回结果;correlation_id用来区分是哪个Request的返回。

稍微复杂点的RPC

如果服务端定义了多个函数供远程调用怎么办?有两种思路,一种是利用Request的属性app_id传递函数名,另一种是把函数名通过消息内容发送给服务端。

1.我们先实现第一种,服务端代码如下:


#!/usr/bin/env   python

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def a():

return "a"

def b():

return "b"

def on_request(ch, method, props, body):

funname = props.app_id

if funname == "a":

response = a()

elif funname == "b":

response = b()

ch.basic_publish(exchange='',

routing_key=props.reply_to,

properties=pika.BasicProperties(correlation_id = \

props.correlation_id),

body=str(response))

ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)

channel.basic_consume(on_request, queue='rpc_queue')

print(" [x] Awaiting RPC requests")

channel.start_consuming()

这次我们定义了2个不同函数a()和b(),分别打印不同字符串,根据接收到的app_id来决定调用哪一个。

客户端代码:


#!/usr/bin/env   python

import pika

import uuid

class RpcClient(object):

def __init__(self):

self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

self.channel = self.connection.channel()

result = self.channel.queue_declare(exclusive=True)

self.callback_queue = result.method.queue

self.channel.basic_consume(self.on_response, no_ack=True,

queue=self.callback_queue)

def on_response(self, ch, method, props, body):

if self.corr_id == props.correlation_id:

self.response = body

def call(self,name):

self.response = None

self.corr_id = str(uuid.uuid4())

self.channel.basic_publish(exchange='',

routing_key='rpc_queue',

properties=pika.BasicProperties(

reply_to = self.callback_queue,

correlation_id = self.corr_id,

app_id = str(name),

),

body="request")

while self.response is None:

self.connection.process_data_events()

return str(self.response)

rpc = RpcClient()

print(" [x] Requesting")

response = rpc.call("b")

print(" [.] Got %r" % response)

函数call()接收参数name作为被调用的远程函数的名字,通过app_id传给服务端程序,这段代码里我们选择调用服务端的函数b(),rpc.call(“b”)。

执行结果:

结果显示成功调用了函数b,如果改成rpc.call(“a”),执行结果就会变成:

2.第二种实现方法,服务端代码:


#!/usr/bin/env   python

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def a():

return "a"

def b():

return "b"

def on_request(ch, method, props, body):

funname = str(body)

if funname == "a":

response = a()

elif funname == "b":

response = b()

ch.basic_publish(exchange='',

routing_key=props.reply_to,

properties=pika.BasicProperties(correlation_id = \

props.correlation_id),

body=str(response))

ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)

channel.basic_consume(on_request, queue='rpc_queue')

print(" [x] Awaiting RPC requests")

channel.start_consuming()

客户端代码:


#!/usr/bin/env   python

import pika

import uuid

class RpcClient(object):

def __init__(self):

self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

self.channel = self.connection.channel()

result = self.channel.queue_declare(exclusive=True)

self.callback_queue = result.method.queue

self.channel.basic_consume(self.on_response, no_ack=True,

queue=self.callback_queue)

def on_response(self, ch, method, props, body):

if self.corr_id == props.correlation_id:

self.response = body

def call(self,name):

self.response = None

self.corr_id = str(uuid.uuid4())

self.channel.basic_publish(exchange='',

routing_key='rpc_queue',

properties=pika.BasicProperties(

reply_to = self.callback_queue,

correlation_id = self.corr_id,

),

body=str(name))

while self.response is None:

self.connection.process_data_events()

return str(self.response)

rpc = RpcClient()

print(" [x] Requesting")

response = rpc.call("b")

print(" [.] Got %r" % response)

与第一种实现方法的区别就是没有使用属性app_id,而是把要调用的函数名放在消息内容body中,执行结果跟第一种方法一样。

一个简单的实际应用案例

下面我们将编写一个小程序,用于收集多台KVM宿主机上的虚拟机数量和剩余可使用的资源。程序由两部分组成,运行在每台宿主机上的脚本agent.py和管理机上收集信息的脚本collect.py。从RPC的角度,agent.py是服务端,collect.py是客户端。

agent.py代码如下:


#!/usr/bin/python

import pika

import libvirt

import psutil

import json

import socket

import os

import sys

from xml.dom import minidom

#配置RabbitMQ地址

RabbitMQServer=x.x.x.x

#连接libvirt,libvirt是一个虚拟机、容器管理程序。

def get_conn():

conn = libvirt.open("qemu:///system")

if conn == None:

print '--Failed to open connection to   QEMU/KVM--'

sys.exit(2)

else:

return conn

#获取虚拟机数量

def getVMcount():

conn = get_conn()

domainIDs = conn.listDomainsID()

return len(domainIDs)

#获取分配给所有虚拟机的内存之和

def getMemoryused():

conn = get_conn()

domainIDs = conn.listDomainsID()

used_mem = 0

for id in domainIDs:

dom = conn.lookupByID(id)

used_mem += dom.maxMemory()/(1024*1024)

return used_mem

#获取分配给所有虚拟机的vcpu之和

def getCPUused():

conn = get_conn()

domainIDs = conn.listDomainsID()

used_cpu = 0

for id in domainIDs:

dom = conn.lookupByID(id)

used_cpu += dom.maxVcpus()

return used_cpu

#获取所有虚拟机磁盘文件大小之和

def getDiskused():

conn = get_conn()

domainIDs = conn.listDomainsID()

diskused = 0

for id in domainIDs:

dom = conn.lookupByID(id)

xml = dom.XMLDesc(0)

doc = minidom.parseString(xml)

disks = doc.getElementsByTagName('disk')

for disk in disks:

if disk.getAttribute('device') == 'disk':

diskfile = disk.getElementsByTagName('source')[0].getAttribute('file')

diskused += dom.blockInfo(diskfile,0)[0]/(1024**3)

return diskused

#使agent.py进入守护进程模式

def daemonize(stdin='/dev/null',stdout='/dev/null',stderr='/dev/null'):

try:

pid = os.fork()

if pid > 0:

sys.exit(0)

except OSError,e:

sys.stderr.write("fork #1 failed: (%d) %s\n" % (e.errno,e.strerror))

sys.exit(1)

os.chdir("/")

os.umask(0)

os.setsid()

try:

pid = os.fork()

if pid > 0:

sys.exit(0)

except OSError,e:

sys.stderr.write("fork #2 failed: (%d) %s\n" % (e.errno,e.strerror))

sys.exit(1)

for f in sys.stdout,sys.stderr,: f.flush()

si = file(stdin,'r')

so = file(stdout,'a+',0)

se = file(stderr,'a+',0)

os.dup2(si.fileno(),sys.stdin.fileno())

os.dup2(so.fileno(),sys.stdout.fileno())

os.dup2(se.fileno(),sys.stderr.fileno())

daemonize('/dev/null','/root/kvm/agent.log','/root/kvm/agent.log')

#连接RabbitMQ

connection = pika.BlockingConnection(pika.ConnectionParameters(host= RabbitMQServer))

channel = connection.channel()

channel.exchange_declare(exchange='kvm',type='fanout')

result = channel.queue_declare(exclusive=True)

queue_name = result.method.queue

channel.queue_bind(exchange='kvm',queue=queue_name)

def on_request(ch,method,props,body):

sys.stdout.write(body+'\n')

sys.stdout.flush()

mem_total = psutil.virtual_memory()[0]/(1024*1024*1024)

cpu_total = psutil.cpu_count()

statvfs = os.statvfs('/datapool')

disk_total = (statvfs.f_frsize * statvfs.f_blocks)/(1024**3)

mem_unused = mem_total - getMemoryused()

cpu_unused = cpu_total - getCPUused()

disk_unused = disk_total - getDiskused()

data = {

'hostname':socket.gethostname(),#宿主机名

'vm':getVMcount(),#虚拟机数量

'available memory':mem_unused,#可用内存

'available cpu':cpu_unused,#可用cpu核数

'available disk':disk_unused#可用磁盘空间

}

json_str = json.dumps(data)

ch.basic_publish(exchange='',

routing_key=props.reply_to,

properties=pika.BasicProperties(correlation_id=props.correlation_id),

body=json_str

)

ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)

channel.basic_consume(on_request,queue=queue_name)

sys.stdout.write(" [x] Awaiting RPC requests\n")

sys.stdout.flush()

channel.start_consuming()

collect.py代码如下:


#!/usr/bin/python

import pika

import uuid

import json

import datetime

#配置RabbitMQ地址

RabbitMQServer=x.x.x.x

class RpcClient(object):

def __init__(self):

self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=RabbitMQServer))

self.channel = self.connection.channel()

self.channel.exchange_declare(exchange='kvm',type='fanout')

result = self.channel.queue_declare(exclusive=True)

self.callback_queue = result.method.queue

self.channel.basic_consume(self.on_responses,no_ack=True,queue=self.callback_queue)

self.responses = []

def on_responses(self,ch,method,props,body):

if self.corr_id == props.correlation_id:

self.responses.append(body)

def call(self):

timestamp = datetime.datetime.strftime(datetime.datetime.now(),'%Y-%m-%dT%H:%M:%SZ')

self.corr_id = str(uuid.uuid4())

self.channel.basic_publish(exchange='kvm',

routing_key='',

properties=pika.BasicProperties(

reply_to = self.callback_queue,

correlation_id = self.corr_id,

),

body='%s: receive a request' % timestamp

)

#定义超时回调函数

def outoftime():

self.channel.stop_consuming()

self.connection.add_timeout(30,outoftime)

self.channel.start_consuming()

return self.responses

rpc = RpcClient()

responses = rpc.call()

for i in responses:

response = json.loads(i)

print(" [.] Got %r" % response)

本文在前面演示的RPC都是只有一个服务端的情况,客户端发起请求后是用一个while循环来阻塞程序以等待返回结果的,当self.response不为None,就退出循环。

如果在多服务端的情况下照搬过来就会出问题,实际情况中我们可能有几十台宿主机,每台上面都运行了一个agent.py,当collect.py向几十个agent.py发起请求时,收到第一个宿主机的返回结果后就会退出上述while循环,导致后续其他宿主机的返回结果被丢弃。这里我选择定义了一个超时回调函数outoftime()来替代之前的while循环,超时时间设为30秒。collect.py发起请求后阻塞30秒来等待所有宿主机的回应。如果宿主机数量特别多,可以再调大超时时间。

脚本运行需要使用的模块pika和psutil安装过程:


yum install -y python-pip python-devel

pip install pika

wget --no-check-certificate https://pypi.python.org/packages/source/p/psutil/psutil-2.1.3.tar.gz

tar zxvf psutil-2.1.3.tar.gz

cd psutil-2.1.3/ && python setup.py install

脚本运行效果演示:

原文地址:http://blog.51cto.com/3646344/2097020

时间: 2024-08-30 08:01:57

利用RabbitMQ实现RPC(python)的相关文章

rabbitmq学习(四):利用rabbitmq实现远程rpc调用

一.rabbitmq实现rpc调用的原理 ·rabbitmq实现rpc的原理是:客户端向一个队列中发送消息,并注册一个回调的队列用于接收服务端返回的消息,该消息需要声明一个叫做correaltionId的属性,该属性将是该次请求的唯一标识.服务端在接受到消息(在需要时可以验证correaltionId)后,处理消息,并将消息发送到客户端注册的回调队列中.原理图如下: 二.代码实现 下面我们将模拟实现一个rpc客户端和rpc服务端.客户端给服务端发送message,服务端收到后处理message,

RabbitMQ 实现RPC

实现RPC 首先要弄明白,RPC是个什么东西. (RPC) Remote Procedure Call Protocol 远程过程调用协议 在一个大型的公司,系统由大大小小的服务构成,不同的团队维护不同的代码,部署在不同的机器.但是在做开发时候往往要用到其它团队的方法,因为已经有了实现.但是这些服务部署不同的机器上,想要调用就需要网络通信,这些代码繁琐且复杂,一不小心就会写的很低效.RPC协议定义了规划,其它的公司都给出了不同的实现.比如微软的wcf,以及现在火热的WebApi. 在Rabbit

RabbitMQ中RPC的实现及其通信机制

RabbitMQ中RPC的实现:客户端发送请求消息,服务端回复响应消息,为了接受响应response,客户端需要发送一个回调队列的地址来接受响应,每条消息在发送的时候会带上一个唯一的correlation_id,相应的服务端处理计算后会将结果返回到对应的correlation_id. RPC调用流程: 当生产者启动时,它会创建一个匿名的独占回调队列,对于一个RPC请求,生产者发送一条具有两个属性的消息:reply_to(回调队列),correlation_id(每个请求的唯一值),请求被发送到r

RabbitMQ(四):RPC的实现

原文:RabbitMQ(四):RPC的实现 一.RPC RPC(Remote Procedure Call)—远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议.有很多方式可以实现,譬如UNIX RPC.REST API.WCF和SOAP.这些传统的RPC实现方法有共同之处:那就是客户端和服务器端紧密相连,客户端直接连接上服务器,发送一个请求,然后就停下来等待服务器的应答. 这种点对点的性质模式有很多好处,它使得在小范围内的拓扑变得简单.但是当有众多服务器的

利用Fabric+Capistrano实现Python自动化部署

Fabric是一个用于应用(批量)部署和系统(批量)管理的Python库和命令行工具,关于Fabric的介绍请参考:http://www.fabfile.org/. Capistrano是一个用Ruby语言编写的远程服务器自动化和部署工具,关于Capistrano的介绍请参考:http://capistranorb.com/. 本文仅使用Python语言和部分Linux或Windows系统命令,借助Fabric模块和Capistrano的部署思路,实现在Linux平台和Windows平台的自动化

Rabbitmq -Publish_Subscribe模式- python编码实现

what is Exchanges ?? Let's quickly go over what we covered in the previous tutorials: A producer is a user application that sends messages. A queue is a buffer that stores messages. A consumer is a user application that receives messages. The core id

linux 下 rpc python 实例之使用XML-RPC进行远程文件共享

这是个不错的练习,使用python开发P2P程序,或许通过这个我们可以自己搞出来一个P2P下载工具,类似于迅雷.XML-RPC是一个远程过程调用(remote procedure call,RPC)的分布式计算协议,通过XML将调用函数封装,并使用HTTP协议作为传送机制[摘自维基百科] 1.先做一个小小的尝试: 首先进入命令行,输入vim pythonServer.py,然后输入一下代码: from simpleXMLRPCServerr import SimpleXMLRPCServerr

Rabbitmq -Routeing模式- python编码实现

(using the pika 0.10.0 Python client) In the previous tutorial we built a simple logging system. We were able to broadcast log messages to many receivers. In this tutorial we're going to add a feature to it - we're going to make it possible to subscr

利用树莓派控制步进电机——Python语言

步进电机的优点在于它能够被精确定位,正向或反向一次性转动"一步",并且也能够连续转动. #!/usr/bin/env python ######################################################### # File name: stepMotor.py # Author: Jason Dai # Date: 2015/01/26 ########################################################