生产者会生产出很多消息 , 但是不同的消费者可能会有不同的需求,只需要接收指定的消息,其他的消息需要被过滤掉。 这时候就可以对消息进行过滤了。 在消费者端设置好需要接收的消息类型。
如果不使用默认的Exchange发送消息,而是使用我们自定定义的Exchange发送消息,那么下面这个方法的第二个参数就不是QueueName了,而是消息的类型。
channel.basicPublish( exchangeName , messageType , null , msg.getBytes());
示例:Sender05.java
1 package com.zf.rabbitmq05; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.client.Channel; 6 import com.rabbitmq.client.Connection; 7 import com.rabbitmq.client.ConnectionFactory; 8 9 /** 10 * 发送消息 11 * @author zhoufeng 12 * 13 */ 14 public class Sender05 { 15 16 public static void main(String[] args) throws IOException { 17 18 ConnectionFactory connFac = new ConnectionFactory() ; 19 20 //RabbitMQ-Server安装在本机,所以直接用127.0.0.1 21 connFac.setHost("127.0.0.1"); 22 23 //创建一个连接 24 Connection conn = connFac.newConnection() ; 25 26 //创建一个渠道 27 Channel channel = conn.createChannel() ; 28 29 String exchangeName = "exchange02"; 30 31 String messageType = "type01"; 32 33 channel.exchangeDeclare(exchangeName, "direct") ; 34 35 //定义Queue名 36 String msg = "Hello World!"; 37 38 //发送消息 39 channel.basicPublish( exchangeName , messageType , null , msg.getBytes()); 40 41 System.out.println("send message[" + msg + "] to "+ exchangeName +" success!"); 42 43 channel.close(); 44 conn.close(); 45 46 } 47 48 }
1 package com.zf.rabbitmq05; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.client.Channel; 6 import com.rabbitmq.client.Connection; 7 import com.rabbitmq.client.ConnectionFactory; 8 import com.rabbitmq.client.ConsumerCancelledException; 9 import com.rabbitmq.client.QueueingConsumer; 10 import com.rabbitmq.client.QueueingConsumer.Delivery; 11 import com.rabbitmq.client.ShutdownSignalException; 12 13 /** 14 * 接收消息 15 * @author zhoufeng 16 * 17 */ 18 public class Recv05_01 { 19 20 public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { 21 22 ConnectionFactory connFac = new ConnectionFactory() ; 23 24 connFac.setHost("127.0.0.1"); 25 26 Connection conn = connFac.newConnection() ; 27 28 Channel channel = conn.createChannel() ; 29 30 31 String exchangeName = "exchange02"; 32 33 channel.exchangeDeclare(exchangeName, "direct") ; 34 35 String queueName = channel.queueDeclare().getQueue() ; 36 37 //第三个参数就是type,这里表示只接收type01类型的消息。 38 channel.queueBind(queueName, exchangeName, "type01") ; 39 //也可以选择接收多种类型的消息。只需要再下面再绑定一次就可以了 40 channel.queueBind(queueName, exchangeName, "type02") ; 41 42 43 //配置好获取消息的方式 44 QueueingConsumer consumer = new QueueingConsumer(channel) ; 45 channel.basicConsume(queueName, true, consumer) ; 46 47 //循环获取消息 48 while(true){ 49 50 //获取消息,如果没有消息,这一步将会一直阻塞 51 Delivery delivery = consumer.nextDelivery() ; 52 53 String msg = new String(delivery.getBody()) ; 54 55 System.out.println("received message[" + msg + "] from " + exchangeName); 56 } 57 58 } 59 60 }
这时,启动Recv05_01.java 然后启动Sender05.java ,消费者端就会收到消息。
然后将Sender05.java 中的messageType分别改为type02 type03 然后发送消息 , 可以看到消费者端能接收到type02的消息,但是不能接收到type03的消息。
时间: 2024-10-28 21:16:06