柯南君:看大数据时代下的IT架构(9)消息队列之RabbitMQ--案例(RPC起航)

二、Remote procedure call (RPC)(using the Java client)

三、Client interface(客户端接口)

为了展示一个RPC服务是如何使用的,我们将创建一段很简单的客户端class。 它将会向外提供名字为call的函数,这个call会发送RPC请求并且阻塞,直到收到RPC运算的结果。代码如下:

  1. fibonacci_rpc = FibonacciRpcClient()
  2. result = fibonacci_rpc.call(4)
  3. print "fib(4) is %r" % (result,)
四、 总体来说,在RabbitMQ进行RPC远程调用是比较容易的。client发送请求的Message然后server返回响应结果。为了收到响应client在publish message时需要提供一个”callback“(回调)的queue地址。code如下:

[java] view plaincopyprint?

  1. result = channel.queue_declare(exclusive=True)
  2. callback_queue = result.method.queue
  3. channel.basic_publish(exchange=‘‘,
  4. routing_key=‘rpc_queue‘,
  5. properties=pika.BasicProperties(
  6. reply_to = callback_queue,
  7. ),
  8. body=request)

Message properties

AMQP 预定义了14个属性。它们中的绝大多很少会用到。以下几个是平时用的比较多的:

  • delivery_mode: 持久化一个Message(通过设定值为2)。其他任意值都是非持久化。
  • content_type: 描述mime-type 的encoding。比如设置为JSON编码:设置该property为application/json。
  • reply_to: 一般用来指明用于回调的queue(Commonly used to name a callback queue)。
  • correlation_id: 在请求中关联处理RPC响应(correlate RPC responses with requests)。
四、Correlation Id  在上个小节里,实现方法是对每个RPC请求都会创建一个callback queue。这是不高效的。幸运的是,在这里有一个解决方法:为每个client创建唯一的callback queue。

这又有其他问题了:收到响应后它无法确定是否是它的,因为所有的响应都写到同一个queue了。上一小节的correlation_id在这种情况下就派上用场了:对于每个request,都设置唯一的一个值,在收到响应后,通过这个值就可以判断是否是自己的响应。如果不是自己的响应,就不去处理。

五、(总结)
工作流程:
  • 当客户端启动时,它创建了匿名的exclusive callback queue.
  • 客户端的RPC请求时将同时设置两个properties: reply_to设置为callback queue;correlation_id设置为每个request一个独一无二的值.
  • 请求将被发送到an rpc_queue queue.
  • RPC端或者说server一直在等待那个queue的请求。当请求到达时,它将通过在reply_to指定的queue回复一个message给client。
  • client一直等待callback queue的数据。当message到达时,它将检查correlation_id的值,如果值和它request发送时的一致那么就将返回响应。
六、
Putting it all together
  1. private static int fib(int n) throws Exception {
  2. if (n == 0) return 0;
  3. if (n == 1) return 1;
  4. return fib(n-1) + fib(n-2);
  5. }
 RPCServer.java :

private static final String RPC_QUEUE_NAME = "rpc_queue";  

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");  

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();  

channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);  

channel.basicQos(1);  

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);  

System.out.println(" [x] Awaiting RPC requests");  

while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();  

    BasicProperties props = delivery.getProperties();
    BasicProperties replyProps = new BasicProperties
                                     .Builder()
                                     .correlationId(props.getCorrelationId())
                                     .build();  

    String message = new String(delivery.getBody());
    int n = Integer.parseInt(message);  

    System.out.println(" [.] fib(" + message + ")");
    String response = "" + fib(n);  

    channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());  

    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} 
服务器代码相当简单:
  • 像往常一样,我们首先建立连接、通道和声明队列。
  • 我们可能想要运行多个服务器进程。为了分散负载同样在多个服务器,我们需要设置在channel.basicQos prefetchCount设置。
  • 我们使用basicConsume访问队列。然后我们进入while循环,我们等待请求消息,并发送响应工作。

