【RabbitMQ】 Routing

Routing

之前的章节里我们构建了一个简单的日志系统。我们可以广播所有的日志消息给所有的接收端。

本节我们将给它添加一个新特性 - 我们将允许只订阅一个消息的子集。例如,我们只将关键的错误消息定位到文件中(以节省磁盘空间),同时仍然可以在控制台输出所有日志消息。

Bindings

在前面的例子中我们已经创建了绑定关系。回想代码如下:

channel.queueBind(queueName, EXCHANGE_NAME, "");

一个绑定是指一个交换机和一个队列之间的关系。可以简单的理解为:队列对交换机中的消息感兴趣。

绑定需要一个额外的routingKey参数。为了避免和basic_publish中的参数混淆,我们现在称它为binding key。创建一个带有binding key的绑定:

channel.queueBind(queueName, EXCHANGE_NAME, "black");

binding key的意义跟交换机的类型有关。我们之前使用的fanout交换机就直接忽略了binding key。

Direct交换机

我们之前的日志系统广播所有的消息给所有的消费者。我们希望扩展它,以便它可以根据消息严重级别来过滤。例如我们可能想要一个程序仅将关键错误写入磁盘,从而避免写入警告或信息导致磁盘空间的浪费。

我们曾经使用fanout交换机,它没有给我们很多的灵活性,仅仅就是无情的广播。

我们将使用一个direct交换机替代它。direct交换机的路由算法很简单 - 一个将要进入队列的消息的routing key必须和这个队列的binding key完全吻合。

为了说明这一点,请考虑一下设置:

可以看到direct交换机绑定了两个队列。第一个队列绑定了一个叫做orange的binding key。第二个队列有两个绑定,一个black一个green。

在这种设置下,以routing key为orange的消息将会被路由到队列Q1。而routing key为black和green的消息会被路由到Q2。所有其它消息会被丢弃。

多绑定

使用相同的binding key绑定多个队列是合法的。在我们的例子中,我们可以添加一个X和Q1之间的绑定,使用black作为binding key。这种情况下,direct交换机会像fanout一样广播消息给所有的匹配队列。一个带有routing key为black的消息将会被同时发送到Q1和Q2。

发送日志

我们将应用这种模型到我们的日志系统中。与fanout不同的是,我们将发送消息到一个direct交换机。需要提供日志级别作为routing key。这样接收程序就可以选择它希望接收的日志级别。首先先关注日志的发布:

首先创建一个交换机:

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

准备发送消息:

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

为了简化,我们假设severity可以是info,warning,error等。

订阅

接收消息就像前几节中讲的类似,但有一个例外 - 我们需要为每一个感兴趣的级别创建一个新的绑定。

String queueName = channel.queueDeclare().getQueue();

for(String severity : argv){
  channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

Putting it all together

EmitLogDirect.java

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class EmitLogDirect {

  private static final String EXCHANGE_NAME = "direct_logs";

  public static void main(String[] argv) throws Exception {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "direct");

    String severity = getSeverity(argv);
    String message = getMessage(argv);

    channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
    System.out.println(" [x] Sent ‘" + severity + "‘:‘" + message + "‘");

    channel.close();
    connection.close();
  }

  private static String getSeverity(String[] strings){
    if (strings.length < 1)
            return "info";
    return strings[0];
  }

  private static String getMessage(String[] strings){
    if (strings.length < 2)
            return "Hello World!";
    return joinStrings(strings, " ", 1);
  }

  private static String joinStrings(String[] strings, String delimiter, int startIndex) {
    int length = strings.length;
    if (length == 0 ) return "";
    if (length < startIndex ) return "";
    StringBuilder words = new StringBuilder(strings[startIndex]);
    for (int i = startIndex + 1; i < length; i++) {
        words.append(delimiter).append(strings[i]);
    }
    return words.toString();
  }
}

ReceiveLogsDirect.java

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogsDirect {

  private static final String EXCHANGE_NAME = "direct_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    String queueName = channel.queueDeclare().getQueue();

    if (argv.length < 1){
      System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
      System.exit(1);
    }

    for(String severity : argv){
      channel.queueBind(queueName, EXCHANGE_NAME, severity);
    }
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
                                 AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received ‘" + envelope.getRoutingKey() + "‘:‘" + message + "‘");
      }
    };
    channel.basicConsume(queueName, true, consumer);
  }
}
时间: 2024-11-10 01:13:18

【RabbitMQ】 Routing的相关文章

【RabbitMQ】一文带你搞定RabbitMQ延迟队列

