RabbitMQ (消息队列)专题学习07 RPC

(使用Java客户端)

一、概述

在Work Queue的章节中我们学习了如何使用Work Queue分配耗时的任务给多个工作者,但是如果我们需要运行一个函数在远程计算机上,这是一个完全不同的情景,这种模式通常被称之为RPC。

在本章节的学习中,我们将使用RabbitMQ来构建一个RPC系统:一个远程客户端和一个可扩展的RPC服务器,我们没有任何费时的任务进行分配,我们将创建一个虚拟的RPC服务返回Fibonacci数。

1.1、客户端接口(Client Interface)

为了说明一个RPC服务可以使用,我们将创建一个简单的客户端类,这将通过方法名的调用发送一个RPC请求和接收块得到答复:

FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);

注意:

尽管RPC是计算机领域中非常普遍的模式,它经常受到批评,当程序不知道是否这是一个缓慢的RPC调用函数,像在不可预知接口的系统进行调试,增加了不必要的复杂性,而不是简化软件,滥用会导致不可修复的代码,如果要使用它记住考虑以下建议:

1、能明确区分被调用的函数是局部的还是远程的。

2、您的文件系统、组件之间的依赖关系是很清晰的。

3、处理问题?客户应该知道当RPC服务器挂掉的时候该如何做。

1.2、回调队列(Callback Queue)

总的说来使用RabbitMQ来实现RPC是比较简单的,当客户端发送请求消息和服务器响应消息的答复,为了接收到响应我们需要发送一个callback队列地址在请求中,我们可以使用默认的队列,让我们试试:

callbackQueueName = channel.queueDeclare().getQueue();

BasicProperties props = new BasicProperties
                            .Builder()
                            .replyTo(callbackQueueName)
                            .build();

channel.basicPublish("", "rpc_queue", props, message.getBytes());

// ... then code to read a response message from the callback_queue ...

说明:

AMQP协议对一个消息预设了一个14个属性的集合,大部分属性很少被使用,有以下例外:

1、deliveryMode:标志一个消息的持久化(值为2)或者状态(其他任何值)。

2、contentType:用于描述编码的MIME类型,比如,要经常使用JSON编码大的一个好的设置方法为application/json

3、replyTo:常用的回调队列名称。

4、correlationId:被用于一个RPC相应请求相关。

同时我们需要一个新的类:

import com.rabbitmq.client.AMQP.BasicProperties;

1.3、相关ID (correlation Id)

在上述方法我们为每个RPC请求创建一个回调队列,那是很低效的但是幸运的是有一个更好的方式,让我们创建一个单一回调队列供每个客户端调用。

这出现了一个新的问题,在队列中接收到一个不清楚这个请求属于哪个响应时的响应,我们要将它设置为每个请求的一个特有的值,然后从一个回调队列中接收一个消息时就要查看这个属性值,在此基础上,我们将能匹配一个请求的响应,如果我们看到一个未知的correlationId值,我们可以安全地将这些消息丢弃因为它不属于我们的要求。

你也许会问,我们为什么要丢弃回调队列中未知的消息呢?而不是一个错误引起的失败呢?这是由于一个可能在服务器的竞争引起的,虽然不太可能,但是它还是有可能发生的,RPC服务器在给我们大答复之后将挂掉,但是发送确认消息的请求,如果这种情况发生,将再次重启RPC服务器处理请求,这就是为什么在客户端必须处理重复的响应。

二、实现

2.1、结构如下图所示:

从上图可知,我们RPC工作流程如下:

1、当客户端启动时,它创建一个匿名的独立的回调队列。

2、一个RPC请求中,客户端发送一个消息具有两个特性:replyTo它包含将要到达的回调队列和correlation_id,这是每个请求的一个固有的值。

3、请求发送到一个rpc_queue队列。

4、RPC服务器正在等待队列的请求,当一个请求到达时,它的工作久是发送一个消息结果返回给客户端,使用了replyTo队列。

5、客户端等待回调replyTo队里的数据,当消息出现时,它检查correlationId值是否和请求返回给应用程序响应的值匹配。

