RabbitMQ(六) ——RPC

RabbitMQ(六)

——RPC

(转载请附上本文链接——linhxx)

一、概述

RabbitMQ的RPC模式,支持生产者和消费者不在同一个系统中,即允许远程调用的情况。通常,消费者作为服务端,放置在远程的系统中,提供接口,生产者调用接口,并发送消息。

RPC模式如下图所示:

RPC模式是一种远程调用的模式,因为需要http请求,因此速度比系统内部调用慢。而且rpc模式下,通常不易区分哪些是来自外部的请求,哪些是内部的请求,导致整体速度较慢。因此,不能滥用rpc模式。

二、回调队列(Callback queue)

要实现rpc模式,生产者需要发送回调队列。方法如下:

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$msg = new AMQPMessage(

$payload,

array(‘reply_to‘ => $queue_name));

$channel->basic_publish($msg, ‘‘, ‘rpc_queue‘);

三、AMQP消息属性

AMQP是一种消息传输协议,RabbitMQ是基于此协议的组件。其中,对消息(Message)的属性,有如下几个常用定义:

1)delivery_mode

消息的模式,当该值是2,表示消息需要持久化;当值是1,表示该消息不用持久化。

2)content_type

消息的类型,用来标记消息编码的类型,通常采用json,则此值是application/json,类似在HTML中的定义。

3)reply_to

用于定义回调队列的名字。

4)correlation_id

用于关联rpc的消息请求发送与消息响应接收。(request 和 response)

四、correlation_id

采用回调队列的方式,效率不够高,rabbitMQ还提供了一种方式,即correlation_id。对于每个请求,可以设定一个特定的、唯一的correlation_id。则当收到回复时,仍查看回复消息的此属性,则可以将请求和响应进行关联。

此时,当接收的correlation_id不存在,则表示该响应不是对该生产者发送请求的响应,生产者会丢弃接收到的消息。

采用抛弃消息,而不是报错,是因为服务端(消费者)可能存在竞争条件(race condition),因此存在可能,当服务器(消费者)刚刚发送ack回馈消息给客户端(生产者),服务器宕机。则服务器重启后,会重新发送ack给客户端。但是由于客户端在此之前,已经完成处理,则再次接收到correlation_id时,correlation_id不存在。因此,客户端对待重复的回复是采取抛弃消息的方式,而不是报错。

五、工作流程

1、生产者(Client)开始生产消息后,创建了匿名的、独一无二的回调队列。

2、生产者(Client)发送请求时,包含两个属性:reply_to,即回调队列;correlation_id,即用于标记请求的属性。

3、请求(request )被发送到rpc_queue队列。

4、消费者(The RPC worker)等待请求,收到时,其处理生产者发送的特定的reply_to的消息。

5、生产者等待消息的ack回复,当收到回复后,其校验correlation_id,如果正确则回到应用程序中。

——written by linhxx

更多最新文章,欢迎关注微信公众号“决胜机器学习”,或扫描右边二维码。

原文地址:https://www.cnblogs.com/linhxx/p/8434180.html

时间: 2024-11-07 08:40:45

RabbitMQ(六) ——RPC的相关文章

RabbitMQ 实现RPC

实现RPC 首先要弄明白,RPC是个什么东西. (RPC) Remote Procedure Call Protocol 远程过程调用协议 在一个大型的公司,系统由大大小小的服务构成,不同的团队维护不同的代码,部署在不同的机器.但是在做开发时候往往要用到其它团队的方法,因为已经有了实现.但是这些服务部署不同的机器上,想要调用就需要网络通信,这些代码繁琐且复杂,一不小心就会写的很低效.RPC协议定义了规划,其它的公司都给出了不同的实现.比如微软的wcf,以及现在火热的WebApi. 在Rabbit

RabbitMQ中RPC的实现及其通信机制

