RabbitMQ教程——远程过程调用(RPC)

远程过程调用(RPC)

(使用 pika 0.9.8 Python客户端)

第二篇教程中,我们学习了如何使用工作队列在多个workers之间分发耗时的任务。

但是假使我们需要在一台远程的计算机上执行一个函数并等待结果呢?那就将是一件不同的事情了。这种模式通常被称为远程过程调用RPC

在这份教程中,我们将使用RabbitMQ来构建一个RPC系统:一个客户端和一个可伸缩的RPC服务器。由于我们没有任何耗时的任务值得分发,我们将创建一个虚拟的RPC服务来返回Fibonacci数。

客户端接口

为了描述如何使用一个RPC服务,我们将创建一个简单的客户端类。它将暴露一个名为call的方法,而该方法将发送一个RPC请求,并阻塞直到接到回答。

fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print "fib(4) is %r" % (result,)

一点关于RPC的说明

尽管RPC是计算领域一个相当常见的模式,但它常常受到批评。问题来自于程序员没有意识到一个函数调用是否是本地的时或如果它是一个慢RPC。困扰诸如导致了一个不可预知的系统并增加了不必须的调试复杂性。不仅没能简化软件,误用RPC还可能导致不可维护的意大利面条式的代码。

牢记,考虑下面的建议:

  • 确保一个调用是本地的还是远程的看起来很明显。
  • 为你的系统写文档。使得组件之间的以来清晰明了。
  • 处理错误情况。客户端在RPC服务器挂掉或执行了很长时间应该如何处理?

有疑问时避免使用RPC。如果可以,你应该使用一个异步的管道 - 而不是RPC - 如阻塞,结果被异步地推进下一个计算步骤。

回调队列

通常基于RabbitMQ执行RPC很简单。一个客户端发送一个请求消息,而一个服务器以一个响应消息来应答。为了接收一个响应,客户端需要在请求中发送一个‘callback‘队列地址。让我们來试一下:

result = channel.queue_declare(exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(exchange=‘‘,
                      routing_key=‘rpc_queue‘,
                      properties=pika.BasicProperties(
                            reply_to = callback_queue,
                            ),
                      body=request)

# ... and some code to read a response message from the callback_queue ...

消息属性

AMQP协议预定义了伴随一个消息一起发送的14种属性。大多数属性很少被用到,除了如下的这些:

  • delivery_mode: 标记一个消息为persistent(以值2)或transient(其它任何值)。你可能还记得第二篇教程中的这个属性。
  • content_type: 用来描述编码的mime-type。比如对于常用的JSON编码,把属性设为application/json就是一个很好的实践。
  • reply_to: 常被用于命名一个callback队列。
  • correlation_id: 关联RPC响应和请求时很有用。

关联 id

在上面出现的方法中我们为每个RPC请求创建了一个callback队列。那相当没有效率,幸运地是有一个更好的方式 - 让我们为每个客户端创建一个单独的callback 队列。

那产生了一个新的问题,在那个队列中接收的响应到底属于哪个请求不是很清楚。那正是correlation_id属性应用的场合。我们将为每个请求设置一个唯一的值。稍后,当我们从callback队列中接收一条消息时,我们将查看这个属性,基于它我们将能够把一个响应与一个请求匹配起来。如果我们看到一个未知的correlation_id值,我们可以安全地丢弃消息 - 它不属于我们的请求。

你可能会问,我们为什么要忽略callback队列中的未知消息,而不是以一个error而failing?那是由于可能会在服务器端产生一个race condition。尽管可能性不大,RPC服务器可能在将答案发送给我们之后,但在为请求发送一个确认消息之前就死掉。如果发生了那种事,则重启后的RPC服务器将再次处理请求。那就是为什么在客户端上,我们必须优雅地处理重复的响应,而RPC应该是理想地幂等的。

总结

我们的RPC将像这样来工作:

  • 当客户端起来时,它创建一个匿名的exclusive callback队列。
  • 对于一个RPC请求,客户端发送一个消息,带有两个属性:reply_to,被设置为callback 队列,和correlation_id,被设置为给每个请求创建的一个唯一的值。
  • 请求被发送到一个rpc_queue队列。
  • RPC worker(aka: server) 在那个队列上等待。当一个请求出现时,它来执行工作,并发送一条带有结果的消息给客户端,使用来自于reply_to字段的队列。
  • 客户端在callback队列上等待数据。当一条消息出现时,它检查correlation_id属性。如果它与请求的那个匹配,它将把响应返回给应用。

完整代码

rpc_server.py的代码:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=‘localhost‘))