RPCClient.java:

    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";
    private String replyQueueName;
    private QueueingConsumer consumer;  

    public RPCClient() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        connection = factory.newConnection();
        channel = connection.createChannel();  

        replyQueueName = channel.queueDeclare().getQueue();
        consumer = new QueueingConsumer(channel);
        channel.basicConsume(replyQueueName, true, consumer);
    }  

    public String call(String message) throws Exception {
        String response = null;
        String corrId = java.util.UUID.randomUUID().toString();  

        BasicProperties props = new BasicProperties
                                    .Builder()
                                    .correlationId(corrId)
                                    .replyTo(replyQueueName)
                                    .build();  

        channel.basicPublish("", requestQueueName, props, message.getBytes());  

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                response = new String(delivery.getBody());
                break;
            }
        }  

        return response;
    }  

    public void close() throws Exception {
        connection.close();
    }  
客户端代码部分涉及到:
  • 我们建立了一个"connecttion"(连接) 和 "channel"(通道)并且为replies(回复)声明一个独一无二的"callback"(回调);
  • 我们订阅了"callback"(回调)队列,这样我们就可以收到RPC的回应了;
  • 我们调用的方法是实际的RPC;
  • 接下来我们publish(发布)请求消息,有两个属性,分别是:replyTo 和 correlationId;
  • 在这点,我们可以坐下来,直到适当的响应到达;
  • while循环做了一件非常简单的工作,它会检查每一个消息响应,如果当前的最后,我们将响应给用户;

客户端请求:

 

    RPCClient fibonacciRpc = new RPCClient();  

    System.out.println(" [x] Requesting fib(30)");
    String response = fibonacciRpc.call("30");
    System.out.println(" [.] Got ‘" + response + "‘");  

    fibonacciRpc.close();  


现在是时候,该看看我们的整体完整的示例源代码了:RPCClent.java(包括基本的异常处理)和RPCServer.java,像往常一样编译和设置路径(可以参考前面的教程)

 

RPCClient.java:

    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.AMQP.BasicProperties;
    import java.util.UUID;  

    public class RPCClient {  

      private Connection connection;
      private Channel channel;
      private String requestQueueName = "rpc_queue";
      private String replyQueueName;
      private QueueingConsumer consumer;  

      public RPCClient() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        connection = factory.newConnection();
        channel = connection.createChannel();  

        replyQueueName = channel.queueDeclare().getQueue();
        consumer = new QueueingConsumer(channel);
        channel.basicConsume(replyQueueName, true, consumer);
      }  

      public String call(String message) throws Exception {
        String response = null;
        String corrId = UUID.randomUUID().toString();  

        BasicProperties props = new BasicProperties
                                    .Builder()
                                    .correlationId(corrId)
                                    .replyTo(replyQueueName)
                                    .build();  

        channel.basicPublish("", requestQueueName, props, message.getBytes());  

        while (true) {
          QueueingConsumer.Delivery delivery = consumer.nextDelivery();
          if (delivery.getProperties().getCorrelationId().equals(corrId)) {
            response = new String(delivery.getBody(),"UTF-8");
            break;
          }
        }  

        return response;
      }  

      public void close() throws Exception {
        connection.close();
      }  

      public static void main(String[] argv) {
        RPCClient fibonacciRpc = null;
        String response = null;
        try {
          fibonacciRpc = new RPCClient();  

          System.out.println(" [x] Requesting fib(30)");
          response = fibonacciRpc.call("30");
          System.out.println(" [.] Got ‘" + response + "‘");
        }
        catch  (Exception e) {
          e.printStackTrace();
        }
        finally {
          if (fibonacciRpc!= null) {
            try {
              fibonacciRpc.close();
            }
            catch (Exception ignore) {}
          }
        }
      }
    }<strong>
    </strong>  


RPCServer.java:

 

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;

public class RPCServer {

  private static final String RPC_QUEUE_NAME = "rpc_queue";

  private static int fib(int n) {
    if (n ==0) return 0;
    if (n == 1) return 1;
    return fib(n-1) + fib(n-2);
  }

