RabbitMQ学习笔记3-使用topic交换器

本例使用topic接收警告、错误的日志,并根据匹配的路由规则发送给不同的Queue队列来处理的例子:

日志生产者SenderWithTopicExchange

 1 package com.yzl.test2;
 2
 3 import java.util.concurrent.CountDownLatch;
 4 import java.util.concurrent.ExecutorService;
 5 import java.util.concurrent.Executors;
 6
 7 import com.rabbitmq.client.Channel;
 8 import com.rabbitmq.client.Connection;
 9 import com.rabbitmq.client.ConnectionFactory;
10
11 /**
12  * 使用topic交换器发送消息
13  * 分为警告和错误2种日志
14  * @author: yzl
15  * @date: 2016-10-22
16  */
17 public class SenderWithTopicExchange {
18     //交换器名称
19     private static final String EXCHANGE_NAME = "myTopicExchange";
20     //路由键的前缀
21     private static final String BASE_ROUTING_KEY = "logger.";
22
23     public static void main(String[] args) throws Exception {
24         //使用CountDownLatch控制2个线程一起运行
25         final CountDownLatch cdl = new CountDownLatch(2);
26         //连接到rabbitmq服务器
27         ConnectionFactory factory = new ConnectionFactory();
28         factory.setHost("localhost");
29         Connection connection = factory.newConnection();
30         //创建一个信道
31         final Channel channel = connection.createChannel();
32         //定义一个名字为topicExchange的topic类型的exchange
33         channel.exchangeDeclare(EXCHANGE_NAME, "topic");
34
35         ExecutorService pool = Executors.newFixedThreadPool(2);
36         pool.submit(new Runnable() {
37             @Override
38             public void run() {
39                 try {
40                     cdl.await();
41                     //发送警告日志,绑定路由键:logger.warning
42                     String warningMsg = "warning message is :";
43                     for(int i=1; i<800; i++){
44                         System.out.println("发送警告消息:" + warningMsg+i);
45                         channel.basicPublish(EXCHANGE_NAME, BASE_ROUTING_KEY + "warning", null, (warningMsg+i).getBytes());
46                         Thread.sleep(2000L);
47                     }
48                 } catch (Exception e) {
49                     e.printStackTrace();
50                 }
51             }
52         });
53         pool.submit(new Runnable() {
54             @Override
55             public void run() {
56                 try {
57                     cdl.await();
58                     //发送错误日志,绑定路由键:logger.error
59                     String errorMsg = "error message is :";
60                     for(int i=1; i<1000; i++){
61                         System.out.println("发送错误消息:" + errorMsg+i);
62                         channel.basicPublish(EXCHANGE_NAME, BASE_ROUTING_KEY + "error", null, (errorMsg+i).getBytes());
63                         Thread.sleep(2000L);
64                     }
65                 } catch (Exception e) {
66                     e.printStackTrace();
67                 }
68             }
69         });
70
71         cdl.countDown();
72         cdl.countDown();
73     }
74 }

消息消费者ReceiverWithTopicExchange

 1 package com.yzl.test2;
 2
 3 import java.io.IOException;
 4
 5 import com.rabbitmq.client.AMQP.BasicProperties;
 6 import com.rabbitmq.client.Channel;
 7 import com.rabbitmq.client.Connection;
 8 import com.rabbitmq.client.ConnectionFactory;
 9 import com.rabbitmq.client.DefaultConsumer;
