RabbitMQ(七):适用于云计算集群的远程调用(RPC)



在云计算环境中,很多时候需要用它其他机器的计算资源,我们有可能会在接收到Message进行处理时,会把一部分计算任务分配到其他节点来完成。那么,RabbitMQ如何使用RPC呢?在本篇文章中,我们将会通过其它节点求来斐波纳契完成示例。

1. 客户端接口 Client interface

为了展示一个RPC服务是如何使用的,我们将创建一段很简单的客户端class。 它将会向外提供名字为call的函数,这个call会发送RPC请求并且阻塞知道收到RPC运算的结果。代码如下:

[python]
view plain
copy
print?

  1. fibonacci_rpc = FibonacciRpcClient()
  2. result = fibonacci_rpc.call(4)
  3. print "fib(4) is %r" % (result,)

<span style="font-size:18px;">fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print "fib(4) is %r" % (result,)</span>

2. 回调函数队列 Callback queue

总体来说,在RabbitMQ进行RPC远程调用是比较容易的。client发送请求的Message然后server返回响应结果。为了收到响应client在publish message时需要提供一个”callback“(回调)的queue地址。code如下:

[python]
view plain
copy
print?

  1. result = channel.queue_declare(exclusive=True)
  2. callback_queue = result.method.queue
  3. channel.basic_publish(exchange=‘‘,
  4. routing_key=‘rpc_queue‘,
  5. properties=pika.BasicProperties(
  6. reply_to = callback_queue,
  7. ),
  8. body=request)
  9. # ... and some code to read a response message from the callback_queue ...

