RabbitMQ九:远程过程调用RPC

定义

RPC(Remote Procedure Call Protocol)——远程过程调用协议:它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。

PRC采用客户端/服务端模式,请求程序就是一个客户机,而服务提供就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息,在服务器,进程保持睡眠状态直到调用信息到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复消息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续前进。(整个过程有点类似:你到某大医院看病,你先到柜台交钱拿卡(医师费),拿卡去找医生(卡代表你的认证相当参数),医生根据卡给你把脉看病进行详谈沟通,医诊结束后给你开药,下一位患者进入。。。。。,医生有是服务端,患者是客户端,举例可能有点牵强,就是表达那个意思,)

RPC是在计算机中一种常见的模式,是通常我要用消息队列3个关键点:

1、服务的寻址;

2、消息的接受;

3、消息的关联。

RPC调用的顺序简述:

 1、当客户端启动时,它会创建一个匿名的独占会回调队列;

 2、对于一个RPC请求,客户端通过两个属性发送一条消息(从图中我们也可以看到):relayTo 设置回调队列;correlationId,为每个请求设置唯一的标识ID;

 3、消息将发送到一个Rpc_queue  队列;

 4、RPC工程线程(服务器)在该队列上等待请求,当请求出现,他将处理请求并把结果发回到客户端,使用队列在replayTo中设置;

 5、客户端在回调队列上等待响应,当消息出现,它检查关联ID,如果匹配来自请求的关联ID值,返回队列消息到该应用程序。

重点解释

correlationId 和 relayTo 参数

首先客户端通过RPC向服务端发送请求

我这里有一堆东西需要你给我处理一下,correlationId :这是我的请求标识,relayTo :你处理完过后把结果返回到这个队列中。

服务端拿到请求,并开始处理并返回结果

correlationId :这是你的请求标识 ,原封不动的给你。 这时候客户端用自己的correlationId 与服务端返回的id进行对比。是我的,就接收。

适合RPC场合说明

希望同步得到数据的场合,RPC合适;

希望使用简单,则RPC;RPC操作基于接口,使用简单,使用方式模式本地调用。异步的方式编程比较复杂。

不希望客户端受限于服务端的速度等,可以使用Message Queue

RabbitMQ RPC的特点

Message Queue  把所有的请求消息存储起来,然后处理,和客户端解耦;

Message Queue  引用新的结点,系统的可靠性会受Message Queue 结点的影响;

Meaage  Queue 是异步单向的消息,发送消息设计成是不需要等待消息处理的完成。

所以对于有同步返回需求,Message Queue 是个不错的方向

普通RPC的特点

同步调用,对于要等待返回结果、处理结果的场景,RPC是可以非常自然直觉的使用方式,当然RPC也可以异步调用。

由于等待结果,客户端会有线程消耗。

如果以异步RPC的方式使用,客户端线程消耗可以去掉,但不能做到像消息一样暂存消息请求,压力会直接传导到服务端。

代码块

备注(创建两个解决方案:服务端和客户端)

服务端

 static void Main(string[] args)
        {
            using (var channel = GetConnection().CreateModel())
            {
                channel.QueueDeclare("rpc_queue", true, false, false, null);
                channel.BasicQos(0, 1, false);
                var consumer = new EventingBasicConsumer(channel);
                // var consumer = new QueueingBasicConsumer(channel);
                channel.BasicConsume("rpc_queue", false, consumer);
                Console.WriteLine("等待 RPC 队列");
                consumer.Received += (model, ea) =>
                {
                    // while (true)
                    // {
                    string response = null;
                    //出列
                    // var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                    var body = ea.Body;
                    var props = ea.BasicProperties;                     //内容的基本属性
                    var replyProps = channel.CreateBasicProperties();                    //注意这里的correlationId
                    replyProps.CorrelationId = props.CorrelationId;
                    try
                    {
                        var message = Encoding.UTF8.GetString(body);
                        int n = int.Parse(message);
                        Console.WriteLine("显示内容" + message);
                        response = fib(n).ToString();
                    }
                    catch (Exception e)
                    {

                        Console.WriteLine("报错" + e.ToString());
                        response = "";
                    }
                    finally
                    {
                        var responseBytes = Encoding.UTF8.GetBytes(response);
                        channel.BasicPublish("", props.ReplyTo, replyProps, responseBytes);
                        channel.BasicAck(ea.DeliveryTag, false);
                    }
                    //  };
                };
                Console.WriteLine("发布成功!!!");
                Console.ReadLine();
            }
        }

        /// <summary>
        /// 私有方法
        /// </summary>
        /// <param name="n"></param>
        /// <returns></returns>
        private static int fib(int n)
        {
            if (n == 0 || n == 1)
            {
                return n;
            }
            //Thread.Sleep()方法用于将当前线程休眠一定时间 时间单位是毫秒 1000毫秒= 1秒
            //System.Threading.Thread.Sleep(2000);当前休眠2秒
            //suspen()挂起当前线程。也可以指定挂起时间。
            //close() 关闭当前线程。
            Thread.Sleep(100 * 10);
            return n;
            // return fib(n - 1) + fib(n - 2);
        }

