RabbitMQ - 远程过程调用

试着用RabbitMQ进行RPC。

其实用RabbitMQ搞RPC也没什么特别的。
只是我们需要在请求中再加入一个callback queue。
比如这样:

callbackQueueName = channel.queueDeclare().getQueue();

BasicProperties props = new BasicProperties
                            .Builder()
                            .replyTo(callbackQueueName)
                            .build();

channel.basicPublish("", "rpc_queue", props, message.getBytes());

剩下的工作就是等待对方处理完成再从callback队列中读取响应消息。

上面用到了BasicProperties。
(注意:是com.rabbitmq.client.AMQP.BasicProperties 不是 com.rabbitmq.client.BasicProperties)
关于Message properties,AMQP协议为消息预定义了14种属性。

        private String contentType;
        private String contentEncoding;
        private Map<String,Object> headers;
        private Integer deliveryMode;
        private Integer priority;
        private String correlationId;
        private String replyTo;
        private String expiration;
        private String messageId;
        private Date timestamp;
        private String type;
        private String userId;
        private String appId;
        private String clusterId;

通常我们只需要使用其中一小部分:
·deliveryMode: 将消息设置为持久或者临时,2为持久,其余为临时。
·contentType: 指定mime-type,比如要使用JSON就是application/json
·replyTo: 指定callback queue的名字
·correlationId: 用来关联RPC请求和响应的标识。
上面那段代码中就是用到了correlationId。

另外需要说明这个correlationId。
其实在上面的代码中我们为每一个RPC请求都创建了一个回调队列。
但这样明显不效率,我们可以为每一个客户端只创建一个回调队列。

但这样我们又需要考虑另一个问题:<当我们将收到的消息放到队列时,如何确定该消息是属于哪个请求?>

这时我们可以使用correlationId解决这个问题。
我们可以用它来为每一个请求加上标识,获取信息时对比这个标识,以对应请求和响应。
如果我们收到了无法识别的correlationId,即该响应不与任何请求匹配,那么这个消息将会废除。

好了,代码比较简单。

class RPCServer{
    private static final String RPC_QUEUE_NAME = "rpc_queue";
    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

        channel.basicQos(1);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

        System.out.println(" [x] Awaiting RPC requests");

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();

            BasicProperties props = delivery.getProperties();
            BasicProperties replyProps = new BasicProperties
                    .Builder()
                    .correlationId(props.getCorrelationId())
                    .build();

            String message = new String(delivery.getBody());
            int n = Integer.parseInt(message);

            System.out.println(" [.] fib(" + message + ")");
            String response = "" + fib(n);

            channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }

    }

    private static int fib(int n) throws Exception {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n-1) + fib(n-2);
    }
}

由于是共享队列,这里我们就不用exchange和routing了。
另外,有时我们可能需要运行多个服务,为了让多个服务端负载均衡,我们可以使用prefetchCount。
这个属性在之前任务队列的例子里也用过,也就是

workerChannel.basicQos(1);

即让多个worker一次获取一个任务。
用basicConsume方法进入队列后循环等待请求,发现有请求到达时根据队列和CorrelationId对相应请求作出响应。

另外需要注意的一点,server中basicConsume的第二个参数是false。
其意义为是否自动作出回应,即:
true if the server should consider messages acknowledged once delivered; false if the server should expect explicit acknowledgements
于是循环时需要显示调用basicAck进行回应。

class RPCClient{

    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";
    private String replyQueueName;
    private QueueingConsumer consumer;

    public RPCClient() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        connection = factory.newConnection();
        channel = connection.createChannel();

        replyQueueName = channel.queueDeclare().getQueue();
        consumer = new QueueingConsumer(channel);
        channel.basicConsume(replyQueueName, true, consumer);
    }

    public String call(String message) throws Exception {
        String response = null;
        String corrId = java.util.UUID.randomUUID().toString();

        BasicProperties props = new BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        channel.basicPublish("", requestQueueName, props, message.getBytes());

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                response = new String(delivery.getBody());
                break;
            }
        }

        return response;
    }

    public void close() throws Exception {
        connection.close();
    }
}

callback队列只是一个匿名队列,但切记需要将其设置到BasicProperties中。
corrId的生成方法有很多种,在这里使用UUID。
call方法中通过调用basicPublish进行RPC请求,参数中带着BasicProperties。

时间: 2024-10-11 13:00:54

RabbitMQ - 远程过程调用的相关文章

RabbitMQ九:远程过程调用RPC