2.2、代码实现

Fibonacci 函数:

private static int fib(int n) throws Exception {
    if (n == 0) return 0;
    if (n == 1) return 1;
    return fib(n-1) + fib(n-2);
}

我们声明的Fibonacci函数,它假定只有有效的整整输入(别指望一个大的数字,它可能是最慢的递归实现),我们RPC服务器RPCServer.java代码如下:

private static final String RPC_QUEUE_NAME = "rpc_queue";

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

channel.basicQos(1);

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

System.out.println(" [x] Awaiting RPC requests");

while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();

    BasicProperties props = delivery.getProperties();
    BasicProperties replyProps = new BasicProperties
                                     .Builder()
                                     .correlationId(props.getCorrelationId())
                                     .build();

    String message = new String(delivery.getBody());
    int n = Integer.parseInt(message);

    System.out.println(" [.] fib(" + message + ")");
    String response = "" + fib(n);

    channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());

    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}

说明:

1、像之前所有实例一样,开始建立连接、通道和队列

2、可能要运行多个服务器进程,为了传播同样的负载在多个服务器上,我们需要通过channel.basicQos来设置prefetchcount值。

3、通过basicConsume访问队列,然后进入循环,等待请求消息、处理消息和发送响应。

RPCClient.java代码如下:

private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
private QueueingConsumer consumer;

public RPCClient() throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    connection = factory.newConnection();
    channel = connection.createChannel();

    replyQueueName = channel.queueDeclare().getQueue();
    consumer = new QueueingConsumer(channel);
    channel.basicConsume(replyQueueName, true, consumer);
}

public String call(String message) throws Exception {
    String response = null;
    String corrId = java.util.UUID.randomUUID().toString();

    BasicProperties props = new BasicProperties
                                .Builder()
                                .correlationId(corrId)
                                .replyTo(replyQueueName)
                                .build();

    channel.basicPublish("", requestQueueName, props, message.getBytes());

    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        if (delivery.getProperties().getCorrelationId().equals(corrId)) {
            response = new String(delivery.getBody());
            break;
        }
    }

    return response;
}

public void close() throws Exception {
    connection.close();
}

说明:

1、建立一个连接通道和声明一个专属的回调队列的回复。

2、订阅回调队列,这样可以接受RPC响应

3、调用方法发起实际的RPC请求。

4、生成一个唯一的correlationId并且保存它,while循环将使用这个值来匹配相对应的响应。

5、发送请求消息,它有两个属性值relpTo和correlationId。

6、等待相匹配的响应。

7、while循环做简单的工作,为每个响应检查是否correlationId就是我们需要的,如果是,保存该响应。

8、返回响应给客户端。

客户端请求代码:

RPCClient fibonacciRpc = new RPCClient();
System.out.println(" [x] Requesting fib(30)");
String response = fibonacciRpc.call("30");
System.out.println(" [.] Got '" + response + "'");
fibonacciRpc.close();

2.3、完整的代码清单

RPCClient.java

package com.xuz.rpc;

import java.util.UUID;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;

public class RPCClient {
	private Connection connection;
	private Channel channel;
	private String requestQueueName = "rpc_queue";
	private String replyQueueName;
	private QueueingConsumer consumer;

	public RPCClient() throws Exception {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		connection = factory.newConnection();
		channel = connection.createChannel();
		//响应队列名,服务端会把返回的信息发送到这个队列中。
		replyQueueName = channel.queueDeclare().getQueue();
		consumer = new QueueingConsumer(channel);
		channel.basicConsume(replyQueueName, true, consumer);
	}

	public String call(String message) throws Exception {
		String response = null;
		//每个请求生成一个唯一的correlationId
		String corrId = UUID.randomUUID().toString();
		//设置请求响应基本参数:correlationId(UUID)和rpc_queue
		BasicProperties props = new BasicProperties.Builder().correlationId(
				corrId).replyTo(replyQueueName).build();
		System.out.println("客户端响应队列的属性:["+props.getCorrelationId()+","
				+props.getReplyTo()+"]");
		channel.basicPublish("", requestQueueName, props, message.getBytes());
		while (true) {
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			//如果获得响应队列中的getCorrelationId和当前corrId相等,则保存响应并返回
			if (delivery.getProperties().getCorrelationId().equals(corrId)) {
				response = new String(delivery.getBody(), "UTF-8");
				break;
			}
		}
		return response;
	}
	/**
	 * 关闭连接
	 * @throws Exception
	 */
	public void close() throws Exception {
		connection.close();
	}

