RabbitMQ4--发后即忘和RPC

  在项目中引入RabbitMQ通常会考虑它会带来的好处:解耦应用程序,实现不同编程语言之间的互通,解除对特定通信协议的依赖,解除应用程序在时序上执行的依赖(异步).落实到代码层面就是两种常用应用模式:"发后即忘"(fire-and-forget)和RPC.

fire-and-forget

  RabbitMQ 解决的是应用程序之间互联(connect)和规模(scale)的问题,消息发送和接收是隔离,发送方不知道消息最终由谁接收,接收方也不必关心消息是谁步发出的;发送和接收是隔离的,消息本质上就是异步的.这种隔离也就解耦了应用程序之间的依赖.RabbitMQ的角色就是应用程序中间的路由器.对于消息的发布方来讲这是一种"发后即忘"(fire-and_forget)的发布方式.

RPC

  RPC需要双向通信,或者说RPC Server需要明确知道要把消息发送给谁.我们可以在payload的数据部分附加"发给谁" 这种EndPoint信息. RabbitMQ提供的解决方案:在每一个AMQP的消息头上有一个reply_to字段.这样消息的producer就可以指定Queue name,RPC Server接受到消息检查reply_to字段,创建一个消息包含Response并把queue name作为routing key,订阅了这个队列的Client就拿到了消息.

  这里有两件事情要保证:

  1. 要为队列创建随机Name
  2. 即使Name随机还是有可能冲突,还需要保证消息通信的独占性。

  看看RabbitMQ是怎么满足这两点的:

  1. 如果创建的队列不指定queue name,RabbitMQ就会创建一个随机的Name.
  2. 独占只需要exclusive参数即可

  总而言之,需要做的就是Client创建一个temporary,exclusive,anonymou的queue,并把queue name设置在RPC 消息的reply_to字段即可.注意这里RPC Server已经知道要投递到哪个Queue,所以不需要指定Exchange(后面我们会提到在实现层面Queue和Exchange的不同,简单讲 queue会有对应的Erlang进程,而exchang只是执行一些模式匹配的检查并没有进程实体对应).看下图:

  Our RPC will work like this:

  • When the Client starts up, it creates an anonymous exclusive callback queue.
  • For an RPC request, the Client sends a message with two properties: replyTo, which is set to the callback queue and correlationId, which is set to a unique value for every request.
  • The request is sent to an rpc_queue queue.
  • The RPC worker (aka: server) is waiting for requests on that queue. When a request appears, it does the job and sends a message with the result back to the Client, using the queue from the replyTo field.
  • The client waits for data on the callback queue. When a message appears, it checks the correlationId property. If it matches the value from the request it returns the response to the application

RPC Client端的代码如下:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;
import java.util.UUID;

public class RPCClient {
  private Connection connection;
  private Channel channel;
  private String requestQueueName = "rpc_queue";
  private String replyQueueName;
  private QueueingConsumer consumer;

  public RPCClient() throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    connection = factory.newConnection();
    channel = connection.createChannel();

    replyQueueName = channel.queueDeclare().getQueue();
    consumer = new QueueingConsumer(channel);
    channel.basicConsume(replyQueueName, true, consumer);
  }

  public String call(String message) throws Exception {
    String response = null;
    String corrId = UUID.randomUUID().toString();
    BasicProperties props = new BasicProperties
                                .Builder()
                                .correlationId(corrId)
                                .replyTo(replyQueueName)
                                .build();

    channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
    while (true) {
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      if (delivery.getProperties().getCorrelationId().equals(corrId)) {
        response = new String(delivery.getBody(),"UTF-8");
        break;
      }
    }
    return response;
  }

  public void close() throws Exception {
    connection.close();
  }

  public static void main(String[] argv) {
    RPCClient fibonacciRpc = null;
    String response = null;
    try {
      fibonacciRpc = new RPCClient();

      System.out.println(" [x] Requesting fib(30)");
      response = fibonacciRpc.call("30");
            System.out.println(" [.] Got ‘" + response + "‘");
        }
    catch  (Exception e) {
      e.printStackTrace();
    }
    finally {
      if (fibonacciRpc!= null) {
        try {
          fibonacciRpc.close();
        }
        catch (Exception ignore) {}
      }
    }
  }
}