channel = connection.channel()

channel.queue_declare(queue=‘rpc_queue‘)

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)

def on_request(ch, method, props, body):
    n = int(body)

    print " [.] fib(%s)"  % (n,)
    response = fib(n)

    ch.basic_publish(exchange=‘‘,
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id =                                                      props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue=‘rpc_queue‘)

print " [x] Awaiting RPC requests"
channel.start_consuming()

服务器端的代码相当直接:

  • (4) 像通常那样,一开始我们就建立连接并声明队列。
  • (11) 我们声明了我们的fibonacci函数。它假设输入都是有效的正数。(不要期待在输入大数时这个函数仍能工作,它可能是最慢的递归实现了)。
  • (19) 我们为basic_consume声明了一个callback,RPC服务器的核心。当接到消息时执行它。它完成工作并发回响应。
  • (32) 我们可能想要运行多个服务器进程。为了在多个服务器之间平衡负载,我们需要设置prefetch_count setting。

rpc_client.py的代码:

#!/usr/bin/env python
import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host=‘localhost‘))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange=‘‘,
                                   routing_key=‘rpc_queue‘,
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
                                         ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

print " [x] Requesting fib(30)"
response = fibonacci_rpc.call(30)
print " [.] Got %r" % (response,)

客户端代码要稍微复杂一点:

  • (7) 我们建立一个连接,channel并为回复声明一个exclusive ‘callback‘队列。
  • (16) 我们订阅‘callback‘队列,以便于我们能够收到RPC响应。
  • (18) 每次响应,‘on_response‘回调都会被执行,来做一点非常简单的工作,对于每一个响应消息,它都检查correlation_id是否是我们在寻找的那个。如果是,则把响应保存进self.response,然后打破consuming循环。
  • (23) 接下来,我们定义了我们的主call方法 - 它执行实际的RPC请求。
  • (24) 在这个方法中,我们首先要产生一个唯一correlation_id数,并保存它 - ‘on_response‘回调函数将使用这个值来捕获适当的响应。
  • (25) 下一步,我们发布请求消息,带有两个属性:reply_tocorrelation_id
  • (32) 此时我们可以坐下来休息一下,并等待适当的响应到达。
  • (33) 最后我们将响应返回给用户。

我们的RPC服务现在准备好了。我们可以启动服务器:

要请求一个Fibonacci数,则执行客户端:

当前的设计不是一个RPC服务仅有的可能的实现,但它有一些重要的优势:

  • 如果RPC服务器很慢,你可以通过运行另一个来扩展。试着在一个新的终端中运行第二个rpc_server.py。
  • 在客户端,RPC请求发送和接收只是一个消息。不需要异步地调用诸如queue_declare之类的。由此对于一个单独的RPC请求,RPC客户端只需要一个网络来回。

我们的代码仍然是过分简化了的,而没有去解决更复杂(但重要)的问题,比如:

  • 如果没有服务器在运行的话,那么客户但应该如何反应?
  • 一个客户端是否应该有一些RPC的超时机制?
  • 如果服务器失灵并抛出了一个异常,那它是否应该被转发给客户端呢?
  • 在处理消息之前,对于进入的无效消息做防护(比如检查边界等)。

此种方式实现的RPC,是否可以应对,同一个客户端同时发出多个RPC请求的情况?

(rpc_client.py和rpc_server.py 的完整代码)。

Done。

原文地址。

时间: 2024-07-29 06:36:09

RabbitMQ教程——远程过程调用(RPC)的相关文章

传入的表格格式数据流(TDS)远程过程调用(RPC)协议流不正确。参数 1 (""): 数据类型 0x38 未知

因公司升级数据库从sqlserver2000到sqlserver2008,数据源的配置还是使用sqlserver2000配置所以造成一下问题: 传入的表格格式数据流(TDS)远程过程调用(RPC)协议流不正确.参数 1 (""): 数据类型 0x38 未知 sqlserver2000数据源配置: <Resource name="jdbc/test" auth="Container" type="javax.sql.DataSour

远程过程调用(RPC)详解

