rabbitmq学习(四):利用rabbitmq实现远程rpc调用

一、rabbitmq实现rpc调用的原理

·rabbitmq实现rpc的原理是:客户端向一个队列中发送消息,并注册一个回调的队列用于接收服务端返回的消息,该消息需要声明一个叫做correaltionId的属性,该属性将是该次请求的唯一标识。服务端在接受到消息(在需要时可以验证correaltionId)后,处理消息,并将消息发送到客户端注册的回调队列中。原理图如下:  

  

二、代码实现

  下面我们将模拟实现一个rpc客户端和rpc服务端。客户端给服务端发送message,服务端收到后处理message,再将处理后的消息返给客户端

  rpc客户端

  

/**
 * rpc客户端
 */
public class RpcClient {
    //发送消息的队列名称
    private static final String RPC_QUEUE_NAME = "rpc_queue";

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        Connection connection = null;
        Channel channel = null;
        try {
           connection = connectionFactory.newConnection();
           channel = connection.createChannel();
           //创建回调队列
           String callbackQueue = channel.queueDeclare().getQueue();
           //创建回调队列,消费者从回调队列中接收服务端传送的消息
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(callbackQueue,true,consumer);

            //创建消息带有correlationId的消息属性
            String correlationId = UUID.randomUUID().toString();
            AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().correlationId(correlationId).replyTo(callbackQueue).build();
            String message = "hello rabbitmq";
            channel.basicPublish("",RPC_QUEUE_NAME,basicProperties,message.getBytes());
            System.out.println("RpcClient send message " + message + ", correaltionId = " + correlationId);

            //接收回调消息
            while (true){
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String receivedCorrelationId = delivery.getProperties().getCorrelationId();
                if(correlationId.equals(receivedCorrelationId)){
                    System.out.println("RpcClient receive format message " + new String(delivery.getBody(), "UTF-8") + ", correaltionId = " + correlationId);
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            try {
                channel.close();
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }
}

  rpc服务端

  

/**
 * rpc服务器
 */
public class RpcServer {
    private static final String RPC_QUEUE_NAME = "rpc_queue";

    private static String format(String message){
        return "......" + message + "......";
    }

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        Connection connection = null;
        try {
            connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(RPC_QUEUE_NAME,false,false,false,null);
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //声明消费者预取的消息数量
            channel.basicQos(1);
            channel.basicConsume(RPC_QUEUE_NAME,false,consumer);//采用手动回复消息
            System.out.println("RpcServer waitting for receive message");

            while (true){
                //接收并处理消息
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("RpcServer receive message " + message);
                String response = format(message);
                //确认收到消息
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

                //取出消息的correlationId
                AMQP.BasicProperties properties = delivery.getProperties();
                String correlationId = properties.getCorrelationId();

                //创建具有与接收消息相同的correlationId的消息属性
                AMQP.BasicProperties replyProperties = new AMQP.BasicProperties().builder().correlationId(correlationId).build();
                channel.basicPublish("",properties.getReplyTo(),replyProperties,response.getBytes());
            }

        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

  先运行服务端,再运行客户端,结果如下:

  RpcClient

  

  RpcServer

  

  

原文地址:https://www.cnblogs.com/wutianqi/p/10055391.html

时间: 2024-10-09 03:09:53

rabbitmq学习(四):利用rabbitmq实现远程rpc调用的相关文章

RabbitMQ学习(六):远程结果调用

场景:我们需要在传输消息时得到结果 客服端在发送请求时会发送回调队列,服务端处理事情完成后会将结果返回到回调队列中,在增加关联标志关联每个请求和服务返回 客户端代码: public class RPCClient { private final static String RPC_Queue_Name = "rpc_queue"; public static void main(String[] args) throws IOException, TimeoutException, I

RabbitMQ学习四

参考文章: http://previous.rabbitmq.com/v3_3_x/tutorials/tutorial-three-python.html

RabbitMQ学习总结

原文:RabbitMQ学习总结 关于RabbitMQ是什么以及它的概念,不了解的可以先查看一下下面推荐的几篇博客 https://blog.csdn.net/whoamiyang/article/details/54954780 https://www.cnblogs.com/frankyou/p/5283539.html https://blog.csdn.net/mx472756841/article/details/50815895 官网介绍:http://www.rabbitmq.com

RabbitMQ学习和使用

RabbitMQ学习和使用 RabbitMQ介绍 MQ全称Message Queue 消息队列,RabbitMQ是基于AMQP(高级消息队列协议)实现的.消息队列通常用以应用之间相互通信,解决同步问题.MQ是典型的生产者消费者模型,RabbitMQ最常用的三种模式是点对点模式.发布订阅模式.广播模式. RabbitMQ is a message-queueing software called a message broker or queue manager. Simply said; It

RabbitMQ学习之旅(一)

RabbitMQ学习总结(一) RabbitMQ简介 RabbitMQ是一个消息代理,其接收并转发消息.类似于现实生活中的邮局:你把信件投入邮箱的过程,相当于往队列中添加信息,因为所有邮箱中的信件最终都会汇集到邮局中:当邮递员把你的新建发送给收件人的时候,相当于消息的转发. RabbitMQ中的常见术语 生产者(Provider):生产者负责生产消息,并将其发送到消息队列中 队列(Queue):消息代理(Proxy)角色,从生产者那里接收消息,并将其转发到消费者进行消费.队列主要受限于主机的内存

自研发RPC调用框架

自主研发设计RPC远程调用框架,实现服务自动注册,服务发现,远程RPC调用,后续实现服务负载均衡 主要包括:客户端服务,服务端,服务发现,服务注册 github地址:https://github.com/btshoutn/rpc_project 原文地址:https://www.cnblogs.com/shoutn/p/8297345.html

rabbitMQ学习笔记(七) RPC 远程过程调用

当客户端想要调用服务器的某个方法来完成某项功能时,就可以使用rabbitMQ支持的PRC服务. 其实RPC服务与普通的收发消息的区别不大, RPC的过程其实就是 客户端向服务端定义好的Queue发送消息,其中携带的消息就应该是服务端将要调用的方法的参数 ,并使用Propertis告诉服务端将结果返回到指定的Queue. 示例: 1 package com.zf.rabbitmq07; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.

RabbitMQ九:远程过程调用RPC

定义 RPC(Remote Procedure Call Protocol)--远程过程调用协议:它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议.RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据.在OSI网络通信模型中,RPC跨越了传输层和应用层.RPC使得开发包括网络分布式多程序在内的应用程序更加容易. PRC采用客户端/服务端模式,请求程序就是一个客户机,而服务提供就是一个服务器.首先,客户机调用进程发送一个有进程参数的调用信息到服务

利用RabbitMQ实现RPC(python)

RPC--远程过程调用,通过网络调用运行在另一台计算机上的程序的函数\方法,是构建分布式程序的一种方式.RabbitMQ是一个消息队列系统,可以在程序之间收发消息.利用RabbitMQ可以实现RPC.本文所有操作都是在CentOS7.3上进行的,示例代码语言为Python. RabbiMQ以及pika模块安装 yum install rabbitmq-server python-pika -y systemctl    start rabbitmq-server RPC的基本实现 RPC的服务端