RabbitMQ(四):RPC的实现

原文:RabbitMQ(四):RPC的实现

一、RPC

  RPC(Remote Procedure Call)—远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。有很多方式可以实现,譬如UNIX RPC、REST API、WCF和SOAP。这些传统的RPC实现方法有共同之处:那就是客户端和服务器端紧密相连,客户端直接连接上服务器,发送一个请求,然后就停下来等待服务器的应答。

  这种点对点的性质模式有很多好处,它使得在小范围内的拓扑变得简单。但是当有众多服务器的时候,客户端如何发现在那台服务器上可以找到其他想要的服务就变的麻烦,SOAP和大多数的企业RPC已经采用复杂的补充协议和服务目录,但也带来了额外的复杂度和众多故障点。

  但是,用RabbitMQ来实现RPC可以无需关心由那台服务器来处理,也不必担心服务器奔溃,只需要简单的发送消息,然后等待响应即可。一般接触RabbitMQ的都是用发后即忘模型,用于发送邮件等通知或者处理其他并行处理事件,也就是AMQP的消息是单向的。如何才能让服务器将处理结果返回给原始的客户端呢?

二、消息应答和私有队列

  RabbitMQ有一个优雅的解决方案:使用消息来发回应答。在每个AMQP消息头里有个字段reply_to.消息的生产者可以通过该字段来确定队列的名称,并监听应答队列等待应答。然后接收消息的RPC服务器能偶检查reply_to字段,并创建包含应答内容的新的消息,并以队列名称为路由键,通过应答队列将处理结果发回给生产者。这里我们不需要创建应答队列的名字也不需要将应答队列绑定到交换器上,这是因为没有声明队列的名称RabbitMQ会自动申明,消息发布到RabbitMQ在没有指名交换器的时候,RabbitMQ就会让位目的地是应答队列,而路由键就是应答队列名称。

  所以RabbitMQ实现RPC需要比一般的消息通信多以下几个步骤:

  1. 生产者创建一个应答队列,并监听该队列。
  2. 生产者为消息头中的Reply_to和CorrelationId字段赋值。reply_to是应答队列的名称,CorrelationId是相关标识由消费者返回后对比确认是返回我们的结果。
  3. 消费者返回生产者发送的消息头,并且不需要绑定交换器,并将Reply_to参数作为路由键发送消息到应答队列。

三、自己实现简单的RPC

  其实简单的讲就是生产者在发送消息后接收消息,消费者在接受消息后发送消息,生产者多了一步接收处理消息,消费者多了一步发送消息。我这里简化了一些操作,争取用最少的代码实现,具体代码如下:

  生产者:

private static void MySelfRPCProducer()
{
    var conn_factory = new ConnectionFactory(){HostName = "localhost",UserName = "guest",Password = "guest",Port = 5672};
    using (var conn = conn_factory.CreateConnection())
    {
        using (var channel = conn.CreateModel())
        {
            IBasicProperties pro = channel.CreateBasicProperties();
            pro.ReplyTo = channel.QueueDeclare().QueueName;//创建应答队列并返回队列名称,这个方法创建的队列exclusive和auto_delete都是true,这样可以确保没有人能窃取信息
            pro.ContentType = "text/plain";
            string corrId = Guid.NewGuid().ToString();
            pro.CorrelationId = corrId;

            channel.BasicPublish("", "rpc_queue", pro, Encoding.UTF8.GetBytes("小黄"));
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (ea, ch) =>
            {
                //比较CorrelationId确认是返回的我们的消息
                if (ch.BasicProperties.CorrelationId == corrId)
                {
                    //处理返回结果
                    string msg = Encoding.UTF8.GetString(ch.Body);
                    Console.WriteLine(msg);
                }
            };
            string consumer_tag = channel.BasicConsume(pro.ReplyTo, true, consumer);//监听应答队列
            channel.BasicCancel(consumer_tag);
        }
    }
    Console.ReadLine();
}

  消费者:

private static void MySelfRPCCousmer()
{
    var conn_factory = new ConnectionFactory(){HostName = "localhost",UserName = "guest",Password = "guest",Port = 5672};
    using (var conn = conn_factory.CreateConnection())
    {
        using (var channel = conn.CreateModel())
        {
            channel.QueueDeclare("rpc_queue", false, false, false, null);
            var consumer = new EventingBasicConsumer(channel);
            channel.BasicQos(0, 1, false);
            consumer.Received += (ea, ch) =>
            {
                string msg = Encoding.UTF8.GetString(ch.Body);
                Console.WriteLine("接收到消息:" + msg);
                //发送处理结果
                channel.BasicPublish("", ch.BasicProperties.ReplyTo, ch.BasicProperties, Encoding.UTF8.GetBytes(msg + "给我回电话了"));
                channel.BasicAck(ch.DeliveryTag, false);
            };
            string consumer_tag = channel.BasicConsume("rpc_queue", false, consumer);
            Console.ReadLine();//这里先停止运行下面的代码,因为需要持续监听,信道断开就监听不了了
            channel.BasicCancel(consumer_tag);
        }
    }
}

四、RabbitMQ封装好的RPC

  其实RabbitMQ已经封装好了RPC相应的对象,分别是SimpleRpcClient和SimpleRpcServer。客户端在初始化SimpleRpcClient后主要可以通过Call方法发送消息并返回服务端处理结果。服务端的SimpleRpcServer内部定义了很多虚方法,具体的消息处理是我们自己决定的,所以需要继承SimpleRpcServer后实现相应方法,通过实现重写HandleSimpleCall方法可以返回给客户端数据。具体代码如下所示:

  客户端:

private static void RabbitMQRPCProducer()
{
    var conn_factory = new ConnectionFactory() { HostName = "localhost", UserName = "guest", Password = "guest", Port = 5672 };
    using (var conn = conn_factory.CreateConnection())
    {
        using (var channel = conn.CreateModel())
        {
            //创建client的rpc
            SimpleRpcClient client = new SimpleRpcClient(channel, new PublicationAddress(exchangeType: ExchangeType.Direct, exchangeName: string.Empty, routingKey: "rpc_queue"));
            bool flag = true;
            var sendmsg = "";
            while (flag)
            {
                Console.WriteLine("请输入要发送的消息");
                sendmsg = Console.ReadLine();
                if (string.IsNullOrWhiteSpace(sendmsg))
                {
                    Console.Write("请输入消息");
                    continue;
                }
                var msg = client.Call(Encoding.UTF8.GetBytes(sendmsg));
                Console.WriteLine(Encoding.UTF8.GetString(msg));
            }
            Console.ReadKey();
        }
    }
}

  服务端:

private static void RabbitMQRPCCousmer()
{

    var conn_factory = new ConnectionFactory() { HostName = "localhost", UserName = "guest", Password = "guest", Port = 5672 };
    using (var conn = conn_factory.CreateConnection())
    {
        //创建返回一个新的频道
        using (var channel = conn.CreateModel())
        {
            channel.QueueDeclare("rpc_queue", false, false, false, null);//创建一个rpc queue
            SimpleRpcServer rpc = new MySimpleRpcServer(new Subscription(channel, "rpc_queue"));
            Console.WriteLine("服务端启动成功");
            rpc.MainLoop(); Console.ReadKey();
        }
    }
}

  继承实现方法:

class MySimpleRpcServer : SimpleRpcServer
{
    public MySimpleRpcServer(Subscription subscription) : base(subscription)
    {
    }
    /// <summary>
    /// 执行完成后进行回调
    /// </summary>
    public override byte[] HandleSimpleCall(bool isRedelivered, IBasicProperties requestProperties, byte[] body, out IBasicProperties replyProperties)
    {
        replyProperties = null;
        return Encoding.UTF8.GetBytes($"给{Encoding.UTF8.GetString(body)}发送短信成功");
    }
}

五、小结

  以上就是RabbitMQ对于RPC的最简单的实现,与大家共勉。

原文地址:https://www.cnblogs.com/lonelyxmas/p/10693402.html

时间: 2024-09-29 05:07:05

RabbitMQ(四):RPC的实现的相关文章

RabbitMQ 实现RPC

实现RPC 首先要弄明白,RPC是个什么东西. (RPC) Remote Procedure Call Protocol 远程过程调用协议 在一个大型的公司,系统由大大小小的服务构成,不同的团队维护不同的代码,部署在不同的机器.但是在做开发时候往往要用到其它团队的方法,因为已经有了实现.但是这些服务部署不同的机器上,想要调用就需要网络通信,这些代码繁琐且复杂,一不小心就会写的很低效.RPC协议定义了规划,其它的公司都给出了不同的实现.比如微软的wcf,以及现在火热的WebApi. 在Rabbit

RabbitMQ中RPC的实现及其通信机制