客户端(两个类:Consumer,HelpConnection)

Consumer代码块:

  static void Main(string[] args)
        {
            for (int i = 0; i < 30; i++)
            {
                Stopwatch watch = new Stopwatch();
                watch.Start();
                var rpcClient = new HelpConnection();
                Console.WriteLine("显示内容" + i.ToString());
                var response = rpcClient.Call(i);
                Console.WriteLine("显示内容" + response);
                //当前连接关闭
                rpcClient.Close();
                watch.Stop();
                Console.WriteLine(string.Format(" [x] Requesting complete {0} ,cost {1} ms", i, watch.Elapsed.TotalMilliseconds));
            }
            Console.WriteLine(" complete!!!! ");
            Console.ReadLine();
        }

HelpConnection代码块:

        /// <summary>
        /// 成员变量
        /// </summary>
        private static IConnection connection { get; set; }
        private IModel channel { get; set; }
        private string replyQueueName { get; set; }
        private QueueingBasicConsumer consumer { get; set; }

        /// <summary>
        /// 构造方法:连接配置
        /// </summary>
        public HelpConnection()
        {
            var factory = new ConnectionFactory()
            {
                //计算机名称,账号,密码,
                HostName = "localhost",
                UserName = "zhangguangpo",
                Password = "guangpo1992",
                //RequestedHeartbeat = 60,
                AutomaticRecoveryEnabled = true   //要启用自动连接恢复
            };
            //创建连接
            connection = factory.CreateConnection();
            channel = connection.CreateModel();
            //而客户端为了获得处理结果,那么客户在向服务器发送请求时,同时发送一个回调队列地址reply_to
            replyQueueName = channel.QueueDeclare().QueueName;
            consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume(queue: replyQueueName,
                                 noAck: true,
                                 consumer: consumer);
            //  return Connection;
        }

        /// <summary>
        /// 消息判断
        /// </summary>
        /// <param name="message"></param>
        /// <returns></returns>
        public string Call(int message)
        {
            var corrId = Guid.NewGuid().ToString();
            var props = channel.CreateBasicProperties();
            props.ReplyTo = replyQueueName;
            props.CorrelationId = corrId;
            var messageBates = Encoding.UTF8.GetBytes(message.ToString());
            channel.BasicPublish("", "rpc_queue", props, messageBates);
            while (true)
            {
                var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                if (ea.BasicProperties.CorrelationId == corrId)
                {
                    var body = Encoding.UTF8.GetString(ea.Body);
                    return body;
                }
            }
        }

        /// <summary>
        /// 当前连接关闭
        /// </summary>
        public void Close()
        {
            connection.Close();
        }

效果图

  • 博主是利用读书、参考、引用、抄袭、复制和粘贴等多种方式打造成自己的纯镀 24k 文章,请原谅博主成为一个无耻的文档搬运工!
  • 小弟刚迈入博客编写,文中如有不对,欢迎用板砖扶正,希望给你有所帮助。
时间: 2024-08-05 00:05:57

RabbitMQ九:远程过程调用RPC的相关文章

PRC远程过程调用

