RabbitMQ 实现RPC

实现RPC

首先要弄明白,RPC是个什么东西。

(RPC) Remote Procedure Call Protocol 远程过程调用协议

在一个大型的公司,系统由大大小小的服务构成,不同的团队维护不同的代码,部署在不同的机器。但是在做开发时候往往要用到其它团队的方法,因为已经有了实现。但是这些服务部署不同的机器上,想要调用就需要网络通信,这些代码繁琐且复杂,一不小心就会写的很低效。RPC协议定义了规划,其它的公司都给出了不同的实现。比如微软的wcf,以及现在火热的WebApi。

在RabbitMQ中RPC的实现也是很简单高效的,现在我们的客户端、服务端都是消息发布者与消息接收者。

首先客户端通过RPC向服务端发出请求

我这里有一堆东西需要你给我处理一下,correlation_id:这是我的请求标识,erply_to:你处理完过后把结果返回到这个队列中。

服务端拿到了请求,开始处理并返回

correlation_id:这是你的请求标识 ,原封不动的给你。 这时候客户端用自己的correlation_id与服务端返回的id进行对比。是我的,就接收。

一些繁琐的细节rabbitmq已经为我们封装了,简单的SimpleRpcServer与SimpleRpcClient让Rpc实现的更为方便,这里可以先看一下server端

使用默认的交换机,创建一个SimpleRpcServer的实例,这里需要注意的是,SimpleRpcServer的处理应该是根据业务来的,也就是自己的。在给出的类中没有任何的实现,如果我们创建一个自己的RpcServer并且给出实现

          //创建返回一个新的频道
            using (var channel = RabbitMqHelper.GetConnection().CreateModel())
            {

                //创建一个rpc queue
                channel.QueueDeclare("RpcQueue", true, false, false, null);

                SimpleRpcServer rpc = new SmsSimpleRpcServer(new Subscription(channel, "RpcQueue"));

                Console.WriteLine("服务端启动成功");
   rpc.MainLoop();
                Console.ReadKey();

            }

这里是自己的一个RpcServer,在HandleSimpleCall方法里返回对处理的回调消息,在ProcessRequest中做出具体的处理逻辑

/// <summary>
    /// 发送短信的Rpc
    /// </summary>
    public class SmsSimpleRpcServer : SimpleRpcServer
    {
        public SmsSimpleRpcServer(Subscription subscription) : base(subscription)
        {
        }

        /// <summary>
        /// 执行完成后进行h回调
        /// </summary>
        /// <param name="isRedelivered"></param>
        /// <param name="requestProperties"></param>
        /// <param name="body"></param>
        /// <param name="replyProperties"></param>
        /// <returns></returns>
        public override byte[] HandleSimpleCall(bool isRedelivered, IBasicProperties requestProperties, byte[] body, out IBasicProperties replyProperties)
        {
            replyProperties = null;
            return Encoding.UTF8.GetBytes($"给{Encoding.UTF8.GetString(body)}发送短信成功");
        }

        /// <summary>
        /// 进行处理
        /// </summary>
        /// <param name="evt"></param>
        public override void ProcessRequest(BasicDeliverEventArgs evt)
        {
            // todo.....
            base.ProcessRequest(evt);
        }
    }

回到client端,这里的代码也是非常容易的。创建一个SimpleRpcClient,然后指定了交换机类型,因为用的是默认的,所以exchange传的是null, routingkey是我们的rpcqueue。最后调用call方法

using (var channel = RabbitMqHelper.GetConnection().CreateModel())
           {
               //创建client的rpc
               SimpleRpcClient client = new SimpleRpcClient(channel, new PublicationAddress(exchangeType: ExchangeType.Direct, exchangeName: string.Empty, routingKey: "RpcQueue"));
               bool flag = true;
               var sendmsg = "";
               while (flag)
               {
                   Console.WriteLine("请输入要发送的消息");
                   sendmsg = Console.ReadLine();
                   if (string.IsNullOrWhiteSpace(sendmsg))
                   {
                       Console.Write("请输入消息");
                       continue;
                   }

                   var msg = client.Call(Encoding.UTF8.GetBytes(sendmsg));

                   Console.WriteLine(Encoding.UTF8.GetString(msg));

               }

               Console.ReadKey();

           }

把程序运行起来

后面说一些内部的东西,其实上在创建一次SimpleRpcClient的时候都会创建一个回调队列,这个队列在程序关闭后会自动消失,所以这些建议创建一次就够了,都使用这个。如果创建多次会影响性能

在回调的时候,通过源码也可以看到判断了correlation_id的一致性

在server端也可以看到在执行Process后会发布消息到回调队列

时间: 2024-10-12 16:01:18

RabbitMQ 实现RPC的相关文章

RabbitMQ中RPC的实现及其通信机制