RabbitMQ中RPC的实现:客户端发送请求消息,服务端回复响应消息,为了接受响应response,客户端需要发送一个回调队列的地址来接受响应,每条消息在发送的时候会带上一个唯一的correlation_id,相应的服务端处理计算后会将结果返回到对应的correlation_id. RPC调用流程: 当生产者启动时,它会创建一个匿名的独占回调队列,对于一个RPC请求,生产者发送一条具有两个属性的消息:reply_to(回调队列),correlation_id(每个请求的唯一值),请求被发送到r

利用RabbitMQ实现RPC(python)

RPC--远程过程调用,通过网络调用运行在另一台计算机上的程序的函数\方法,是构建分布式程序的一种方式.RabbitMQ是一个消息队列系统,可以在程序之间收发消息.利用RabbitMQ可以实现RPC.本文所有操作都是在CentOS7.3上进行的,示例代码语言为Python. RabbiMQ以及pika模块安装 yum install rabbitmq-server python-pika -y systemctl    start rabbitmq-server RPC的基本实现 RPC的服务端

rabbitmq学习(四):利用rabbitmq实现远程rpc调用

一.rabbitmq实现rpc调用的原理 ·rabbitmq实现rpc的原理是:客户端向一个队列中发送消息,并注册一个回调的队列用于接收服务端返回的消息,该消息需要声明一个叫做correaltionId的属性,该属性将是该次请求的唯一标识.服务端在接受到消息(在需要时可以验证correaltionId)后,处理消息,并将消息发送到客户端注册的回调队列中.原理图如下: 二.代码实现 下面我们将模拟实现一个rpc客户端和rpc服务端.客户端给服务端发送message,服务端收到后处理message,

RabbitMQ(六) ——RPC

RabbitMQ(六) --RPC (转载请附上本文链接--linhxx) 一.概述 RabbitMQ的RPC模式,支持生产者和消费者不在同一个系统中,即允许远程调用的情况.通常,消费者作为服务端,放置在远程的系统中,提供接口,生产者调用接口,并发送消息. RPC模式如下图所示: RPC模式是一种远程调用的模式,因为需要http请求,因此速度比系统内部调用慢.而且rpc模式下,通常不易区分哪些是来自外部的请求,哪些是内部的请求,导致整体速度较慢.因此,不能滥用rpc模式. 二.回调队列(Call

RabbitMQ - RPC in Java

这次试着用RabbitMQ进行RPC. 其实用RabbitMQ搞RPC也没什么特别的. 只是我们需要在请求中再加入一个callback queue. 比如这样: callbackQueueName = channel.queueDeclare().getQueue(); BasicProperties props = new BasicProperties                             .Builder()                             .re

(转)RabbitMQ消息队列(七):适用于云计算集群的远程调用(RPC)

在云计算环境中,很多时候需要用它其他机器的计算资源,我们有可能会在接收到Message进行处理时,会把一部分计算任务分配到其他节点来完成.那么,RabbitMQ如何使用RPC呢?在本篇文章中,我们将会通过其它节点求来斐波纳契完成示例. 1. 客户端接口 Client interface 为了展示一个RPC服务是如何使用的,我们将创建一段很简单的客户端class. 它将会向外提供名字为call的函数,这个call会发送RPC请求并且阻塞知道收到RPC运算的结果.代码如下: [python] vie

RabbitMQ教程——远程过程调用(RPC)

远程过程调用(RPC) (使用 pika 0.9.8 Python客户端) 在第二篇教程中,我们学习了如何使用工作队列在多个workers之间分发耗时的任务. 但是假使我们需要在一台远程的计算机上执行一个函数并等待结果呢?那就将是一件不同的事情了.这种模式通常被称为远程过程调用或RPC. 在这份教程中,我们将使用RabbitMQ来构建一个RPC系统:一个客户端和一个可伸缩的RPC服务器.由于我们没有任何耗时的任务值得分发,我们将创建一个虚拟的RPC服务来返回Fibonacci数. 客户端接口 为

RabbitMQ(七):适用于云计算集群的远程调用(RPC)

 在云计算环境中,很多时候需要用它其他机器的计算资源,我们有可能会在接收到Message进行处理时,会把一部分计算任务分配到其他节点来完成.那么,RabbitMQ如何使用RPC呢?在本篇文章中,我们将会通过其它节点求来斐波纳契完成示例. 1. 客户端接口 Client interface 为了展示一个RPC服务是如何使用的,我们将创建一段很简单的客户端class. 它将会向外提供名字为call的函数,这个call会发送RPC请求并且阻塞知道收到RPC运算的结果.代码如下: [python]