  public static void main(String[] argv) {
    Connection connection = null;
    Channel channel = null;
    try {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");

      connection = factory.newConnection();
      channel = connection.createChannel();

      channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

      channel.basicQos(1);

      QueueingConsumer consumer = new QueueingConsumer(channel);
      channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

      System.out.println(" [x] Awaiting RPC requests");

      while (true) {
        String response = null;

        QueueingConsumer.Delivery delivery = consumer.nextDelivery();

        BasicProperties props = delivery.getProperties();
        BasicProperties replyProps = new BasicProperties
                                         .Builder()
                                         .correlationId(props.getCorrelationId())
                                         .build();

        try {
          String message = new String(delivery.getBody(),"UTF-8");
          int n = Integer.parseInt(message);

          System.out.println(" [.] fib(" + message + ")");
          response = "" + fib(n);
        }
        catch (Exception e){
          System.out.println(" [.] " + e.toString());
          response = "";
        }
        finally {
          channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes("UTF-8"));

          channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
      }
    }
    catch  (Exception e) {
      e.printStackTrace();
    }
    finally {
      if (connection != null) {
        try {
          connection.close();
        }
        catch (Exception ignore) {}
      }
    }
  }
}

 

$ javac -cp rabbitmq-client.jar RPCClient.java RPCServer.java

我们的RPC service现在准备好了,我们开始启动server:

$ java -cp $CP RPCServer
 [x] Awaiting RPC requests

发布一个fibonacci 数字,运行在client(客户端):

$ java -cp $CP RPCClient
 [x] Requesting fib(30)

本节提供的设计并不是唯一的RPC服务实现,但它还是有一定的优点的:

  • 如果RPC server(服务器)太慢了,你仅仅需要运行另一个,就可以扩展;尝试在新的控制台,运行第二个吧;
  • 在客户端,RPC需要发送和接收的消息只有一个,不需要像queueDeclare 同步调用,因为RPC客户端为了一个RPC请求,只需要一个网络往返;

我们的代码依然很简单,不试图去解决更加繁杂的问题,但是非常重要,像以下这样:

  • 如果没有服务运行,客户端将怎么去做?
  • 客户端应该有RPC超时么?
  • 如果服务器出现故障,爆出一个异常,应该发给客户端么?
  • 防止传入错误的消息(如范围检查、类型)前处理
时间: 2024-10-08 22:52:52

柯南君:看大数据时代下的IT架构(9)消息队列之RabbitMQ--案例(RPC起航)的相关文章

柯南君:看大数据时代下的IT架构(5)消息队列之RabbitMQ--案例(Work Queues起航)

