RabbitMQ之发布订阅

将同一个队列的消息发送给多个消费者的模式就是“发布/订阅”,

这种模式的基础是将消息广播到所有的接收器上。

实际上,RabbitMQ中消息传递模型的核心思想是:

生产者不直接发送消息到队列。

实际的运行环境中,生产者是不知道消息会发送到哪个队列上,

她只会将消息发送到一个交换器,

交换器也像一个生产线,她一边接收生产者发来的消息,另外一边则根据交换规则,将消息放到队列中。

交换器必须知道她所接收的消息是什么?它应该被放到哪个队列中?它应该被添加到多个队列吗?还是应该丢弃?

这些规则都是按照交换器的规则来确定的。

交换器的规则有:

direct (直连)

topic (主题)

headers (标题)

fanout (分发)也有翻译为扇出的

我们将使用【fanout】类型创建一个名称为myexchange的交换器

channel.exchangeDeclare("myexchange", "fanout");

分发交换器很简单,你通过名称也能想到,她是广播所有的消息

通过rabbitmqctl list_exchanges指令可以列出服务器上所有可用的交换器列表

这个列表里面所有以【amq.*】开头的交换器都是RabbitMQ默认创建的。在生产环境中,可以自己定义。

在之前的实例中,我们知道,发送消息到队列时根本没有使用交换器,但是消息也能发送到队列。

这是因为RabbitMQ选择了一个空“”字符串的默认交换器。

来看看我们之前的代码:

channel.basicPublish("", "myqueue", null, message.getBytes());

第一个参数就是交换器的名称。如果输入“”空字符串,表示使用默认的匿名交换器。

第二个参数是【routingKey】路由线索

匿名交换器规则:

发送到routingKey名称对应的队列

现在,我们可以发送消息到交换器中

channel.basicPublish( "myexchange", "", null, message.getBytes());

如果要在生产者和消费者之间创建一个新的队列,又不想使用原来的队列,临时队列就是为这个场景而生的:

1.首先,每当我们连接到RabbitMQ,我们需要一个新的空队列,我们可以用一个随机名称来创建,或者说让服务器选择一个随机队列名称给我们。

2.一旦我们断开消费者,队列应该立即被删除。

提供queuedeclare()为我们创建一个非持久化、独立、自动删除的随机队列名称

String queueName = channel.queueDeclare().getQueue();

如果我们已经创建了一个分发交换器和队列,现在我们就可以就将我们的队列跟交换器进行绑定。

channel.queueBind(queueName, exchangeName, "");

执行完这段代码后,交换器会将消息添加到该队列中

执行rabbitmqctl list_bindings 命令可以查看绑定列表

以下是代码:

生产者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    private static final String EXCHANGE_NAME = "myexchange";

    public static void main(String[] argv) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // 分发消息
        for (int i = 1; i <= 5; i++) {
            String message = "Hello World! " + i;
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
            System.out.println(" p Sent ‘" + message + "‘");
        }
        channel.close();
        connection.close();
    }
}

消费者1

import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Consumer1 {
    private static final String EXCHANGE_NAME = "myexchange";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" Consumer1 Waiting for messages...");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" Consumer1 Received ‘" + message + "‘");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}

消费者2

import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Consumer2 {
    private static final String EXCHANGE_NAME = "myexchange";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" Consumer2 Waiting for messages...");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" Consumer2 Received ‘" + message + "‘");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}

运行Consumer1 打印

Consumer1 Waiting for messages...

运行Consumer2 打印

Consumer2 Waiting for messages...

运行Producer 打印

p Sent ‘Hello World! 1‘

p Sent ‘Hello World! 2‘

p Sent ‘Hello World! 3‘

p Sent ‘Hello World! 4‘

p Sent ‘Hello World! 5‘

Consumer1监听到消息最终打印

Consumer1 Waiting for messages...

Consumer1 Received ‘Hello World! 1‘

Consumer1 Received ‘Hello World! 2‘

Consumer1 Received ‘Hello World! 3‘

Consumer1 Received ‘Hello World! 4‘

Consumer1 Received ‘Hello World! 5‘

Consumer2监听到消息最终打印

Consumer2 Waiting for messages...

Consumer2 Received ‘Hello World! 1‘

Consumer2 Received ‘Hello World! 2‘

Consumer2 Received ‘Hello World! 3‘

Consumer2 Received ‘Hello World! 4‘

Consumer2 Received ‘Hello World! 5‘

可以看到同一个消息被2个消费者接收

使用rabbitmqctl list_bindings命令可以看到两个临时队列的名称

原文地址:https://www.cnblogs.com/zengnansheng/p/10389654.html

时间: 2024-11-08 03:06:50

RabbitMQ之发布订阅的相关文章

【译】RabbitMQ:发布-订阅(Publish/Subscribe)

在前一篇教程中,我们创建了一个工作队列,我们假设在工作队列后的每一个任务都只被调度给一个消费者.在这一部分,我们将做一些完全不一样的事情,调度同一条消息给多个消费者,也就是有名的“发布-订阅”模式.为了阐述这种模式,我们将构建一个简单的日志系统.该系统将由两部分组成:一部分发送日志消息,另一部分接收并且打印日志消息,在这个日志系统中,每一份运行着的接收程序都将会收到消息.这样我们可以运行一个接收者把日志写入到磁盘中,同时可以运行另一个接收者将日志打印到显示器上面.也就是说,发布的日志消息会被广播