10 import com.rabbitmq.client.Envelope;
11
12 /**
13  * 使用topic交换器接收消息
14  *
15  * @author: yzl
16  * @date: 2016-10-22
17  */
18 public class ReceiverWithTopicExchange {
19     // 交换器名称
20     private static final String EXCHANGE_NAME = "myTopicExchange";
21
22     public static void main(String[] args) throws Exception {
23         // 连接到rabbitmq服务器
24         ConnectionFactory factory = new ConnectionFactory();
25         factory.setHost("localhost");
26         Connection connection = factory.newConnection();
27         // 创建一个信道
28         final Channel channel = connection.createChannel();
29         // 定义一个名字为topicExchange的topic类型的exchange
30         channel.exchangeDeclare(EXCHANGE_NAME, "topic");
31
32         //定义接收警告消息的队列
33         channel.queueDeclare("warningQueue", false, false, false, null);
34         //定义接收错误消息的队列
35         channel.queueDeclare("errorQueue", false, false, false, null);
36         //定义接收所有级别日志消息的队列
37         channel.queueDeclare("allLoggerQueue", false, false, false, null);
38
39         //使用logger.warning路由键绑定myTopicExchange与warningQueue
40         channel.queueBind("warningQueue", EXCHANGE_NAME, "logger.warning");
41         //使用logger.error路由键绑定myTopicExchange与errorQueue
42         channel.queueBind("errorQueue", EXCHANGE_NAME, "logger.error");
43         //使用logger.*路由规则绑定myTopicExchange与allLoggerQueue
44         channel.queueBind("allLoggerQueue", EXCHANGE_NAME, "logger.*");
45
46         //只处理警告日志,使用手动ack确认
47         channel.basicConsume("warningQueue", false, new DefaultConsumer(channel){
48             @Override
49             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
50                     throws IOException {
51                 String msg = new String(body);
52                 System.out.println("warningQueue accept a warning msg :" + msg);
53                 channel.basicAck(envelope.getDeliveryTag(), false);
54             }
55         });
56         //只处理错误日志,使用手动ack确认
57         channel.basicConsume("errorQueue", false, new DefaultConsumer(channel){
58             @Override
59             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
60                     throws IOException {
61                 String msg = new String(body);
62                 System.out.println("errorQueue accept a error msg :" + msg);
63                 channel.basicAck(envelope.getDeliveryTag(), false);
64             }
65         });
66         //处理警告和错误日志,使用手动ack确认
67         channel.basicConsume("allLoggerQueue", false, new DefaultConsumer(channel){
68             @Override
69             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
70                     throws IOException {
71                 String msg = new String(body);
72                 System.out.println("allLoggerQueue accept a logger msg :" + msg);
73                 channel.basicAck(envelope.getDeliveryTag(), false);
74             }
75         });
76     }
77 }

结果输出:

发送警告消息:warning message is :1
发送错误消息:error message is :1
发送警告消息:warning message is :2
发送错误消息:error message is :2
发送错误消息:error message is :3
发送警告消息:warning message is :3
allLoggerQueue accept a logger msg :error message is :1
allLoggerQueue accept a logger msg :warning message is :1
errorQueue accept a error msg :error message is :1
warningQueue accept a warning msg :warning message is :1
warningQueue accept a warning msg :warning message is :2
errorQueue accept a error msg :error message is :2
allLoggerQueue accept a logger msg :warning message is :2
allLoggerQueue accept a logger msg :error message is :2
allLoggerQueue accept a logger msg :warning message is :3
errorQueue accept a error msg :error message is :3
warningQueue accept a warning msg :warning message is :3
allLoggerQueue accept a logger msg :error message is :3

消息处理流程:

时间: 2024-12-28 11:38:27

RabbitMQ学习笔记3-使用topic交换器的相关文章

RabbitMQ学习笔记4-使用fanout交换器

fanout交换器会把发送给它的所有消息发送给绑定在它上面的队列,起到广播一样的效果. 本里使用实际业务中常见的例子, 订单系统:创建订单,然后发送一个事件消息 积分系统:发送订单的积分奖励 短信平台:发送订单的短信 消息生产者SenderWithFanoutExchange 1 package com.yzl.test3; 2 3 import java.util.Date; 4 5 import com.google.gson.Gson; 6 import com.rabbitmq.clie

