RabbitMQ - 发布订阅

这次我们试试publish / subscribe模式,
也就是将一个消息发送给多个consumer。

这里用一个简单的小程序来说明publish / subscribe。
由一个provider提供消息,这个消息会被多个consumer接收。
consumer对同一个消息做出不同的反应,比如打印、保存到文件、数据库什么的。

之前的例子可能会给人这种感觉:
producer将消息发送到队列中,消息缓冲在队列中,consumer从队列获得消息。

但这并不正确。
在rabbit中,producer从来不会直接将消息发送到队列中。
producer根本无从得知消息是否会发送到某个队列中。

事实上,producer只能将消息发送到exchange中。
这么一说虽然感觉多了个东西,但exchange并不复杂。
exchange只是从producer获取消息并将消息推送到队列中。

但为什么多了这么个步骤?
比如exchange收到消息后,它应该将消息推送给某个特定的队列? 或者可以将消息推送给多个队列? 再或者直接抛弃该消息?
这些规则取决于exchange的类型。

以下是一些可用的exchange type(org.springframework.amqp.core.ExchangeTypes):

public static final String DIRECT = "direct";
public static final String TOPIC = "topic";
public static final String FANOUT = "fanout";
public static final String HEADERS = "headers";
public static final String SYSTEM = "system";

我们可以用以下方式定义一个exchange:

channel.exchangeDeclare("logs", "fanout");

正如其名,fanout就是将收到的消息发送给所有可访问的队列。

如何查看已定义的exchange?
查看已定义的exchange,我们可以用rabbitmqctl list_exchanges命令,如图:

图中名为amq.*和没有名字的exchange都是默认自带的。
(PS:之前的例子中我们还没有用到exchange的概念,但仍然成功地将消息发送到了队列中。
这是因为我们使用的是默认的exchange。)

我们需要将消息发送到指定的exchange中。
basicPublish的第一个参数就是exchange的名称(重写的几个都是)。
空字符串表示默认的exchange:

channel.basicPublish( "logs", "", null, message.getBytes());

队列的命名很重要,比如多个worker共享一个队列,producer和consumer的关系用队列名维系。
但并不是所有的场景都需要我们亲自去命名。
比如我们需要获得所有消息,而不是它的某个子集。
或者我们更关心最新的消息,而不是更早放到队列的那些。

我们需要让server随机命名队列,并且队列在consumer连接断开时自动删除。

我们只需要一行代码来做这些:

String queueName = channel.queueDeclare().getQueue();

调用不带参数的queueDeclare()可以创建一个临时队列。

到此我们就创建好了exchange和队列。
我们需要用什么东西将他们联系起来,这个东西叫作"binding"。

通过以下代码将他们联系起来:

channel.queueBind(queueName, "logs", "");

正如查看exchange那样,我们可以用rabbitmqctl list_bindings命令查看binding。
如图:

从producer到queue的关系图如下:

写了个Channel静态工厂,写的不好。
我打算在静态初始化块中定义两个exchange:

final class ChannelFactory {

    //consumer的temporary queue与这两个exchange绑定
    final static String EXCHANGE_NAME = "log";
    final static String EXCHANGE_NAME_ = "log2";

    private static final ConnectionFactory factory = new ConnectionFactory();