RabbitMQ(三) ——发布订阅

RabbitMQ(三) --发布订阅 (转载请附上本文链接--linhxx) 一.概述 RabbitMQ的发布订阅(Publish/Subscribe),其将生产者和消费者进一步解耦,生产者生产消息后,交付给交换机,消费者上线后,主动主动去队列中取数据进行处理.该模式也符合上一节工作队列中的ack.预取等规则. 发布订阅模式如下图所示: 二.交换机(exchange) 生产者生产完消息之后,都是将消息通过channel交给交换机,即生产者并不直接和队列联系.在没有定义交换机的时候,RabbitM

RabbitMQ 发布/订阅

我们会做一些改变,就是把一个消息发给多个消费者,这种模式称之为发布/订阅(类似观察者模式). 为了验证这种模式,我们准备构建一个简单的日志系统.这个系统包含两类程序,一类程序发动日志,另一类程序接收和处理日志. 在我们的日志系统中,每一个运行的接收者程序都会收到日志.然后我们实现,一个接收者将接收到的数据写到硬盘上,与此同时,另一个接收者把接收到的消息展现在屏幕上.本质上来说,就是发布的日志消息会转发给所有的接收者. 1.转发器(Exchanges) RabbitMQ消息模型的核心理念是生产者永

.Net下RabbitMQ发布订阅模式实践

一.概念AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计.消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然.AMQP的主要特征是面向消息.队列.路由(包括点对点和发布/订阅).可靠性.安全.RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python.Ruby..NET.Java.JMS.C.PHP.ActionScri

RabbitMQ实例教程:发布/订阅者消息队列

消息交换机(Exchange) RabbitMQ消息模型的核心理念是生产者永远不会直接发送任何消息给队列,一般的情况生产者甚至不知道消息应该发送到哪些队列. 相反的,生产者只能发送消息给交换机(Exchange).交换机的作用非常简单,一边接收从生产者发来的消息,另一边把消息推送到队列中.交换机必须清楚的知道消息如何处理它收到的每一条消息.是否应该追加到一个指定的队列?是否应该追加到多个队列?或者是否应该丢弃?这些规则通过交换机的类型进行定义. 交换机的类型有:direct,topic,head

RabbitMQ官网教程---发布/订阅

(使用python客户端pika 0.9.8) 在前面的教程中我们创建了一个工作队列.假设在一个工作队列后面是每一个被传递给正确的工作者的任务.在这部分中我们将做一些完全不同的事情--我们将给多个消费者传递一个消息.这种模式被称为"发布/订阅". 为了阐明这个模式,我们将构建一个简单的日志系统.它将由两个程序构成--第一个将发出日志消息并且第二个将接收并且打印它们. 在我们的日志系统中每个运行的接收程序副本将获得这个消息.用这种方式我们将可以运行一个接收器并且直接日志到磁盘:而且同时我

RabbitMQ/JAVA (发布/订阅模式)

发布/订阅模式即生产者将消息发送给多个消费者. 下面介绍几个在发布/订阅模式中的关键概念-- 1. Exchanges (转发器) 可能原来我们都是基于一个队列发送和接收消息.现在介绍一下完整的消息传递模式. Rabbitmq消息模式的核心理念是:生产者没有直接发送任何消息到队列.实际上,生产者都不知道这个消息是发送给哪个队列的.相反,生产者只能发送消息给转发器. 转发器一方面接收生产者的消息,另一方面向队列推送消息. 转发器必须清楚的指导如何处理接收到的消息,需要附加队列吗?附加几个?或者是否

消息队列 RabbitMQ系列 第四篇:发布/订阅 Publish/Subscribe

上篇中我们实现了Work Queue的创建,在Work Queue背后,其实是rabbitMQ把每条任务消息只发给一个消费者.本篇中我们将要研究如何把一条消息推送给多个消费者,这种模式被称为publish/subscribe(发布/订阅). 为了说明这个模式,我们将会构建一个简单的日志系统.这将会包含两部分程序,第一个是发送日志信息,第二个将会接收并打印它们. 在我们的日志系统里,每个运行的消费者程序都能接收到消息.这样我就运行一个receiver并把日志写到磁盘上,同时我们再运行另外一个消费者

RabbitMQ系列教程之三:发布/订阅(Publish/Subscribe)

(本教程是使用Net客户端,也就是针对微软技术平台的) 在前一个教程中,我们创建了一个工作队列.工作队列背后的假设是每个任务会被交付给一个[工人].在这一部分我们将做一些完全不同的事情--我们将向多个[消费者]传递信息.这种模式被称为"发布/订阅". 为了说明这种模式,我们将构建一个简单的日志系统.它将包括两个程序,第一个将发出日志消息,第二个将接收并打印它们. 在我们的日志系统中每个接收程序的运行副本都会得到消息.这样我们就可以运行一个接收者程序,将日志记录到磁盘:同时我们可以运行另