<span style="font-size:18px;">result = channel.queue_declare(exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(exchange='',
                      routing_key='rpc_queue',
                      properties=pika.BasicProperties(
                            reply_to = callback_queue,
                            ),
                      body=request)

# ... and some code to read a response message from the callback_queue ...</span>

2.1 Message properties

AMQP 预定义了14个属性。它们中的绝大多很少会用到。以下几个是平时用的比较多的:

  • delivery_mode: 持久化一个Message(通过设定值为2)。其他任意值都是非持久化。请移步RabbitMQ消息队列(三):任务分发机制
  • content_type: 描述mime-type 的encoding。比如设置为JSON编码:设置该property为application/json。
  • reply_to: 一般用来指明用于回调的queue(Commonly used to name a callback queue)。
  • correlation_id: 在请求中关联处理RPC响应(correlate RPC responses with requests)。

3. 相关id Correlation id

在上个小节里,实现方法是对每个RPC请求都会创建一个callback queue。这是不高效的。幸运的是,在这里有一个解决方法:为每个client创建唯一的callback queue。

这又有其他问题了:收到响应后它无法确定是否是它的,因为所有的响应都写到同一个queue了。上一小节的correlation_id在这种情况下就派上用场了:对于每个request,都设置唯一的一个值,在收到响应后,通过这个值就可以判断是否是自己的响应。如果不是自己的响应,就不去处理。

4. 总结

工作流程:

  • 当客户端启动时,它创建了匿名的exclusive callback queue.
  • 客户端的RPC请求时将同时设置两个properties:
    reply_to设置为callback queue;correlation_id设置为每个request一个独一无二的值.
  • 请求将被发送到an rpc_queue queue.
  • RPC端或者说server一直在等待那个queue的请求。当请求到达时,它将通过在reply_to指定的queue回复一个message给client。
  • client一直等待callback queue的数据。当message到达时,它将检查correlation_id的值,如果值和它request发送时的一致那么就将返回响应。

5. 最终实现

The code for rpc_server.py:

[python]
view plain
copy
print?

  1. #!/usr/bin/env python
  2. import pika
  3. connection = pika.BlockingConnection(pika.ConnectionParameters(
  4. host=‘localhost‘))
  5. channel = connection.channel()
  6. channel.queue_declare(queue=‘rpc_queue‘)
  7. def fib(n):
  8. if n == 0:
  9. return 0
  10. elif n == 1:
  11. return 1
  12. else:
  13. return fib(n-1) + fib(n-2)
  14. def on_request(ch, method, props, body):
  15. n = int(body)
  16. print " [.] fib(%s)"  % (n,)
  17. response = fib(n)
  18. ch.basic_publish(exchange=‘‘,
  19. routing_key=props.reply_to,
  20. properties=pika.BasicProperties(correlation_id = \
  21. props.correlation_id),
  22. body=str(response))
  23. ch.basic_ack(delivery_tag = method.delivery_tag)
  24. channel.basic_qos(prefetch_count=1)
  25. channel.basic_consume(on_request, queue=‘rpc_queue‘)
  26. print " [x] Awaiting RPC requests"
  27. channel.start_consuming()
<span style="font-size:18px;">#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)

def on_request(ch, method, props, body):
    n = int(body)

    print " [.] fib(%s)"  % (n,)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id =                                                      props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')

print " [x] Awaiting RPC requests"
channel.start_consuming()
</span>

The server code is rather straightforward:

  • (4) As usual we start by establishing the connection and declaring the queue.
  • (11) We declare our fibonacci function. It assumes only valid positive integer input. (Don‘t expect this one to work for big numbers, it‘s probably the slowest recursive implementation possible).
  • (19) We declare a callback for
    basic_consume, the core of the RPC server. It‘s executed when the request is received. It does the work and sends the response back.
  • (32) We might want to run more than one server process. In order to spread the load equally over multiple servers we need to set theprefetch_count setting.

The code for rpc_client.py:

[python]
view plain
copy
print?

  1. #!/usr/bin/env python
  2. import pika
  3. import uuid
  4. class FibonacciRpcClient(object):
  5. def __init__(self):
  6. self.connection = pika.BlockingConnection(pika.ConnectionParameters(
  7. host=‘localhost‘))
  8. self.channel = self.connection.channel()
  9. result = self.channel.queue_declare(exclusive=True)
  10. self.callback_queue = result.method.queue
  11. self.channel.basic_consume(self.on_response, no_ack=True,
  12. queue=self.callback_queue)
  13. def on_response(self, ch, method, props, body):
  14. if self.corr_id == props.correlation_id:
  15. self.response = body
  16. def call(self, n):
  17. self.response = None
  18. self.corr_id = str(uuid.uuid4())
  19. self.channel.basic_publish(exchange=‘‘,
  20. routing_key=‘rpc_queue‘,
  21. properties=pika.BasicProperties(
  22. reply_to = self.callback_queue,
  23. correlation_id = self.corr_id,
  24. ),
  25. body=str(n))
  26. while self.response is None:
  27. self.connection.process_data_events()
  28. return int(self.response)
  29. fibonacci_rpc = FibonacciRpcClient()
  30. print " [x] Requesting fib(30)"
  31. response = fibonacci_rpc.call(30)
  32. print " [.] Got %r" % (response,)

<span style="font-size:18px;">#!/usr/bin/env python
import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
                                         ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

print " [x] Requesting fib(30)"
response = fibonacci_rpc.call(30)
print " [.] Got %r" % (response,)</span>

The client code is slightly more involved:

  • (7) We establish a connection, channel and declare an exclusive ‘callback‘ queue for replies.
  • (16) We subscribe to the ‘callback‘ queue, so that we can receive RPC responses.
  • (18) The ‘on_response‘ callback executed on every response is doing a very simple job, for every response message it checks if thecorrelation_id is the one we‘re looking for. If so, it saves the response
    inself.response and breaks the consuming loop.
  • (23) Next, we define our main
    call method - it does the actual RPC request.
  • (24) In this method, first we generate a unique
    correlation_id number and save it - the ‘on_response‘ callback function will use this value to catch the appropriate response.
  • (25) Next, we publish the request message, with two properties:
    reply_to and correlation_id.
  • (32) At this point we can sit back and wait until the proper response arrives.
  • (33) And finally we return the response back to the user.

开始rpc_server.py:

[python]
view plain
copy
print?

  1. $ python rpc_server.py
  2. [x] Awaiting RPC requests
<span style="font-size:18px;">$ python rpc_server.py
 [x] Awaiting RPC requests</span>

通过client来请求fibonacci数:

[python]
view plain
copy
print?

  1. $ python rpc_client.py
  2. [x] Requesting fib(30)
<span style="font-size:18px;">$ python rpc_client.py
 [x] Requesting fib(30)</span>

现在这个设计并不是唯一的,但是这个实现有以下优势:

  • 如何RPC server太慢,你可以扩展它:启动另外一个RPC server。
  • 在client端, 无所进行加锁能同步操作,他所作的就是发送请求等待响应。

我们的code还是挺简单的,并没有尝试去解决更复杂和重要的问题,比如:

  • 如果没有server在运行,client需要怎么做?
  • RPC应该设置超时机制吗?
  • 如果server运行出错并且抛出了异常,需要将这个问题转发到client吗?
  • 需要边界检查吗?
时间: 2024-07-29 15:04:42

RabbitMQ(七):适用于云计算集群的远程调用(RPC)的相关文章

(转)RabbitMQ消息队列(七):适用于云计算集群的远程调用(RPC)

在云计算环境中,很多时候需要用它其他机器的计算资源,我们有可能会在接收到Message进行处理时,会把一部分计算任务分配到其他节点来完成.那么,RabbitMQ如何使用RPC呢?在本篇文章中,我们将会通过其它节点求来斐波纳契完成示例. 1. 客户端接口 Client interface 为了展示一个RPC服务是如何使用的,我们将创建一段很简单的客户端class. 它将会向外提供名字为call的函数,这个call会发送RPC请求并且阻塞知道收到RPC运算的结果.代码如下: [python] vie

RabbitMQ消息队列(七):适用于云计算集群的远程调用(RPC) [转]

在云计算环境中,很多时候需要用它其他机器的计算资源,我们有可能会在接收到Message进行处理时,会把一部分计算任务分配到其他节点来完成.那么,RabbitMQ如何使用RPC呢?在本篇文章中,我们将会通过其它节点求来斐波纳契完成示例. 1. 客户端接口 Client interface 为了展示一个RPC服务是如何使用的,我们将创建一段很简单的客户端class. 它将会向外提供名字为call的函数,这个call会发送RPC请求并且阻塞知道收到RPC运算的结果.代码如下: [python] vie

网格计算, 云计算, 集群计算, 分布式计算, 超级计算

网格计算, 云计算, 集群计算, 分布式计算, 超级计算 整体来说都有将任务分割.运算.组合,只是协同和处理的重点不同: 超级计算强调的是高并行计算能力,应用设备多是超级计算机如天河一号,是infiniband的高并行处理架构,实现总线级协同,一般采用计算能力更强的GPU而非CPU:集群计算和分布式计算是相对于设备部署结构来说,这种计算相对超算来说,对于计算的并行处理及响应要求较低,需要实现的是网络环境下的协同,实现的效果受网络环境影响.网格计算是集群计算和分布式计算与超级计算中间的产物,是在原

Apache shiro集群实现 (七)分布式集群系统下---cache共享

Apache shiro集群实现 (一) shiro入门介绍 Apache shiro集群实现 (二) shiro 的INI配置 Apache shiro集群实现 (三)shiro身份认证(Shiro Authentication) Apache shiro集群实现 (四)shiro授权(Authentication)--访问控制 Apache shiro集群实现 (五)分布式集群系统下的高可用session解决方案 Apache shiro集群实现 (六)分布式集群系统下的高可用session

搭建 RabbitMQ Server 高可用集群

阅读目录: 准备工作 搭建 RabbitMQ Server 单机版 RabbitMQ Server 高可用集群相关概念 搭建 RabbitMQ Server 高可用集群 搭建 HAProxy 负载均衡 因为公司测试服务器暂不能用,只能在自己电脑上重新搭建一下 RabbitMQ Server 高可用集群,正好把这个过程记录下来,以便日后查看. 公司测试服务器上的 RabbitMQ 集群,我搭建的是三台服务器,因为自己电脑空间有限,这边只能搭建两台服务器用作高可用集群,用的是 Vagrant 虚拟机

在CentOS7上配置RabbitMQ 3.6.3集群与高可用

在CentOS7上配置RabbitMQ 3.6.3集群与高可用 集群概述 通过 Erlang 的分布式特性(magic cookie 认证节点)进行 RabbitMQ 集群,各 RabbitMQ 服务为对等节点,即每个节点都提供服务给客户端连接,进行消息发送与接收. 这些节点通过 RabbitMQ HA 队列(镜像队列)进行消息队列结构复制.本文中搭建 3 个节点,并且都是磁盘节点(所有节点状态保持一致,节点完全对等),只要有任何一个节点能够工作,RabbitMQ 集群对外就能提供服务. 环境

Atitit.分布式远程调用&#160;&#160;rpc &#160;rmi &#160;CORBA的关系

Atitit.分布式远程调用  rpc  rmi  CORBA的关系 1. 远程调用(包括rpc,rmi,rest)1 2. 分布式调用大体上就分为两类,RPC式的,REST式的1 3. RPC(远程过程调用)是什么 1 4. 传输的数据2 5. 序列化与反序列化3 6. ref  谁能用通俗的语言解释一下什么是 RPC 框架? - Java - 知乎.html3 1. 远程调用(包括rpc,rmi,rest) RPC的协议有很多,比如最早的CORBA,Java RMI,Web Service的

rabbitMQ学习(一):集群环境搭建

环境:四台centOS操作系统服务器192.168.0.73,192.168.0.74,192.168.0.75,192.168.0.76 1 在每台服务器上按照如下步骤安装rabbitMQ 1.1 rabbitMQ是基于erlang语言开发的,因此先安装erlang 执行命令: yum install erlang 1.2 安装rabbitMQ 分别执行命令 1 wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.5.0/rabbi

RabbitMQ的安装及集群搭建方法

RabbitMQ安装 1 安装erlang 下载地址:http://www.erlang.org/downloads 博主这里采用的是otp_src_19.1.tar.gz (200MB+) [[email protected] util]# tar zxvf otp_src_19.1.tar.gz [[email protected] util]# cd otp_src_19.1 [[email protected] otp_src_19.1]# ./configure --prefix=/o