RabbitMQ学习(三)订阅/发布

RabbitMQ学习(三)订阅/发布

1.RabbitMQ模型

前面所学都只用到了生产者、队列、消费者。如上图所示,其实生产者并不直接将信息传输到队列中,在生产者和队列中间有一个交换机(Exchange),我们之前没有使用到交换机是应为我们没有配置交换机,使用了默认的交换机。

有几个可供选择的交换机类型:直连交换机(direct), 主题交换机(topic), (头交换机)headers和 扇型交换机(fanout)

这里我们使用扇形交换机做一个简单的广播模型:一个生产者和多个消费者接受相同消息;

生产者代码:


public class Productor {

public static void main(String[]
args) throws
IOException,
TimeoutException {

//配置rabbitmq服务器地址

ConnectionFactory
factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setPort(5672);

factory.setUsername("starktan");

factory.setPassword("starktan");

factory.setVirtualHost("/");

//建立连接和通道

Connection
connection = factory.newConnection();

Channel channel =
connection.createChannel();

//声明一个扇形交换机

channel.exchangeDeclare("fanout",
BuiltinExchangeType.FANOUT);

System.out.println("发送信息!");

String message = "WorkQueue
Message number "
+ System.currentTimeMillis();

channel.basicPublish("fanout", "", true, null,
message.getBytes());

channel.close();

connection.close();

}

}

消费者代码:

public class Consumer {
    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        //创建连接和通道
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        final Connection connection = factory.newConnection();
        ExecutorService service = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 4; i++) {
            final int cur = i;
            service.submit(new Runnable() {
                Channel channel = connection.createChannel();
                String queryname = channel.queueDeclare().getQueue();
                public void run() {
                    //创建队列消费者
                    QueueingConsumer consumer = new QueueingConsumer(channel);
                    try {
                        channel.queueBind(queryname,"fanout","");
                        channel.basicConsume(queryname,consumer);
                        while (true) {
                            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                            String message = new String(delivery.getBody());
                            System.out.println("线程 " + cur + " 获取到消息 " + message);

                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        service.shutdown();
    }
}

运行效果:

时间: 2024-11-02 16:06:21

RabbitMQ学习(三)订阅/发布的相关文章

RabbitMQ(三) ——发布订阅

RabbitMQ(三) --发布订阅 (转载请附上本文链接--linhxx) 一.概述 RabbitMQ的发布订阅(Publish/Subscribe),其将生产者和消费者进一步解耦,生产者生产消息后,交付给交换机,消费者上线后,主动主动去队列中取数据进行处理.该模式也符合上一节工作队列中的ack.预取等规则. 发布订阅模式如下图所示: 二.交换机(exchange) 生产者生产完消息之后,都是将消息通过channel交给交换机,即生产者并不直接和队列联系.在没有定义交换机的时候,RabbitM

python使用rabbitMQ介绍三(发布订阅模式)

一.模式介绍 在前面的例子中,消息直接发送到queue中. 现在介绍的模式,消息发送到exchange中,消费者把队列绑定到exchange上. 发布-订阅模式是把消息广播到每个消费者,每个消费者接收到的消息都是相同的. 一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的.需要注意的是,如果将消息发送到一个没有队列绑定的exchange上面,那么该

rabbitmq系列三 之发布/订阅

1.发布/订阅 在上篇教程中,我们搭建了一个工作队列,每个任务只分发给一个工作者(worker).在本篇教程中,我们要做的跟之前完全不一样 -- 分发一个消息给多个消费者(consumers).这种模式被称为"发布/订阅". 为了描述这种模式,我们将会构建一个简单的日志系统.它包括两个程序--第一个程序负责发送日志消息,第二个程序负责获取消息并输出内容. 在我们的这个日志系统中,所有正在运行的接收方程序都会接受消息.我们用其中一个接收者(receiver)把日志写入硬盘中,另外一个接受

rabbitMQ学习笔记(四) 发布/订阅消息

前面都是一条消息只会被一个消费者处理. 如果要每个消费者都处理同一个消息,rabbitMq也提供了相应的方法. 在以前的程序中,不管是生产者端还是消费者端都必须知道一个指定的QueueName才能发送.获取消息.  而rabbitMQ消息模型的核心思想是生产者不会将消息直接发送给队列. 因为,生产者通常不会知道消息将会被哪些消费者接收. 生产者的消息虽然不是直接发送给Queue,但是消息会交给Exchange,所以需要定义Exchange的消息分发模式 ,之前的程序中,有如下一行代码: chan

RabbitMQ学习三

Work Queues 在上一篇文章中,send.py程序向名为hello的队列发送消息,receive.py程序向名为hello的队列接收消息.这一节中,我们将创建一个Work Queue用于将那些比较耗时的任务分布到多个worker上. Work Queues工作队列或者叫做Task Queues任务队列的主要概念就是为了避免立刻执行一个耗费资源的任务并且不得不等待它执行完成.取而代之的是,我们将这个任务调度到以后去执行. 我们封装一个任务为一个消息并发送这个消息到队列.一个work pro

RabbitMQ学习系列(三): C# 如何使用 RabbitMQ

上一篇已经讲了Rabbitmq如何在Windows平台安装,还不了解如何安装的朋友,请看我前面几篇文章:RabbitMQ学习系列一:windows下安装RabbitMQ服务 , 今天就来聊聊 C# 实际开发的过程中,怎么调用 用RabbitMQ. 一.客户端 RabbitMQ 有很多客户端API,都非常的好用.我们在一边,一直用的都是 EasyNetQ,所以这里的 demo 只介绍 EasyNetQ 客户端实现.其他的客户端,大家自己去研究吧. EasyNetQ 是一个易于使用的RabbitMQ

Part1.2 、RabbitMQ -- Publish/Subscribe 【发布和订阅】

python 目录 (一).交换 (Exchanges) -- 1.1 武sir 经典 Exchanges 案例展示. (二).临时队列( Temporary queues ) (三).绑定(Bindings) (四).汇总(Putting it all together) python系列之 RabbitMQ -- Publish/Subscribe [发布和订阅] >>前面的部分我们创建了一个工作队列(work queue). 设想是每个任务都能分发到一个 worker[queue],这一

RabbitMQ下的生产消费者模式与订阅发布模式

??所谓模式,就是在某种场景下,一类问题及其解决方案的总结归纳.生产消费者模式与订阅发布模式是使用消息中间件时常用的两种模式,用于功能解耦和分布式系统间的消息通信,以下面两种场景为例: 数据接入 ??假设有一个用户行为采集系统,负责从App端采集用户点击行为数据.通常会将数据上报和数据处理分离开,即App端通过REST API上报数据,后端拿到数据后放入队列中就立刻返回,而数据处理则另外使用Worker从队列中取出数据来做,如下图所示. ??这样做的好处有:第一,功能分离,上报的API接口不关心

Redis系列(三)—— 订阅/发布

Redis 订阅/发布 参考:http://www.cnblogs.com/mushroom/p/4470006.html,http://www.tuicool.com/articles/ABry2aj,http://www.cnblogs.com/tinywan/p/5903256.html,http://www.cnblogs.com/linjiqin/p/5733183.html,http://redisbook.readthedocs.io/en/latest/feature/pubsu