定义 RPC(Remote Procedure Call Protocol)--远程过程调用协议:它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议.RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据.在OSI网络通信模型中,RPC跨越了传输层和应用层.RPC使得开发包括网络分布式多程序在内的应用程序更加容易. PRC采用客户端/服务端模式,请求程序就是一个客户机,而服务提供就是一个服务器.首先,客户机调用进程发送一个有进程参数的调用信息到服务

rabbitMQ学习笔记(七) RPC 远程过程调用

当客户端想要调用服务器的某个方法来完成某项功能时,就可以使用rabbitMQ支持的PRC服务. 其实RPC服务与普通的收发消息的区别不大, RPC的过程其实就是 客户端向服务端定义好的Queue发送消息,其中携带的消息就应该是服务端将要调用的方法的参数 ,并使用Propertis告诉服务端将结果返回到指定的Queue. 示例: 1 package com.zf.rabbitmq07; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.

PRC远程过程调用

RPC(Remote Promote Call) 一种进程间通信方式.允许像调用本地服务一样调用远程服务. RPC框架的主要目标就是让远程服务调用更简单.透明.RPC框架负责屏蔽底层的传输方式(TCP或者UDP).序列化方式(XML/JSON/二进制)和通信细节.开发人员在使用的时候只需要了解谁在什么位置提供了什么样的远程服务接口即可,并不需要关心底层通信细节和调用过程. 远程过程调用 (RPC) 是一种协议,程序可使用这种协议向网络中的另一台计算机上的程序请求服务.由于使用 RPC 的程序不必

手动启动“远程过程调用”服务时,出现错误信息1058

有许多朋友在系统启动或者是运行一段时间后Remote Procedure Call (RPC)服务自动停止了,然后手动启动时无法启动,报的错误结果是:Could not start the Remote Procedure Call (RPC) Service.Error 1058:The service cannot be started, either because it is disabled or because it has no enabled devices associated

RPC-远程过程调用协议

远程过程调用协议 同义词 RPC一般指远程过程调用协议 RPC(Remote Procedure Call Protocol)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议.RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据.在OSI网络通信模型中,RPC跨越了传输层和应用层.RPC使得开发包括网络分布式多程序在内的应用程序更加容易. RPC采用客户机/服务器模式.请求程序就是一个客户机,而服务提供程序就是一个服务器.

dubbo_远程同步调用原理

Dubbo缺省协议采用单一长连接和NIO异步通讯,适合于小数据量大并发的服务调用,以及服务消费者机器数远大于服务提供者机器数的情况. Dubbo缺省协议,使用基于mina1.1.7+hessian3.2.1的tbremoting交互. 连接个数:单连接 连接方式:长连接 传输协议:TCP 传输方式:NIO异步传输 序列化:Hessian二进制序列化 适用范围:传入传出参数数据包较小(建议小于100K),消费者比提供者个数多,单一消费者无法压满提供者,尽量不要用dubbo协议传输大文件或超大字符串

RPC远程过程

(一)RPC的定义:RPC(Remote Procedure Call)-远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议.RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据.在OSI网络通信模型中,RPC跨越了传输层和应用层.RPC使得开发包括网络分布式多程序在内的应用程序更加容易. RPC采用客户机/服务器模式.请求程序就是一个客户机,而服务提供程序就是一个服务器.首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,

rabbitmq学习(四):利用rabbitmq实现远程rpc调用

一.rabbitmq实现rpc调用的原理 ·rabbitmq实现rpc的原理是:客户端向一个队列中发送消息,并注册一个回调的队列用于接收服务端返回的消息,该消息需要声明一个叫做correaltionId的属性,该属性将是该次请求的唯一标识.服务端在接受到消息(在需要时可以验证correaltionId)后,处理消息,并将消息发送到客户端注册的回调队列中.原理图如下: 二.代码实现 下面我们将模拟实现一个rpc客户端和rpc服务端.客户端给服务端发送message,服务端收到后处理message,

无法自动进入单步执行服务器。未能调试远程过程。这通常说明未在服务器上启动调试

在进行UI界面修改的时候,出现了一个很大的问题,就是"无法自动进入单步执行服务器.未能调试远程过程.这通常说明未在服务器上启动调试",在进行远程调用的WCF调用的时候,出现的问题. 问题出现的原因: 是因为我们在源代码中修改了一些东西后,但在客户端调用的时候,一般使用配置文件中对WCF服务进行调用的: <endpoint address="http://localhost:5734/Service.svc?wsdl" binding="basicHt