RPC SEVER端代码如下:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;

public class RPCServer {
    private static final String RPC_QUEUE_NAME = "rpc_queue";
    //计算斐波切纳数列
    private static int fib(int n) {
        if (n == 0)
            return 0;
        if (n == 1)
            return 1;
        return fib(n - 1) + fib(n - 2);
    }

    public static void main(String[] argv) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            connection = factory.newConnection();
            channel = connection.createChannel();
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
            channel.basicQos(1);

            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
            System.out.println(" [x] Awaiting RPC requests");

            while (true) {
                String response = null;
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                BasicProperties props = delivery.getProperties();
                BasicProperties replyProps = new BasicProperties.Builder()
                        .correlationId(props.getCorrelationId()).build();

                try {
                    String message = new String(delivery.getBody(), "UTF-8");
                    int n = Integer.parseInt(message);
                    System.out.println(" [.] fib(" + message + ")");
                    response = "" + fib(n);
                } catch (Exception e) {
                    System.out.println(" [.] " + e.toString());
                    response = "";
                } finally {
                    channel.basicPublish("", props.getReplyTo(), replyProps,
                            response.getBytes("UTF-8"));
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
                            false);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ignore) {
                }
            }
        }
    }
}

略有不同

  传统的RPC调用Client和Server紧密依赖,客户端连接上服务器,发送一个请求然后阻塞等待服务器响应.这样的做的特点是客户端和服务器端是知道对方的.如果RPC Server崩溃掉,客户端需要重连,如果Server彻底崩掉就要重新找一个提供同样服务的Server,然后客户端重连过去.

  用RabbitMQ来实现RPC,依然保持Client Server信息隐藏的特点,Client依赖的不是特定的Server而是特定的消息,在有多个等效Server的情况下,一个Server的状态是否正常不会影响到客户端的状态.

  总结一下,使用RabbitMQ是先RPC,客观上还实现了下面的效果:

  1. 容错 一个Server崩溃不影响 Client
  2. 解耦了对特定通信协议和接口的依赖,统一走AMQP消息.
  3. 在多个RPC Server之间的负载均衡由RabbitMQ完成
时间: 2024-07-29 22:50:42

RabbitMQ4--发后即忘和RPC的相关文章

Error-ASP.NET:由于未能找到 id 为“FileUpload1$gvFiles$ctl02$lnkBtnRemoveFile”的控件或在回发后将同一 ID 分配给另一个控件,导致发生错误。如果未分配 ID,请显式设置引发回发事件的控件的 ID 属性以避免此错误。

ylbtech-Error-ASP.NET:由于未能找到 id 为“FileUpload1$gvFiles$ctl02$lnkBtnRemoveFile”的控件或在回发后将同一 ID 分配给另一个控件,导致发生错误.如果未分配 ID,请显式设置引发回发事件的控件的 ID 属性以避免此错误. 1.返回顶部 1. “/”应用程序中的服务器错误. 由于未能找到 id 为“FileUpload1$gvFiles$ctl02$lnkBtnRemoveFile”的控件或在回发后将同一 ID 分配给另一个控件

Tensorflow2.0报错:ProfilerNotRunningError: Cannot stop profiling. No profiler is running.(修改后别忘了重启内核或关掉重启)

# 把这个路径用包装一下logdir = os.path.join("cnn_selu_callbacks") print(logdir)if not os.path.exists(logdir):    os.mkdir(logdir)output_model_file = os.path.join(logdir,                                 "fashion_mnist_model.h5") callbacks = [   

消息队列_RabbitMQ-0004.深入RabbitMQ之分类告警/并行执行/RPC响应?