RabbitMQ中RPC的实现:客户端发送请求消息,服务端回复响应消息,为了接受响应response,客户端需要发送一个回调队列的地址来接受响应,每条消息在发送的时候会带上一个唯一的correlation_id,相应的服务端处理计算后会将结果返回到对应的correlation_id. RPC调用流程: 当生产者启动时,它会创建一个匿名的独占回调队列,对于一个RPC请求,生产者发送一条具有两个属性的消息:reply_to(回调队列),correlation_id(每个请求的唯一值),请求被发送到r

RabbitMQ(四):RPC的实现

原文:RabbitMQ(四):RPC的实现 一.RPC RPC(Remote Procedure Call)—远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议.有很多方式可以实现,譬如UNIX RPC.REST API.WCF和SOAP.这些传统的RPC实现方法有共同之处:那就是客户端和服务器端紧密相连,客户端直接连接上服务器,发送一个请求,然后就停下来等待服务器的应答. 这种点对点的性质模式有很多好处,它使得在小范围内的拓扑变得简单.但是当有众多服务器的

利用RabbitMQ实现RPC(python)

RPC--远程过程调用,通过网络调用运行在另一台计算机上的程序的函数\方法,是构建分布式程序的一种方式.RabbitMQ是一个消息队列系统,可以在程序之间收发消息.利用RabbitMQ可以实现RPC.本文所有操作都是在CentOS7.3上进行的,示例代码语言为Python. RabbiMQ以及pika模块安装 yum install rabbitmq-server python-pika -y systemctl    start rabbitmq-server RPC的基本实现 RPC的服务端

RabbitMQ - RPC in Java

这次试着用RabbitMQ进行RPC. 其实用RabbitMQ搞RPC也没什么特别的. 只是我们需要在请求中再加入一个callback queue. 比如这样: callbackQueueName = channel.queueDeclare().getQueue(); BasicProperties props = new BasicProperties                             .Builder()                             .re

(转)RabbitMQ消息队列(七):适用于云计算集群的远程调用(RPC)

在云计算环境中,很多时候需要用它其他机器的计算资源,我们有可能会在接收到Message进行处理时,会把一部分计算任务分配到其他节点来完成.那么,RabbitMQ如何使用RPC呢?在本篇文章中,我们将会通过其它节点求来斐波纳契完成示例. 1. 客户端接口 Client interface 为了展示一个RPC服务是如何使用的,我们将创建一段很简单的客户端class. 它将会向外提供名字为call的函数,这个call会发送RPC请求并且阻塞知道收到RPC运算的结果.代码如下: [python] vie

RabbitMQ教程——远程过程调用(RPC)

远程过程调用(RPC) (使用 pika 0.9.8 Python客户端) 在第二篇教程中,我们学习了如何使用工作队列在多个workers之间分发耗时的任务. 但是假使我们需要在一台远程的计算机上执行一个函数并等待结果呢?那就将是一件不同的事情了.这种模式通常被称为远程过程调用或RPC. 在这份教程中,我们将使用RabbitMQ来构建一个RPC系统:一个客户端和一个可伸缩的RPC服务器.由于我们没有任何耗时的任务值得分发,我们将创建一个虚拟的RPC服务来返回Fibonacci数. 客户端接口 为

RabbitMQ(七):适用于云计算集群的远程调用(RPC)

 在云计算环境中,很多时候需要用它其他机器的计算资源,我们有可能会在接收到Message进行处理时,会把一部分计算任务分配到其他节点来完成.那么,RabbitMQ如何使用RPC呢?在本篇文章中,我们将会通过其它节点求来斐波纳契完成示例. 1. 客户端接口 Client interface 为了展示一个RPC服务是如何使用的,我们将创建一段很简单的客户端class. 它将会向外提供名字为call的函数,这个call会发送RPC请求并且阻塞知道收到RPC运算的结果.代码如下: [python]

RabbitMQ消息队列(七):适用于云计算集群的远程调用(RPC) [转]

在云计算环境中,很多时候需要用它其他机器的计算资源,我们有可能会在接收到Message进行处理时,会把一部分计算任务分配到其他节点来完成.那么,RabbitMQ如何使用RPC呢?在本篇文章中,我们将会通过其它节点求来斐波纳契完成示例. 1. 客户端接口 Client interface 为了展示一个RPC服务是如何使用的,我们将创建一段很简单的客户端class. 它将会向外提供名字为call的函数,这个call会发送RPC请求并且阻塞知道收到RPC运算的结果.代码如下: [python] vie

RabbitMQ(六) ——RPC

RabbitMQ(六) --RPC (转载请附上本文链接--linhxx) 一.概述 RabbitMQ的RPC模式,支持生产者和消费者不在同一个系统中,即允许远程调用的情况.通常,消费者作为服务端,放置在远程的系统中,提供接口,生产者调用接口,并发送消息. RPC模式如下图所示: RPC模式是一种远程调用的模式,因为需要http请求,因此速度比系统内部调用慢.而且rpc模式下,通常不易区分哪些是来自外部的请求,哪些是内部的请求,导致整体速度较慢.因此,不能滥用rpc模式. 二.回调队列(Call