原文同步至 http://waylau.com/remote-procedure-calls/ 本文介绍了什么是远程过程调用(RPC),RPC 有哪些常用的方法,RPC 经历了哪些发展阶段,以及比较了各种 RPC 技术的优劣. 什么是 RPC RPC 是远程过程调用(Remote Procedure Call)的缩写形式,Birrell 和 Nelson 在 1984 发表于 ACM Transactions on Computer Systems 的论文<Implementing remote

java 执行sql错误 传入的表格格式数据流(TDS)远程过程调用(RPC)协议流不正确。参数 1 (&quot;&quot;): 数据类型 0x38 未知

连接数据库时设置:Statement stmt = conn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE ,ResultSet.CONCUR_READ_ONLY); 则会出现:[Microsoft][SQLServer 2000 Driver for JDBC][SQLServer]传入的表格格式数据流(TDS)远程过程调用(RPC)协议流不正确.参数 1 (""): 数据类型 0x38 未知. 解决的办法:将ResultSet.

转:传入的表格格式数据流(TDS)远程过程调用(RPC)协议流不正确 .

近期在做淘宝客的项目,大家都知道,淘宝的商品详细描述字符长度很大,所以就导致了今天出现了一个问题 VS的报错是这样子的  ” 传入的表格格式数据流(TDS)远程过程调用(RPC)协议流不正确“ 还说某个@desricption 过长之类的话 直觉告诉我,某个字段过长溢出了 第一时间  :看看字段的数据类型 ,该字段类型为text,也就是无限制的长度,所以,数据库是没有问题的 第二 时间 :看看三层,代码生成器自动生成的代码如下这段 [csharp] view plaincopyprint? db

RabbitMQ(六) ——RPC

RabbitMQ(六) --RPC (转载请附上本文链接--linhxx) 一.概述 RabbitMQ的RPC模式,支持生产者和消费者不在同一个系统中,即允许远程调用的情况.通常,消费者作为服务端,放置在远程的系统中,提供接口,生产者调用接口,并发送消息. RPC模式如下图所示: RPC模式是一种远程调用的模式,因为需要http请求,因此速度比系统内部调用慢.而且rpc模式下,通常不易区分哪些是来自外部的请求,哪些是内部的请求,导致整体速度较慢.因此,不能滥用rpc模式. 二.回调队列(Call

rabbitmq 命令&amp;&amp; rabbitmq教程(一)

先来个官方教程 http://www.rabbitmq.com 在windows 下 命名 去掉sudo 我是在windows下测试 用net调用 常用命令 控制台命令:sudo rabbitmqctl#只能在root权限下使用 本地节点默认被命名为”rabbit”.可以通过这个命令前使 用”-n”标志明确的指定节点名称, 例如: sudo rabbitmqctl -n [email protected] **** 控制台命令 启动:sudo rabbitmq-server start 启动应用

rabbitMQ学习笔记(七) RPC 远程过程调用

当客户端想要调用服务器的某个方法来完成某项功能时,就可以使用rabbitMQ支持的PRC服务. 其实RPC服务与普通的收发消息的区别不大, RPC的过程其实就是 客户端向服务端定义好的Queue发送消息,其中携带的消息就应该是服务端将要调用的方法的参数 ,并使用Propertis告诉服务端将结果返回到指定的Queue. 示例: 1 package com.zf.rabbitmq07; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.

RabbitMQ教程

1.引言 RabbitMQ--Rabbit Message Queue的简写,但不能仅仅理解其为消息队列,消息代理更合适.RabbitMQ 是一个由 Erlang 语言开发的AMQP(高级消息队列协议)的开源实现,其内部结构如下: RabbitMQ作为一个消息代理,主要和消息打交道,负责接收并转发消息.RabbitMQ提供了可靠的消息机制.跟踪机制和灵活的消息路由,支持消息集群和分布式部署.适用于排队算法.秒杀活动.消息分发.异步处理.数据同步.处理耗时任务.CQRS等应用场景. 下面我们就来学

RabbitMQ教程C#版 - 发布订阅

先决条件本教程假定 RabbitMQ 已经安装,并运行在localhost标准端口(5672).如果你使用不同的主机.端口或证书,则需要调整连接设置. 从哪里获得帮助如果您在阅读本教程时遇到困难,可以通过邮件列表 联系我们. 发布/订阅# (使用 .NET Client) 在 教程[2] 中,我们创建了一个工作队列,假设在工作队列中的每一个任务都只被分发给一个 Worker.那么在这一章节,我们要做与之完全不同的事,那就是我们将要把一条消息分发给多个消费者.这种模式被称为“发布/订阅”. 为了说