    static{
        try {
            Channel temp = getChannel();
            temp.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.FANOUT);
            temp.exchangeDeclare(EXCHANGE_NAME_, ExchangeTypes.FANOUT);
            closeChannel(temp);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private ChannelFactory() {
    }

    public static Channel getChannel() {
        try {
            return factory.newConnection().createChannel();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static Channel getChannel(int channelNumber) {
        try {
            return factory.newConnection().createChannel();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static void closeChannel(Channel channel) {
        try {
            channel.close();
            channel.getConnection().close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

producer类,同一个producer给两个exchange发消息:

class Publisher {
    public static void main(String[] args) throws IOException {

        Channel channel = ChannelFactory.getChannel();

        String message = "Here is the content";
        channel.basicPublish(ChannelFactory.EXCHANGE_NAME, StringUtils.EMPTY, null,
                ("EXCHANGE_NAME 1:::"+message).getBytes());
        channel.basicPublish(ChannelFactory.EXCHANGE_NAME_, StringUtils.EMPTY, null,
                ("EXCHANGE_NAME 2:::"+message).getBytes());

        ChannelFactory.closeChannel(channel);
    }
}

consumer类,临时队列需要和两个exchange进行绑定:

class Logger {
    public static void main(String[] args) throws IOException,
            ShutdownSignalException, ConsumerCancelledException,
            InterruptedException {
        Channel channel = ChannelFactory.getChannel();

        String queue = channel.queueDeclare().getQueue();
        System.out.println("temporary queue name::"+queue);

        channel.queueBind(queue, ChannelFactory.EXCHANGE_NAME, "");
        channel.queueBind(queue, ChannelFactory.EXCHANGE_NAME_, "");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queue, true, consumer);

        while (true) {
            System.out.println(new String(consumer.nextDelivery().getBody()));
        }
    }
}

由于使用的是临时队列,需要先运行consumer再运行producer。
运行结果输出:

时间: 2024-10-06 02:33:31

RabbitMQ - 发布订阅的相关文章

RabbitMQ 发布/订阅

我们会做一些改变,就是把一个消息发给多个消费者,这种模式称之为发布/订阅(类似观察者模式). 为了验证这种模式,我们准备构建一个简单的日志系统.这个系统包含两类程序,一类程序发动日志,另一类程序接收和处理日志. 在我们的日志系统中,每一个运行的接收者程序都会收到日志.然后我们实现,一个接收者将接收到的数据写到硬盘上,与此同时,另一个接收者把接收到的消息展现在屏幕上.本质上来说,就是发布的日志消息会转发给所有的接收者. 1.转发器(Exchanges) RabbitMQ消息模型的核心理念是生产者永

.Net下RabbitMQ发布订阅模式实践

一.概念AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计.消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然.AMQP的主要特征是面向消息.队列.路由(包括点对点和发布/订阅).可靠性.安全.RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python.Ruby..NET.Java.JMS.C.PHP.ActionScri

RabbitMQ 发布订阅持久化

RabbitMQ是一种重要的消息队列中间件,在生产环境中,稳定是第一考虑.RabbitMQ厂家也深知开发者的声音,稳定.可靠是第一考虑,为了消息传输的可靠性传输,RabbitMQ提供了多种途径的消息持久化保证:Exchange持久化.Queue持久化及Message的持久化.以保证RabbitMQ在退出或Crash等异常情况下,消息不会丢失.RabbitMQ提供了简单的参数配置来实现持久化操作. 简单说明一下各种持久化方式:(描述代码采用的是Rabbit.Client  SDK,  C#代码)

RabbitMQ发布订阅模式

这个可能是消息队列中最重要的队列了,其他的都是在它的基础上进行了扩展.功能实现:一个生产者发送消息,多个消费者获取消息(同样的消息),包括一个生产者,一个交换机,多个队列,多个消费者. 思路解读(重点理解): (1)一个生产者,多个消费者(2)每一个消费者都有自己的一个队列(3)生产者没有直接发消息到队列中,而是发送到交换机(4)每个消费者的队列都绑定到交换机上(5)消息通过交换机到达每个消费者的队列该模式就是Fanout Exchange(扇型交换机)将消息路由给绑定到它身上的所有队列以用户发

【译】RabbitMQ:发布-订阅(Publish/Subscribe)

在前一篇教程中,我们创建了一个工作队列,我们假设在工作队列后的每一个任务都只被调度给一个消费者.在这一部分,我们将做一些完全不一样的事情,调度同一条消息给多个消费者,也就是有名的“发布-订阅”模式.为了阐述这种模式,我们将构建一个简单的日志系统.该系统将由两部分组成:一部分发送日志消息,另一部分接收并且打印日志消息,在这个日志系统中,每一份运行着的接收程序都将会收到消息.这样我们可以运行一个接收者把日志写入到磁盘中,同时可以运行另一个接收者将日志打印到显示器上面.也就是说,发布的日志消息会被广播

RabbitMQ实例教程:发布/订阅者消息队列

消息交换机(Exchange) RabbitMQ消息模型的核心理念是生产者永远不会直接发送任何消息给队列,一般的情况生产者甚至不知道消息应该发送到哪些队列. 相反的,生产者只能发送消息给交换机(Exchange).交换机的作用非常简单,一边接收从生产者发来的消息,另一边把消息推送到队列中.交换机必须清楚的知道消息如何处理它收到的每一条消息.是否应该追加到一个指定的队列?是否应该追加到多个队列?或者是否应该丢弃?这些规则通过交换机的类型进行定义. 交换机的类型有:direct,topic,head

RabbitMQ官网教程---发布/订阅

(使用python客户端pika 0.9.8) 在前面的教程中我们创建了一个工作队列.假设在一个工作队列后面是每一个被传递给正确的工作者的任务.在这部分中我们将做一些完全不同的事情--我们将给多个消费者传递一个消息.这种模式被称为"发布/订阅". 为了阐明这个模式,我们将构建一个简单的日志系统.它将由两个程序构成--第一个将发出日志消息并且第二个将接收并且打印它们. 在我们的日志系统中每个运行的接收程序副本将获得这个消息.用这种方式我们将可以运行一个接收器并且直接日志到磁盘:而且同时我

RabbitMQ/JAVA (发布/订阅模式)

发布/订阅模式即生产者将消息发送给多个消费者. 下面介绍几个在发布/订阅模式中的关键概念-- 1. Exchanges (转发器) 可能原来我们都是基于一个队列发送和接收消息.现在介绍一下完整的消息传递模式. Rabbitmq消息模式的核心理念是:生产者没有直接发送任何消息到队列.实际上,生产者都不知道这个消息是发送给哪个队列的.相反,生产者只能发送消息给转发器. 转发器一方面接收生产者的消息,另一方面向队列推送消息. 转发器必须清楚的指导如何处理接收到的消息,需要附加队列吗?附加几个?或者是否

消息队列 RabbitMQ系列 第四篇:发布/订阅 Publish/Subscribe

上篇中我们实现了Work Queue的创建,在Work Queue背后,其实是rabbitMQ把每条任务消息只发给一个消费者.本篇中我们将要研究如何把一条消息推送给多个消费者,这种模式被称为publish/subscribe(发布/订阅). 为了说明这个模式,我们将会构建一个简单的日志系统.这将会包含两部分程序,第一个是发送日志信息,第二个将会接收并打印它们. 在我们的日志系统里,每个运行的消费者程序都能接收到消息.这样我就运行一个receiver并把日志写到磁盘上,同时我们再运行另外一个消费者