	public static void main(String[] argv) {
		RPCClient rpcClient = null;
		String response = null;
		try {
			rpcClient = new RPCClient();
			//调用Call方法传入请求消息:测试RPC
			response = rpcClient.call("测试RPC");
			System.out.println(" 响应消息:[" + response + "]");
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (rpcClient != null) {
				try {
					rpcClient.close();
				} catch (Exception ignore) {
				}
			}
		}
	}
}

RPCServer.java:

package com.xuz.rpc;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;

public class RPCServer {
	private static final String RPC_QUEUE_NAME = "rpc_queue";
	/**
	 * 定义函数
	 * @param n 输入的正整数
	 * @return
	 */
	private static int fib(int n) {
		if (n == 0)
			return 0;
		if (n == 1)
			return 1;
		return fib(n - 1) + fib(n - 2);
	}

	public static void main(String[] argv) {
		Connection connection = null;
		Channel channel = null;
		try {
			//获取连接工厂
			ConnectionFactory factory = new ConnectionFactory();
			//设定主机
			factory.setHost("127.0.0.1");
			//创建连接
			connection = factory.newConnection();
			//创建通道
			channel = connection.createChannel();
			//声明RPC队列
			channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
			//设置公平调度
			channel.basicQos(1);
			QueueingConsumer consumer = new QueueingConsumer(channel);
			channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
			System.out.println("[等待RPC远程请求!]");
			while (true) {
				String response = null;
				System.out.println("[服务端等待接收消息!]");
				QueueingConsumer.Delivery delivery = consumer.nextDelivery();
				System.out.println("[服务端成功接收消息!]");
				BasicProperties props = delivery.getProperties();
				//从响应队列获取reply参数
				BasicProperties replyProps = new BasicProperties.Builder()
						.correlationId(props.getCorrelationId()).build();
				System.out.println("服务端响应队列的属性:["+replyProps.getCorrelationId()+"]");
				try {
					String message = new String(delivery.getBody(), "UTF-8");
					response = "服务端已经处理了消息:[" + message+"]";
				} catch (Exception e) {
					System.out.println(" [.] " + e.toString());
					response = "";
				} finally {
					//将结果返回给客户端
					channel.basicPublish("", props.getReplyTo(), replyProps,
							response.getBytes("UTF-8"));
					//设置确认消息
					channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
							false);
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (connection != null) {
				try {
					connection.close();
				} catch (Exception ignore) {
				}
			}
		}
	}
}

2.4、RPC测试

1、运行RPCClient发送响应请求:

2、运行PRCServer接收处理响应请求:

源码下载:

RabbitMQ RPC源码

RabbitMQ (消息队列)专题学习07 RPC,布布扣,bubuko.com

时间: 2024-12-20 03:22:14

RabbitMQ (消息队列)专题学习07 RPC的相关文章

RabbitMQ消息队列(十)RPC应用2

基于RabbitMQ RPC实现的主机异步管理 地址原文:http://blog.51cto.com/baiying/2065436,作者大大,我把原文贴出来了啊.不要告我 [email protected]:~/workspace# tree ManageHost/ ManageHost/ ├── environment │   ├── base_dir.py │   ├── base_dir.pyc │   └── __init__.py ├── README.md ├── RPC_Clie

(转)RabbitMQ消息队列(七):适用于云计算集群的远程调用(RPC)

在云计算环境中,很多时候需要用它其他机器的计算资源,我们有可能会在接收到Message进行处理时,会把一部分计算任务分配到其他节点来完成.那么,RabbitMQ如何使用RPC呢?在本篇文章中,我们将会通过其它节点求来斐波纳契完成示例. 1. 客户端接口 Client interface 为了展示一个RPC服务是如何使用的,我们将创建一段很简单的客户端class. 它将会向外提供名字为call的函数,这个call会发送RPC请求并且阻塞知道收到RPC运算的结果.代码如下: [python] vie

RabbitMQ消息队列(七):适用于云计算集群的远程调用(RPC) [转]

在云计算环境中,很多时候需要用它其他机器的计算资源,我们有可能会在接收到Message进行处理时,会把一部分计算任务分配到其他节点来完成.那么,RabbitMQ如何使用RPC呢?在本篇文章中,我们将会通过其它节点求来斐波纳契完成示例. 1. 客户端接口 Client interface 为了展示一个RPC服务是如何使用的,我们将创建一段很简单的客户端class. 它将会向外提供名字为call的函数,这个call会发送RPC请求并且阻塞知道收到RPC运算的结果.代码如下: [python] vie

RabbitMQ 消息队列 应用

安装参考    详细介绍   学习参考 RabbitMQ 消息队列 RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统.他遵循Mozilla Public License开源协议. MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们.消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术.

(转)RabbitMQ消息队列(九):Publisher的消息确认机制

在前面的文章中提到了queue和consumer之间的消息确认机制:通过设置ack.那么Publisher能不到知道他post的Message有没有到达queue,甚至更近一步,是否被某个Consumer处理呢?毕竟对于一些非常重要的数据,可能Publisher需要确认某个消息已经被正确处理. 在我们的系统中,我们没有是实现这种确认,也就是说,不管Message是否被Consume了,Publisher不会去care.他只是将自己的状态publish给上层,由上层的逻辑去处理.如果Message

RabbitMQ消息队列(九):Publisher的消息确认机制

在前面的文章中提到了queue和consumer之间的消息确认机制:通过设置ack.那么Publisher能不到知道他post的Message有没有到达queue,甚至更近一步,是否被某个Consumer处理呢?毕竟对于一些非常重要的数据,可能Publisher需要确认某个消息已经被正确处理. 在我们的系统中,我们没有是实现这种确认,也就是说,不管Message是否被Consume了,Publisher不会去care.他只是将自己的状态publish给上层,由上层的逻辑去处理.如果Message

RabbitMQ消息队列1: Detailed Introduction 详细介绍

1. 历史 RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现.AMQP 的出现其实也是应了广大人民群众的需求,虽然在同步消息通讯的世界里有很多公开标准(如 COBAR的 IIOP ,或者是 SOAP 等),但是在异步消息处理中却不是这样,只有大企业有一些商业实现(如微软的 MSMQ ,IBM 的 Websphere MQ 等),因此,在 2006 年的 6 月,Cisco .Redhat.iMatix 等联合制定了 AMQP 的公开标

RabbitMQ消息队列应用

RabbitMQ消息队列应用 消息通信组件Net分布式系统的核心中间件之一,应用与系统高并发,各个组件之间解耦的依赖的场景.本框架采用消息队列中间件主要应用于两方面:一是解决部分高并发的业务处理:二是通过消息队列传输系统日志.目前业界使用较多的消息队列组件有RabbitMQ.ActiveMQ.MSMQ.kafka.zeroMQ等,本文对系统架构之MQ Component诠释,并采用RabbitMQ作为消息队列中间件. 图1- 消息队列组件示意图 一.RabbitMQ介绍 RabbitMQ是一款基

Nginx通过LUA脚本访问RabbitMQ消息队列

发现了一个Nginx的LUA脚本:lua-resty-rabbitmqstomp,可以让Nginx通过LUA脚本访问RabbitMQ消息队列,这个脚本是通过stomp协议连接RabbitMQ的stomp适配器,来pub/sub消息的 关于RabbitMQ-STOMP安装使用相关内容可以参见:RabbitMQ STOMP Adapter 关于Nginx-LUA模块安装使用参见:LAMP架构演进到LAMPGC,再演进到LNMLGC 关于STOMP协议相关资料参见这里: STOMP官方英文协议1.1版