一.回顾 让我们回顾一下,在上几章里都讲了什么?总结如下: <柯南君:看大数据时代下的IT架构(1)业界消息队列对比> <柯南君:看大数据时代下的IT架构(2)消息队列之RabbitMQ-基础概念详细介绍> <柯南君:看大数据时代下的IT架构(3)消息队列之RabbitMQ-安装.配置与监控> <柯南君:看大数据时代下的IT架构(4)消息队列之RabbitMQ--案例(Helloword起航)> 二.Work Queues(using the Java Cl

柯南君:看大数据时代下的IT架构(6)消息队列之RabbitMQ--案例(Publish/Subscribe起航)

一.回顾 让我们回顾一下,在上几章里都讲了什么?总结如下: <柯南君:看大数据时代下的IT架构(1)业界消息队列对比> <柯南君:看大数据时代下的IT架构(2)消息队列之RabbitMQ-基础概念详细介绍> <柯南君:看大数据时代下的IT架构(3)消息队列之RabbitMQ-安装.配置与监控> <柯南君:看大数据时代下的IT架构(4)消息队列之RabbitMQ--案例(Helloword起航)> <柯南君:看大数据时代下的IT架构(5)消息队列之Rab

柯南君:看大数据时代下的IT架构(4)消息队列之RabbitMQ--案例(Helloword起航)

一.回顾 让我们回顾一下,在上几章里都讲了什么?总结如下: <柯南君:看大数据时代下的IT架构(1)业界消息队列对比> <柯南君:看大数据时代下的IT架构(2)消息队列之RabbitMQ-基础概念详细介绍> <柯南君:看大数据时代下的IT架构(3)消息队列之RabbitMQ-安装.配置与监控> 二.起航 本章节,柯南君将从几个层面,用官网例子讲解一下RabbitMQ的实操经典程序案例,让大家重新回到经典"Hello world!"(The simpl

柯南君:看大数据时代下的IT架构(3)消息队列之RabbitMQ-安装、配置与监控

柯南君上一章<看大数据时代下的IT架构(2)消息队列之RabbitMQ-基础概念详细介绍>中,粗略的讲了一下,目前消息队列的几种常见产品的优劣对比,接下来的几章节会分别详细阐述,本章介绍RabbitMQ,好吧,废话少说,正式开始: 一.安装 1.安装Erlang 1)系统编译环境(这里采用linux/unix 环境) ① 安装环境 虚拟机:VMware? Workstation 10.0.1 build Linux系统:CentOS6.5 rabbitMQ官网下载:http://www.rab

柯南君:看大数据时代下的IT架构(2)消息队列之RabbitMQ-基础概念详细介绍

柯南君上一章<柯南君:看大数据时代下的IT架构(1)业界消息队列对比 >中,粗略的讲了一下,目前消息队列的几种常见产品的优劣对比,接下来的几章节会分别详细阐述,本章介绍RabbitMQ,好吧,废话少说,正式开始: 一.基础概念详细介绍 1.引言 你是否遇到过两个(多个)系统间需要通过定时任务来同步某些数据?你是否在为异构系统的不同进程间相互调用.通讯的问题而苦恼.挣扎?如果是,那么恭喜你,消息服务让你可以很轻松地解决这些问题. 消息服务擅长于解决多系统.异构系统间的数据交换(消息通知/通讯)问

柯南君:看大数据时代下的IT架构(1)业界消息队列对比

一.MQ(Message Queue) 即消息队列,一般用于应用系统解耦.消息异步分发,能够提高系统吞吐量.MQ的产品有很多,有开源的,也有闭源,比如ZeroMQ.RabbitMQ.ActiveMQ.Kafka/Jafka.Kestrel.Beanstalkd.HornetQ.Apache Qpid.Sparrow.Starling.Amazon SQS.MSMQ等,甚至Redis也可以用来构造消息队列.至于如何取舍,取决于你的需求. 由于工作需要和兴趣爱好,曾经写过关于RabbitMQ的系列博

看大数据时代下的IT架构(1)业界消息队列对比

一.MQ(Message Queue) 即消息队列,一般用于应用系统解耦.消息异步分发,能够提高系统吞吐量.MQ的产品有很多,有开源的,也有闭源,比如ZeroMQ.RabbitMQ.ActiveMQ.Kafka/Jafka.Kestrel.Beanstalkd.HornetQ.Apache Qpid.Sparrow.Starling.Amazon SQS.MSMQ等,甚至Redis也可以用来构造消息队列.至于如何取舍,取决于你的需求. 由于工作需要和兴趣爱好,曾经写过关于RabbitMQ的系列博

看大数据时代下的IT架构(1)图片服务器之演进史

        柯南君的公司最近产品即将上线,由于产品业务对图片的需求与日俱增,花样百出,与此同时,在大数据时代,大流量的冲击下,对图片服务器的压力可想而知,那么今天,柯南君结合互联网的相关热文,加上自己的一点实践经验,与君探讨,与君共勉! 一.图片服务器的重要性 当前,不管哪一家网站(包括 电商行业.O2O行业.互联网行业等),不管哪一种渠道 (包括 web端,APP端甚至一些SNS应用),在大数据时代下,在内容为王的前提下,对图片的需求量越来越大,柯南君的公司是一家O2O公司,也不例外,图片

柯南君:看大数据时代下的IT架构(7)消息队列之RabbitMQ--案例(routing 起航)

二.Routing(路由) (using the Java client) 在前面的学习中,构建了一个简单的日志记录系统,能够广播所有的日志给多个接收者,在该部分学习中,将添加一个新的特点,就是可以只订阅一个特定的消息源,也就是说能够直接把关键的错误日志消息发送到日志文件保存起来,不重要的日志信息文件不保存在磁盘中,但是仍然能够在控制台输出,那么这便是我们这部分要学习的消息的路由分发机制. 三.Bindings(绑定) 在前面的学习中已经创建了绑定(bindings),代码如下: channel