应用场景: 1. 通知,针对发送事件的描述,内容可以是消息的日志,也可以是真实的报告通知给另一个程序或者管理员. 说明: 首先选择交换机,如果选择fanout交换机,则需要为每种告警传输类型(邮件/微信/手机/短信)创建队列,但同时也带来坏处就是每个消息都会发送到所有队列,导致告警消息发生时,被报警消息淹没,如果选择topic交换机,则可为其创建四种严重级别告警info/warning/problem/citical,但如果使用fanout类型交换机消息会发送到所有这四个级别队列,如果使用dir

RabbitMQ(四):RPC的实现

原文:RabbitMQ(四):RPC的实现 一.RPC RPC(Remote Procedure Call)—远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议.有很多方式可以实现,譬如UNIX RPC.REST API.WCF和SOAP.这些传统的RPC实现方法有共同之处:那就是客户端和服务器端紧密相连,客户端直接连接上服务器,发送一个请求,然后就停下来等待服务器的应答. 这种点对点的性质模式有很多好处,它使得在小范围内的拓扑变得简单.但是当有众多服务器的

RabbitMQ实战:消息通信模式和最佳实践

本系列是「RabbitMQ实战:高效部署分布式消息队列」书籍的总结笔记. 通过前2篇的介绍,了解了消息通信的主要元素和交互过程,以及如何运行和管理RabbitMQ,这篇将站在开发模式的角度理解「面向消息通信」带来的好处,以及在各种场景下的最佳实践. 通过介绍,你会了解到: 面向消息通信的好处 发后即忘模型 用RabbitMQ实现RPC 面向消息通信的好处 主要从异步状态思维.处理能力扩展性.集成复杂度方面,说明面向消息通信的好处. 异步状态思维 当将消息通信集成到应用程序时,开发模式将从同步模型

RabbitMQ实战:可用性分析和实现

本系列是「RabbitMQ实战:高效部署分布式消息队列」书籍的总结笔记. 上一篇介绍了各种场景下的最佳实践,大部分场景可以使用「发后即忘」的模式,不需要响应,如果需要响应,可以使用RabbitMQ的RPC模型. RabbitMQ以异步的方式解耦系统间的关系,调用者将业务请求发送到Rabbit服务器,就可以返回了,Rabbit会确保请求被正确处理,即使遇到网络异常.Rabbit服务器崩溃.整个机房断电等特殊场景,针对这些场景,Rabbit提供了各种机制确保其可用性. 本篇通过总结可能出现的特殊场景

全面解析C#中的异步编程

当我们处理一些长线的调用时,经常会导致界面停止响应或者IIS线程占用过多等问题,这个时候我们需要更多的是用异步编程来修正这些问题,但是通常都是说起来容易做起来难,诚然异步编程相对于同步编程来说,它是一种完全不同的编程思想,对于习惯了同步编程的开发者来说,在开发过程中难度更大,可控性不强是它的特点. 在.NET Framework5.0种,微软为我们系统了新的语言特性,让我们使用异步编程就像使用同步编程一样相近和简单,本文中将会解释以前版本的Framework中基于回调道德异步编程模型的一些限制以

rabbitmq的相关知识

1. 如何确保消息正确地发送至RabbitMQ? RabbitMQ使用发送方确认模式,确保消息正确地发送到RabbitMQ. 发送方确认模式:将信道设置成confirm模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的ID.一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一ID).如果RabbitMQ发生内部错误从而导致消息丢失,会发送一条nack(not acknowledged,未确认)消息. 发送方确认模式是异步的

深入理解Kafka必知必会(上)

Kafka的用途有哪些?使用场景如何? 消息系统: Kafka 和传统的消息系统(也称作消息中间件)都具备系统解耦.冗余存储.流量削峰.缓冲.异步通信.扩展性.可恢复性等功能.与此同时,Kafka 还提供了大多数消息系统难以实现的消息顺序性保障及回溯消费的功能. 存储系统: Kafka 把消息持久化到磁盘,相比于其他基于内存存储的系统而言,有效地降低了数据丢失的风险.也正是得益于 Kafka 的消息持久化功能和多副本机制,我们可以把 Kafka 作为长期的数据存储系统来使用,只需要把对应的数据保