Java使用RabbitMQ之订阅分发(Topic)

使用RabbitMQ进行消息发布和订阅,生产者将消息发送给转发器(exchange),转发器根据路由键匹配已绑定的消息队列并转发消息,主题模式支持路由键的通配。

生产者代码:

 1 package org.study.exchange3.topic3;
 2
 3 import com.rabbitmq.client.Channel;
 4 import com.rabbitmq.client.Connection;
 5 import org.junit.Test;
 6 import org.study.utils.ConnectionUtils;
 7
 8 import java.io.IOException;
 9 import java.util.concurrent.TimeoutException;
10
11 /**
12  * topic-主题模式(分发订阅)
13  * exchange只转发消息,但是没有存储能力,只有队列才有存储能力
14  * 主题模式支持路由键的通配符
15  * “#”表示0个或若干个关键字,“*”表示一个关键字。
16  */
17 public class Sender {
18     public static final String QUEUE_NAME = "test_topic_queue";
19     public static final String EXCHANGE_NAME = "topic_exchange";
20
21     @Test
22     public void send() throws IOException, TimeoutException, InterruptedException {
23         // 获取连接
24         Connection conn = ConnectionUtils.getConnection();
25         // 获取通道
26         Channel channel = conn.createChannel();
27 //        //创建队列
28 //        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
29         //声明转发器
30         channel.exchangeDeclare(EXCHANGE_NAME, "topic");
31         //每个消费者发送确认消息前,只发送一条消息
32         channel.basicQos(1);
33
34         String msg = "hello rabbitmq topic !";
35         //发送消息至转发器,指定路由键
36         channel.basicPublish(EXCHANGE_NAME, "key.key", null, msg.getBytes());
37         System.out.println("[send] msg " + msg);
38
39         channel.close();
40         conn.close();
41     }
42 }

消费者代码:

 1 package org.study.exchange3.topic3;
 2
 3 import com.rabbitmq.client.*;
 4 import org.junit.Test;
 5 import org.study.utils.ConnectionUtils;
 6
 7 import java.io.IOException;
 8 import java.util.concurrent.TimeoutException;
 9
10 /**
11  * 主题模式-接收消息
12  */
13 public class Recv {
14     public static final String QUEUE_NAME = "test_topic_queue";
15     public static final String EXCHANGE_NAME = "topic_exchange";
16
17     @Test
18     public void recv() throws IOException, TimeoutException, InterruptedException {
19         Connection conn = ConnectionUtils.getConnection();
20         Channel channel = conn.createChannel();
21         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
22         channel.basicQos(1);
23         /*
24          * 队列绑定转发器,路由键通配符#和*
25          * #:表示0个或多个字符
26          * *:表示一个字符
27          * */
28         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.#");
29
30         //定义消费者
31         DefaultConsumer consumer = new DefaultConsumer(channel) {
32             //重写获取到达消息
33             @Override
34             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
35                 String msg = new String(body, "utf-8");
36                 System.out.println("[1] recv: " + msg);
37
38                 try {
39                     Thread.sleep(100);
40                 } catch (InterruptedException e) {
41                     e.printStackTrace();
42                 } finally {
43                     System.out.println("[1] done!");
44                     // 回执
45                     channel.basicAck(envelope.getDeliveryTag(), false);
46                 }
47             }
48         };
49
50         while (true) {
51             //监听队列
52             channel.basicConsume(QUEUE_NAME, false, consumer);
53             Thread.sleep(1000);
54         }
55
56     }
57 }

原文地址:https://www.cnblogs.com/gongxr/p/9646577.html

时间: 2024-11-09 02:52:06

Java使用RabbitMQ之订阅分发(Topic)的相关文章

Java使用RabbitMQ之公平分发

发送消息: 1 package org.study.workfair; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import org.junit.Test; 6 import org.study.utils.ConnectionUtils; 7 8 import java.io.IOException; 9 import java.util.concurrent.Time

RabbitMQ学习第一记:用java连接RabbitMQ