本文口味:鱼香肉丝? ?预计阅读:10分钟 一.说明 在上一篇中,介绍了RabbitMQ中的死信队列是什么,何时使用以及如何使用RabbitMQ的死信队列.相信通过上一篇的学习,对于死信队列已经有了更多的了解,这一篇的内容也跟死信队列息息相关,如果你还不了解死信队列,那么建议你先进行上一篇文章的阅读. 这一篇里,我们将继续介绍RabbitMQ的高级特性,通过本篇的学习,你将收获: 什么是延时队列 延时队列使用场景 RabbitMQ中的TTL 如何利用RabbitMQ来实现延时队列 二.本文大纲

【rabbitmq】安装卸载

安装: 1. 在http://www.rabbitmq.com/install-rpm.html下载对应系统的rpm包  我下载的是rabbitmq-server-3.6.6-1.el6.noarch.rpm 2. rpm -ivh rabbitmq-server-3.6.6-1.el6.noarch.rpm   安装 3. service rabbitmq-server start    启动rabbitmq 卸载: 1.rpm -qa|grep rabbitmq 2.rpm -e --nod

【RabbitMQ】5、RabbitMQ任务分发机制

当有Consumer需要大量的运算时,RabbitMQ Server需要一定的分发机制来balance每个Consumer的load.接下来我们分布讲解. 应用场景就是RabbitMQ Server会将queue的Message分发给不同的Consumer以处理计算密集型的任务: 1. Message acknowledgment 消息确认 每个Consumer可能需要一段时间才能处理完收到的数据.如果在这个过程中,Consumer出错了,异常退出了,而数据还没有处理完成,那么 非常不幸,这段数

【RabbitMQ】3、win7下安装RabbitMQ

RabbitMQ依赖erlang,所以先安装erlang,然后再安装RabbitMQ; erlang,下载地址:http://www.erlang.org/download RabbitMQ,下载地址: http://www.rabbitmq.com/releases/rabbitmq-server 先安装erlang,双击erlang的安装文件即可,然后配置环境变量: ERLANG_HOME=D:\Program Files\erl7.1 追加到path=%ERLANG_HOME%\bin;

【RabbitMQ】6、rabbitmq生产者的消息确认

通过Publisher Confirms and Returns机制,生产者可以判断消息是否发送到了exchange及queue,而通过消费者确认机制,Rabbitmq可以决定是否重发消息给消费者,以保证消息被处理. 1.什么是Publisher Confirms and Returns? Delivery processing acknowledgements from consumers to RabbitMQ are known as acknowledgements in AMQP 0-

【RabbitMQ】4、几种Exchange 模式

AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列.生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机.先由Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储.同理,消费者也是如此.Exchange 就类似于一个交换机,转发各个消息分发到相应的队列中. RabbitMQ提供了四种Exchange模式:fanout,direct,topic,header . header模式在实际使用中较少,本文只对前三种模式进行比

【RabbitMQ】7、RabbitMQ主备复制是异步还是同步?

转自:https://yq.aliyun.com/articles/73040?spm=5176.100240.searchblog.116.RcXYdl 我们知道RabbitMQ可以配置成Queue做主从复制(按照官方的说法叫配置mirror queue),对master queue的写操作会被复制到其他slave上去(也就是复制到mirror queue上去).这对rabbitmq的这个特性,有些人会问这样的问题,rabbitmq的主从复制是同步的还是异步的? 为什么有些人会问这个问题那?主

【RabbitMQ】——5种队列(转)

原文地址:https://blog.csdn.net/u012654963/article/details/76417613 应用RabbitMQ,我们可以根据需求选择5种队列之一. 一.简单队列 P:消息的生产者 C:消息的消费者 红色:队列 简单队列的生产者和消费者关系一对一 但有时我们的需求,需要一个生产者,对应多个消费者,那就可以采用第二种模式 二.Work模式 一个生产者.2个消费者. 但MQ中一个消息只能被一个消费者获取.即消息要么被C1获取,要么被C2获取.这种模式适用于类似集群,

【RabbitMQ】3、work模式

上一篇博客的作为rabbitMQ的入门程序,也是简单队列模式,一个生产者,一个消费者,今天这篇博客介绍work模式,一个生产者,多个消费者,下面的例子模拟两个消费者的情况. 图示:         一个生产者.两个消费者:一个消息只能被一个消费者获取. 在work模式中可以分为两种模式,一种是两个消费者平均消费队列中的消息,即使他们的消费能力是不一样的,这种似乎不太符合实际的情况.另一种是能者多劳模式,处理消息能力强的消费者会获取更多的 消息,这种模式更符合实际需求. 生产者:向队列发送50条消