rabbitMQ学习笔记(五) 消息路由

生产者会生产出很多消息 , 但是不同的消费者可能会有不同的需求,只需要接收指定的消息,其他的消息需要被过滤掉。 这时候就可以对消息进行过滤了。 在消费者端设置好需要接收的消息类型。

如果不使用默认的Exchange发送消息,而是使用我们自定定义的Exchange发送消息,那么下面这个方法的第二个参数就不是QueueName了,而是消息的类型。

channel.basicPublish( exchangeName , messageType , null , msg.getBytes());

示例:Sender05.java

 1 package com.zf.rabbitmq05;
 2
 3 import java.io.IOException;
 4
 5 import com.rabbitmq.client.Channel;
 6 import com.rabbitmq.client.Connection;
 7 import com.rabbitmq.client.ConnectionFactory;
 8
 9 /**
10  * 发送消息
11  * @author zhoufeng
12  *
13  */
14 public class Sender05 {
15
16     public static void main(String[] args) throws IOException {
17
18         ConnectionFactory connFac = new ConnectionFactory() ;
19
20         //RabbitMQ-Server安装在本机,所以直接用127.0.0.1
21         connFac.setHost("127.0.0.1");
22
23         //创建一个连接
24         Connection conn = connFac.newConnection() ;
25
26         //创建一个渠道
27         Channel channel = conn.createChannel() ;
28
29         String exchangeName = "exchange02";
30
31         String messageType = "type01";
32
33         channel.exchangeDeclare(exchangeName, "direct") ;
34
35         //定义Queue名
36         String msg = "Hello World!";
37
38         //发送消息
39         channel.basicPublish( exchangeName , messageType , null , msg.getBytes());
40
41         System.out.println("send message[" + msg + "] to "+ exchangeName +" success!");
42
43         channel.close();
44         conn.close();
45
46     }
47
48 }
 1 package com.zf.rabbitmq05;
 2
 3 import java.io.IOException;
 4
 5 import com.rabbitmq.client.Channel;
 6 import com.rabbitmq.client.Connection;
 7 import com.rabbitmq.client.ConnectionFactory;
 8 import com.rabbitmq.client.ConsumerCancelledException;
 9 import com.rabbitmq.client.QueueingConsumer;
10 import com.rabbitmq.client.QueueingConsumer.Delivery;
11 import com.rabbitmq.client.ShutdownSignalException;
12
13 /**
14  * 接收消息
15  * @author zhoufeng
16  *
17  */
18 public class Recv05_01 {
19
20     public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
21
22         ConnectionFactory connFac = new ConnectionFactory() ;
23
24         connFac.setHost("127.0.0.1");
25
26         Connection conn = connFac.newConnection() ;
27
28         Channel channel = conn.createChannel() ;
29
30
31         String exchangeName = "exchange02";
32
33         channel.exchangeDeclare(exchangeName, "direct") ;
34
35         String queueName = channel.queueDeclare().getQueue() ;
36
37         //第三个参数就是type,这里表示只接收type01类型的消息。
38         channel.queueBind(queueName, exchangeName, "type01") ;
39         //也可以选择接收多种类型的消息。只需要再下面再绑定一次就可以了
40         channel.queueBind(queueName, exchangeName, "type02") ;
41
42
43         //配置好获取消息的方式
44         QueueingConsumer consumer = new QueueingConsumer(channel) ;
45         channel.basicConsume(queueName, true, consumer) ;
46
47         //循环获取消息
48         while(true){
49
50             //获取消息,如果没有消息,这一步将会一直阻塞
51             Delivery delivery = consumer.nextDelivery() ;
52
53             String msg = new String(delivery.getBody()) ;
54
55             System.out.println("received message[" + msg + "] from " + exchangeName);
56         }
57
58     }
59
60 }

这时,启动Recv05_01.java 然后启动Sender05.java ,消费者端就会收到消息。

然后将Sender05.java 中的messageType分别改为type02  type03 然后发送消息 , 可以看到消费者端能接收到type02的消息,但是不能接收到type03的消息。

时间: 2024-10-28 21:16:06

rabbitMQ学习笔记(五) 消息路由的相关文章

RabbitMQ学习笔记五:RabbitMQ之优先级消息队列

RabbitMQ优先级队列注意点: 1.只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效 2.RabbitMQ3.5以后才支持优先级队列 代码在博客:RabbitMQ学习笔记三:Java实现RabbitMQ之与Spring集成 最后面有下载地址,只是做了少许改变,改变的代码如下: 消费者 spring-config.xml(还需要增加一个QueueListener监听器,代码就不复制到这里了,可以参考项目中的其他监听器) <!-- =========================

rabbitMQ学习笔记(三) 消息确认与公平调度消费者

从本节开始称Sender为生产者 , Recv为消费者   一.消息确认 为了确保消息一定被消费者处理,rabbitMQ提供了消息确认功能,就是在消费者处理完任务之后,就给服务器一个回馈,服务器就会将该消息删除,如果消费者超时不回馈,那么服务器将就将该消息重新发送给其他消费者 默认是开启的,在消费者端通过下面的方式开启消息确认,  首先将autoAck自动确认关闭,等我们的任务执行完成之后,手动的去确认,类似JDBC的autocommit一样 QueueingConsumer consumer

Android学习笔记五之Service

Android学习笔记五之Service 1.什么是Service? 什么是Service?Service是Android系统的四大组件之一,官方文档是这样描述Service的: A Service is an application component that can perform long-running operations in the background and does not provide a user interface. Another application comp

Caliburn.Micro学习笔记(五)----协同IResult

Caliburn.Micro学习笔记(五)----协同IResult 今天说一下协同IResult 看一下IResult接口 /// <summary> /// Allows custom code to execute after the return of a action. /// </summary> public interface IResult { /// <summary> /// Executes the result using the specif

angular学习笔记(五)-阶乘计算实例(1)

<!DOCTYPE html> <html ng-app> <head> <title>2.3.2计算阶乘实例1</title> <meta charset="utf-8"> <script src="../angular.js"></script> <script src="script.js"></script> </

NLTK学习笔记(五):分类和标注词汇

[TOC] 词性标注器 之后的很多工作都需要标注完的词汇.nltk自带英文标注器pos_tag import nltk text = nltk.word_tokenize("And now for something compleyely difference") print(text) print(nltk.pos_tag(text)) 标注语料库 表示已经标注的标识符:nltk.tag.str2tuple('word/类型') text = "The/AT grand/J

rabbitmq 学习-9- RpcClient发送消息和同步接收消息原理

rabbitmq 学习-9- RpcClient发送消息和同步接收消息原理

Linux System Programming 学习笔记(五) 进程管理

1. 进程是unix系统中两个最重要的基础抽象之一(另一个是文件) A process is a running program A thread is the unit of activity inside of a process the virtualization of memory is associated with the process, the threads all share the same memory address space 2. pid The idle pro

java之jvm学习笔记五(实践写自己的类装载器)

java之jvm学习笔记五(实践写自己的类装载器) 课程源码:http://download.csdn.net/detail/yfqnihao/4866501 前面第三和第四节我们一直在强调一句话,类装载器和安全管理器是可以被动态扩展的,或者说,他们是可以由用户自己定制的,今天我们就是动手试试,怎么做这部分的实践,当然,在阅读本篇之前,至少要阅读过笔记三. 下面我们先来动态扩展一个类装载器,当然这只是一个比较小的demo,旨在让大家有个比较形象的概念. 第一步,首先定义自己的类装载器,从Clas