1.什么是RabbitMQ MQ(Message Queue):消息队列,是服务端设计的一个可以存储大量消息的队列,并提供客户端操作队列的方法:生产队列(向队列中添加数据).消费队列(从队列中取数据).RabbitMQ就是基于消息队列的一个典型应用.RabbitMQ除了普通的生产消费功能,还有一些高级功能:公平分发 ,轮询分发,路由模式,通配符模式,发布订阅,队列持久化. 2.java实现RabbitMQ的连接 2.1.RabbitMQ客户端jar包 <dependency><group

事件订阅分发模型

最近看了下各位大师写的事件订阅分发的模型很有感触,js果然强大到行如流水,下面这段模型摘自汤姆大叔的深入理解Javascript,非常感谢原作者 原文链接:http://www.sxrczx.com/docs/js/2305513.html function Event(name) { var handlers = []; this.getName = function () { return name; }; this.addHandler = function (handler) { han

JAVA实现RabbitMQ,附安装过程

RabbitMQ的第一个JAVA实现 RabbitMQ是基于Erlang的,所以首先必须配置Erlang环境 Erlang官网   http://www.erlang.org/ Linux 下Erlang下载选择sourcefile Wget命令下载 Ubuntu下用tar –xzvf *.tar.gz命令解压 依次执行以下命令: ./configure--prefix=/home/hadoop/mydisk/erlang (该过程可能失败,建议sudoapt-get install build

Kafka获取订阅某topic的所有consumer group【客户端版】

之前写过如何用服务器端的API代码来获取订阅某topic的所有consumer group,参见这里.使用服务器端的API需要用到kafka.admin.AdminClient类,但是这个类在0.11.0.0版本已经被标记为不推荐使用了,故目前最合适的方式还是通过客户端API:org.apache.kafka.clients.admin.AdminClient.今天碰到有人问这个问题,我就尝试写了一个.使用之前你需要引入kafka client包依赖(以2.2.0版本为例) Maven: <de

关于rxjs subject订阅分发实现Angular的全局数据管理与同步更新

自定义实现angular中数据的状态管理,如有不妥请指正 一.先介绍一下rxjs中subject: Import {subject}from’rxjs’ Subject 数据的订阅与分发,结合报刊的发布与订阅进行功能的模拟,subject即是observeable对象也是observer对象,subject对于后期没有数据更新时所添加的订阅者是不怎么友好的,因为不跟新数据时订阅者就不在收到返回的数值     const interval$ = interval(1000).pipe(take(1

RabbitMQ (五) : 订阅者模式之分发模式 ( fanout )

前面讲到了简单队列和工作队列. 这两种队列有个非常明显的缺点 : 生产者发送的消息,只能进入到一个队列. 消息只能进入到一个队列就意味着消息只能被一个消费者消费. 尽管工作队列模式中,一个队列中的消息可以被多个消费者消费,但是,具体到每一条消息,却只能被一个消费者消费. 如果想要一个消息被多个消费者消费,那么生产者就必须把这条消息发送到多个队列中去. RabbitMQ 在这个点的设计是 : 在生产者和队列两者之间加入了一个叫做"交换机"的东西. 生产者发送消息时,不直接发送到队列,而是

RabbitMQ (七) : 订阅者模式之主体模式 ( topic )

主体模式和路由模式很像 路由模式是精确匹配 主体模式是模糊匹配 依然先通过管理后台添加一个交换机. 生产者 public class Producer { private const string ExchangeName = "test_exchange_topic"; public static void Send() { //获取一个连接 IConnection connection = ConnectionHelper.GetConnection(); //从连接中获取一个通道

RabbitMQ 发布/订阅

我们会做一些改变,就是把一个消息发给多个消费者,这种模式称之为发布/订阅(类似观察者模式). 为了验证这种模式,我们准备构建一个简单的日志系统.这个系统包含两类程序,一类程序发动日志,另一类程序接收和处理日志. 在我们的日志系统中,每一个运行的接收者程序都会收到日志.然后我们实现,一个接收者将接收到的数据写到硬盘上,与此同时,另一个接收者把接收到的消息展现在屏幕上.本质上来说,就是发布的日志消息会转发给所有的接收者. 1.转发器(Exchanges) RabbitMQ消息模型的核心理念是生产者永