RabbitMQ中RPC的实现:客户端发送请求消息,服务端回复响应消息,为了接受响应response,客户端需要发送一个回调队列的地址来接受响应,每条消息在发送的时候会带上一个唯一的correlation_id,相应的服务端处理计算后会将结果返回到对应的correlation_id. RPC调用流程: 当生产者启动时,它会创建一个匿名的独占回调队列,对于一个RPC请求,生产者发送一条具有两个属性的消息:reply_to(回调队列),correlation_id(每个请求的唯一值),请求被发送到r

RabbitMQ(四):RPC的实现

原文:RabbitMQ(四):RPC的实现 一.RPC RPC(Remote Procedure Call)—远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议.有很多方式可以实现,譬如UNIX RPC.REST API.WCF和SOAP.这些传统的RPC实现方法有共同之处:那就是客户端和服务器端紧密相连,客户端直接连接上服务器,发送一个请求,然后就停下来等待服务器的应答. 这种点对点的性质模式有很多好处,它使得在小范围内的拓扑变得简单.但是当有众多服务器的

RabbitMQ六:通过routingkey模拟日志

序言 本章文章进入深入了解RabbiMQ,平时项目中我们经常用到记录日志,常见的不外乎:Info.debug.warn.Error.     情境进入:先简单说一下我们需求,我们开发过程中会遇到很多日志记录,每种日志正常我们会放在不同时的文件夹(当然有的也可以合并,具体问题具体分析),现在我们就记录不同的日志,然后根据不同的类型,进行查找日志记录. 100个数内,实现1(Info) .2(debug).3(warn).4(Error)       5(Info) .6(debug).7(warn

利用RabbitMQ实现RPC(python)

RPC--远程过程调用,通过网络调用运行在另一台计算机上的程序的函数\方法,是构建分布式程序的一种方式.RabbitMQ是一个消息队列系统,可以在程序之间收发消息.利用RabbitMQ可以实现RPC.本文所有操作都是在CentOS7.3上进行的,示例代码语言为Python. RabbiMQ以及pika模块安装 yum install rabbitmq-server python-pika -y systemctl    start rabbitmq-server RPC的基本实现 RPC的服务端

RabbitMQ (六) : 订阅者模式之路由模式 ( direct )

路由模式下,生产者发送消息时需要指定一个路由键(routingKey),交换机只会把消息转发给包含该路由键的队列 这里,我们改变一下声明交换机的方式. 我们通过管理后台添加一个交换机. 添加后,生产者和消费者的代码中就不需要再声明交换机了.同样,也可以通过管理后台添加队列,那么代码中也不需要声明队列了. 生产者 public class Producer { private const string ExchangeName = "test_exchange_direct"; publ

RabbitMQ中文文档PHP版本(六)--远程过程调用(RPC)

2019年12月10日10:05:54 原文:https://www.rabbitmq.com/tutorials/tutorial-six-php.html 远程过程调用(RPC) (使用php-amqplib) 先决条件 本教程假定RabbitMQ 已在标准端口(5672)的本地主机上安装并运行.如果您使用其他主机,端口或凭据,则连接设置需要进行调整. 在哪里获得帮助 如果您在阅读本教程时遇到困难,可以 通过邮件列表与我们联系. 在第二篇教程中,我们学习了如何使用工作队列在多个工作人员之间分

RabbitMQ - RPC in Java

这次试着用RabbitMQ进行RPC. 其实用RabbitMQ搞RPC也没什么特别的. 只是我们需要在请求中再加入一个callback queue. 比如这样: callbackQueueName = channel.queueDeclare().getQueue(); BasicProperties props = new BasicProperties                             .Builder()                             .re

(转)RabbitMQ消息队列(七):适用于云计算集群的远程调用(RPC)

在云计算环境中,很多时候需要用它其他机器的计算资源,我们有可能会在接收到Message进行处理时,会把一部分计算任务分配到其他节点来完成.那么,RabbitMQ如何使用RPC呢?在本篇文章中,我们将会通过其它节点求来斐波纳契完成示例. 1. 客户端接口 Client interface 为了展示一个RPC服务是如何使用的,我们将创建一段很简单的客户端class. 它将会向外提供名字为call的函数,这个call会发送RPC请求并且阻塞知道收到RPC运算的结果.代码如下: [python] vie