RabbitMQ学习笔记五:RabbitMQ之优先级消息队列

RabbitMQ优先级队列注意点: 1.只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效 2.RabbitMQ3.5以后才支持优先级队列 代码在博客:RabbitMQ学习笔记三:Java实现RabbitMQ之与Spring集成 最后面有下载地址,只是做了少许改变,改变的代码如下: 消费者 spring-config.xml(还需要增加一个QueueListener监听器,代码就不复制到这里了,可以参考项目中的其他监听器) <!-- =========================

rabbitMQ学习笔记(六) topic类型消息。

上一节中使用了消息路由,消费者可以选择性的接收消息. 但是这样还是不够灵活. 比如某个消费者要订阅娱乐新闻消息 . 包括新浪.网易.腾讯的娱乐新闻.那么消费者就需要绑定三次,分别绑定这三个网站的消息类型. 如果新闻门户更多了,那么消费者将要绑定个更多的消息类型, 其实消费者只是需要订阅娱乐新闻,不管是哪个网站的新闻,都需要. 那么在rabbitMQ中可以使用topic类型. 模糊匹配消息类型. 模糊匹配中的 *代表一个  #代表零个或1个 示例: 1 package com.zf.rabbitm

使用php-amqplib连接rabbitMQ 学习笔记及总结

1.使用composer安装php-amqplib 在你的项目中添加一个 composer.json文件: { "require": { "php-amqplib/php-amqplib": "2.6.*" } } 只要你已经安装Composer功能,你可以运行以下: $ composer install 已经存在的项目则执行 $ composer update这时在verdor目录就已经下载完毕 具体可以参考官方文档:https://githu

RabbitMQ学习笔记2-理解消息通信

消息包含两部分:1.有效载荷(payload) - 你想要传输的数据.2.标签(lable) - 描述有效载荷的相关信息,包含具体的交换器.消息的接受兴趣方等. rabbitmq的基础流程如下: RabbitMQ的客户端和服务端通过channel与RabbitMQ服务器进行通信. Channel(信道):程序和RabbitMQ之间的连接是通过channel,channel是基于TCP协议之上的?, 一个TCP连接可以有多个channel,可以比喻成如下:一根大电缆里面有很多根小的电线,大电缆就是

rabbitmq学习笔记

1 基本概念 rabbitmq server(broker server):rabbitmq服务 client:包括producers和consumer message:包括payload和label exchange:producer发布message的地方 queue:messages存放和consumer收取message的地方 bindings:将message从exchange到不同queue的实现 connection:tcp连接,producer和consumer都是通过conne

rabbitmq学习笔记2 基本概念

官网:http://www.rabbitmq.com 参考:http://blog.csdn.net/column/details/rabbitmq.html 1 基本概念 rabbitmq server(broker server):rabbitmq服务 client:包括producers和consumer message:包括payload和label exchange:producer发布message的地方 queue:messages存放和consumer收取message的地方 b

rabbitMQ学习笔记(四) 发布/订阅消息

前面都是一条消息只会被一个消费者处理. 如果要每个消费者都处理同一个消息,rabbitMq也提供了相应的方法. 在以前的程序中,不管是生产者端还是消费者端都必须知道一个指定的QueueName才能发送.获取消息.  而rabbitMQ消息模型的核心思想是生产者不会将消息直接发送给队列. 因为,生产者通常不会知道消息将会被哪些消费者接收. 生产者的消息虽然不是直接发送给Queue,但是消息会交给Exchange,所以需要定义Exchange的消息分发模式 ,之前的程序中,有如下一行代码: chan

rabbitmq学习笔记1 安装和配置

环境 OS: CentOS Linux release 7.1.1503 (Core) kernel:3.10.0-229.el7.x86_64 安装 参考:http://www.rabbitmq.com/install-rpm.html Erlang rabbitmq 从官网:http://www.rabbitmq.com/,找出最新版本为3.6.2