RPC(Remote Promote Call) 一种进程间通信方式.允许像调用本地服务一样调用远程服务. RPC框架的主要目标就是让远程服务调用更简单.透明.RPC框架负责屏蔽底层的传输方式(TCP或者UDP).序列化方式(XML/JSON/二进制)和通信细节.开发人员在使用的时候只需要了解谁在什么位置提供了什么样的远程服务接口即可,并不需要关心底层通信细节和调用过程. 远程过程调用 (RPC) 是一种协议,程序可使用这种协议向网络中的另一台计算机上的程序请求服务.由于使用 RPC 的程序不必

rabbitMQ学习笔记(七) RPC 远程过程调用

当客户端想要调用服务器的某个方法来完成某项功能时,就可以使用rabbitMQ支持的PRC服务. 其实RPC服务与普通的收发消息的区别不大, RPC的过程其实就是 客户端向服务端定义好的Queue发送消息,其中携带的消息就应该是服务端将要调用的方法的参数 ,并使用Propertis告诉服务端将结果返回到指定的Queue. 示例: 1 package com.zf.rabbitmq07; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.

RabbitMQ - 远程过程调用

试着用RabbitMQ进行RPC. 其实用RabbitMQ搞RPC也没什么特别的.只是我们需要在请求中再加入一个callback queue.比如这样: callbackQueueName = channel.queueDeclare().getQueue(); BasicProperties props = new BasicProperties .Builder() .replyTo(callbackQueueName) .build(); channel.basicPublish("&qu

手动启动“远程过程调用”服务时,出现错误信息1058

有许多朋友在系统启动或者是运行一段时间后Remote Procedure Call (RPC)服务自动停止了,然后手动启动时无法启动,报的错误结果是:Could not start the Remote Procedure Call (RPC) Service.Error 1058:The service cannot be started, either because it is disabled or because it has no enabled devices associated

java 远程调用 RPC

1. 概念 RPC,全称为Remote Procedure Call,即远程过程调用,它是一个计算机通信协议.它允许像调用本地服务一样调用远程服务.它可以有不同的实现方式.如RMI(远程方法调用).Hessian.Http invoker等.RPC是与语言无关的.直观说法就是A通过网络调用B的过程方法.也就是说两台服务器A,B,一个应用部署在A服务器上,想要调用B服务器上应用提供的函数/方法,由于不在一个内存空间,不能直接调用,需要通过网络来表达调用的语义和传达调用的数据. 1.首先要解决寻址的

RPC远程过程

(一)RPC的定义:RPC(Remote Procedure Call)-远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议.RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据.在OSI网络通信模型中,RPC跨越了传输层和应用层.RPC使得开发包括网络分布式多程序在内的应用程序更加容易. RPC采用客户机/服务器模式.请求程序就是一个客户机,而服务提供程序就是一个服务器.首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,

RPC-远程过程调用协议

远程过程调用协议 同义词 RPC一般指远程过程调用协议 RPC(Remote Procedure Call Protocol)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议.RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据.在OSI网络通信模型中,RPC跨越了传输层和应用层.RPC使得开发包括网络分布式多程序在内的应用程序更加容易. RPC采用客户机/服务器模式.请求程序就是一个客户机,而服务提供程序就是一个服务器.

Atitit.分布式远程调用&#160;&#160;rpc &#160;rmi &#160;CORBA的关系

Atitit.分布式远程调用  rpc  rmi  CORBA的关系 1. 远程调用(包括rpc,rmi,rest)1 2. 分布式调用大体上就分为两类,RPC式的,REST式的1 3. RPC(远程过程调用)是什么 1 4. 传输的数据2 5. 序列化与反序列化3 6. ref  谁能用通俗的语言解释一下什么是 RPC 框架? - Java - 知乎.html3 1. 远程调用(包括rpc,rmi,rest) RPC的协议有很多,比如最早的CORBA,Java RMI,Web Service的

XML-RPC远程方法调用

一.简介 XML-RPC的全称是XML Remote Procedure Call,即XML远程方法调用. 它是一套允许运行在不同操作系统.不同环境的程序实现基于Internet过程调用的规范和一系列的实现. 这种远程过程调用使用http作为传输协议,XML作为传送信息的编码格式. Xml-Rpc的定义尽可能的保持了简单,但同时能够传送.处理.返回复杂的数据结构. xml rpc使用http协议做为传输协议的rpc机制,使用xml文本的方式传输命令和数据. 一个rpc系统,必然包括2个部分: 1