RabbitMQ交换机规则实例

  RabbitMQ Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了。下面分别以实例的方式对这几种exchange进行讲解。

direct

  首先我们以路由的方式对消息进行过滤,代码如下:

生产者

 1 public class RoutingSendDirect {
 2
 3     private static final String EXCHANGE_NAME = "direct_test";
 4
 5     private static final String[] routingKeys = new String[]{"info" ,"warning", "error"};
 6
 7     public static void main(String[] args) throws IOException, TimeoutException {
 8         ConnectionFactory connectionFactory = new ConnectionFactory();
 9         connectionFactory.setHost("localhost");
10         Connection connection = connectionFactory.newConnection();
11         Channel channel = connection.createChannel();
12         channel.exchangeDeclare(EXCHANGE_NAME,"direct");
13         for(String key : routingKeys){
14             String message = "RoutingSendDirect Send the message level:" + key;
15             channel.basicPublish(EXCHANGE_NAME,key,null,message.getBytes());
16             System.out.println("RoutingSendDirect Send"+key +"‘:‘" + message);
17         }
18         channel.close();
19         connection.close();
20     }
21 }

消费者

 1 public class ReceiveDirect1 {
 2     // 交换器名称
 3     private static final String EXCHANGE_NAME = "direct_test";
 4     // 路由关键字
 5     private static final String[] routingKeys = new String[]{"info" ,"warning"};
 6
 7     public static void main(String[] args) throws IOException, TimeoutException {
 8         ConnectionFactory connectionFactory = new ConnectionFactory();
 9         connectionFactory.setHost("localhost");
10         Connection connection = connectionFactory.newConnection();
11         Channel channel = connection.createChannel();
12         channel.exchangeDeclare(EXCHANGE_NAME,"direct");
13         //获取匿名队列名称
14         String queueName=channel.queueDeclare().getQueue();
15         for(String key : routingKeys){
16             channel.queueBind(queueName,EXCHANGE_NAME,key);
17             System.out.println("ReceiveDirect1 exchange:"+EXCHANGE_NAME+"," +
18                     " queue:"+queueName+", BindRoutingKey:" + key);
19         }
20
21         System.out.println("ReceiveDirect1  Waiting for messages");
22         Consumer consumer = new DefaultConsumer(channel){
23             @Override
24             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
25                 String msg = new String(body,"UTF-8");
26                 System.out.println("ReceiveDirect1 Received ‘" + envelope.getRoutingKey() + "‘:‘" + msg + "‘");
27             }
28         };
29
30         channel.basicConsume(queueName, true, consumer);
31     }
32 }
 1 public class ReceiveDirect2 {
 2     // 交换器名称
 3     private static final String EXCHANGE_NAME = "direct_test";
 4     // 路由关键字
 5     private static final String[] routingKeys = new String[]{"error"};
 6
 7     public static void main(String[] args) throws IOException, TimeoutException {
 8         ConnectionFactory connectionFactory = new ConnectionFactory();
 9         connectionFactory.setHost("localhost");
10         Connection connection = connectionFactory.newConnection();
11         Channel channel = connection.createChannel();
12         channel.exchangeDeclare(EXCHANGE_NAME,"direct");
13         //获取匿名队列名称
14         String queueName=channel.queueDeclare().getQueue();
15         for(String key : routingKeys){
16             channel.queueBind(queueName,EXCHANGE_NAME,key);
17             System.out.println("ReceiveDirect2 exchange:"+EXCHANGE_NAME+"," +
18                     " queue:"+queueName+", BindRoutingKey:" + key);
19         }
20
21         System.out.println("ReceiveDirect2  Waiting for messages");
22         Consumer consumer = new DefaultConsumer(channel){
23             @Override
24             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
25                 String msg = new String(body,"UTF-8");
26                 System.out.println("ReceiveDirect2 Received ‘" + envelope.getRoutingKey() + "‘:‘" + msg + "‘");
27             }
28         };
29
30         channel.basicConsume(queueName, true, consumer);
31     }
32 }

运行结果如下:

 1 RoutingSendDirect Sendinfo‘:‘RoutingSendDirect Send the message level:info
 2 RoutingSendDirect Sendwarning‘:‘RoutingSendDirect Send the message level:warning
 3 RoutingSendDirect Senderror‘:‘RoutingSendDirect Send the message level:error
 4
 5 ReceiveDirect1 exchange:direct_test, queue:amq.gen-HsUrzbjzto-K7HeigXSEfQ, BindRoutingKey:info
 6 ReceiveDirect1 exchange:direct_test, queue:amq.gen-HsUrzbjzto-K7HeigXSEfQ, BindRoutingKey:warning
 7 ReceiveDirect1  Waiting for messages
 8 ReceiveDirect1 Received ‘info‘:‘RoutingSendDirect Send the message level:info‘
 9 ReceiveDirect1 Received ‘warning‘:‘RoutingSendDirect Send the message level:warning‘
10
11 ReceiveDirect2 exchange:direct_test, queue:amq.gen-i3NY12l3DqWjGapaBOCdwQ, BindRoutingKey:error
12 ReceiveDirect2  Waiting for messages
13 ReceiveDirect2 Received ‘error‘:‘RoutingSendDirect Send the message level:error‘

fanout

  fanout和别的MQ的发布/订阅模式类似,实例如下:

生产者  

 1 public class Pub {
 2     private static final String EXCHANGE_NAME = "logs";
 3     public static void main(String[] args) throws IOException, TimeoutException {
 4         ConnectionFactory factory=new ConnectionFactory();
 5         factory.setHost("localhost");
 6         Connection connection=factory.newConnection();
 7         Channel channel=connection.createChannel();
 8         //fanout表示分发,所有的消费者得到同样的队列信息
 9         channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
10         //分发信息
11         for (int i=0;i<5;i++){
12             String message="Hello World"+i;
13             channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
14             System.out.println("Pub Sent ‘" + message + "‘");
15         }
16         channel.close();
17         connection.close();
18     }
19 }

消费者

public class Sub {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        //产生一个随机的队列名称
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");//对队列进行绑定

        System.out.println("Sub Waiting for messages");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Sub Received ‘" + message + "‘");
            }
        };
        channel.basicConsume(queueName, true, consumer);//队列会自动删除
    }
}

Topics

这种应该属于模糊匹配,* :可以替代一个词,#:可以替代0或者更多的词,现在我们继续看看代码来理解

生产者  

 1 public class TopicSend {
 2     private static final String EXCHANGE_NAME = "topic_logs";
 3
 4     public static void main(String[] args) throws IOException, TimeoutException {
 5         Connection connection = null;
 6         Channel channel = null;
 7         try{
 8             ConnectionFactory factory=new ConnectionFactory();
 9             factory.setHost("localhost");
10             connection=factory.newConnection();
11             channel=connection.createChannel();
12
13             //声明一个匹配模式的交换机
14             channel.exchangeDeclare(EXCHANGE_NAME,"topic");
15             //待发送的消息
16             String[] routingKeys=new String[]{
17                     "quick.orange.rabbit",
18                     "lazy.orange.elephant",
19                     "quick.orange.fox",
20                     "lazy.brown.fox",
21                     "quick.brown.fox",
22                     "quick.orange.male.rabbit",
23                     "lazy.orange.male.rabbit"
24             };
25             //发送消息
26             for(String severity :routingKeys){
27                 String message = "From "+severity+" routingKey‘ s message!";
28                 channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
29                 System.out.println("TopicSend Sent ‘" + severity + "‘:‘" + message + "‘");
30             }
31         }catch (Exception e){
32             e.printStackTrace();
33             if (connection!=null){
34                 channel.close();
35                 connection.close();
36             }
37         }finally {
38             if (connection!=null){
39                 channel.close();
40                 connection.close();
41             }
42         }
43     }
44 }

消费者 

 1 public class ReceiveLogsTopic1 {
 2     private static final String EXCHANGE_NAME = "topic_logs";
 3
 4     public static void main(String[] args) throws IOException, TimeoutException {
 5         ConnectionFactory factory = new ConnectionFactory();
 6         factory.setHost("localhost");
 7         Connection connection = factory.newConnection();
 8         Channel channel = connection.createChannel();
 9
10         //声明一个匹配模式的交换机
11         channel.exchangeDeclare(EXCHANGE_NAME, "topic");
12         String queueName = channel.queueDeclare().getQueue();
13         //路由关键字
14         String[] routingKeys = new String[]{"*.orange.*"};
15         //绑定路由
16         for (String routingKey : routingKeys) {
17             channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
18             System.out.println("ReceiveLogsTopic1 exchange:" + EXCHANGE_NAME + ", queue:" + queueName + ", BindRoutingKey:" + routingKey);
19         }
20         System.out.println("ReceiveLogsTopic1 Waiting for messages");
21
22         Consumer consumer = new DefaultConsumer(channel) {
23             @Override
24             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
25                 String message = new String(body, "UTF-8");
26                 System.out.println("ReceiveLogsTopic1 Received ‘" + envelope.getRoutingKey() + "‘:‘" + message + "‘");
27             }
28         };
29         channel.basicConsume(queueName, true, consumer);
30     }
31 }
 1  public class ReceiveLogsTopic2 {
 2     private static final String EXCHANGE_NAME = "topic_logs";
 3
 4     public static void main(String[] argv) throws IOException, TimeoutException {
 5         ConnectionFactory factory = new ConnectionFactory();
 6         factory.setHost("localhost");
 7         Connection connection = factory.newConnection();
 8         Channel channel = connection.createChannel();
 9 //      声明一个匹配模式的交换器
10         channel.exchangeDeclare(EXCHANGE_NAME, "topic");
11         String queueName = channel.queueDeclare().getQueue();
12         // 路由关键字
13         String[] routingKeys = new String[]{"*.*.rabbit", "lazy.#"};
14 //      绑定路由关键字
15         for (String bindingKey : routingKeys) {
16             channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
17             System.out.println("ReceiveLogsTopic2 exchange:"+EXCHANGE_NAME+", queue:"+queueName+", BindRoutingKey:" + bindingKey);
18         }
19
20         System.out.println("ReceiveLogsTopic2 Waiting for messages");
21
22         Consumer consumer = new DefaultConsumer(channel) {
23             @Override
24             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException  {
25                 String message = new String(body, "UTF-8");
26                 System.out.println("ReceiveLogsTopic2 Received ‘" + envelope.getRoutingKey() + "‘:‘" + message + "‘");
27             }
28         };
29         channel.basicConsume(queueName, true, consumer);
30     }
31 }

原文地址:https://www.cnblogs.com/senlinyang/p/8440627.html

时间: 2024-11-05 19:38:32

RabbitMQ交换机规则实例的相关文章

详细讲解多个华为交换机配置实例

详细讲解多个华为交换机配置实例 交换机的配置是网络管理员的基本技能,本文以华为S5700交换机为例,结合使用sNSP模拟器,详细阐述VLAN配置.VLAN之间通信.跨交换机VLAN配置.跨交换机VLAN之间通信等.(备注:以下配置的都是基于交换机接口的VLAN) 一.单台交换机下VLAN配置 1.配置单个VLAN 华为S5700本身默认有个VLAN,若不另行配置,直接接入交换机的终端都属于默认的VLAN,其编号是1.若要手动配置一个指定编号为10的VLAN,可用eNSP创建如下拓扑. 其中LSW

URL路由规则实例

1.设置支持路由和写路由规则 2.控制器中这样来写 3.测试结果: URL路由规则实例

交换机配置实例(DHCP、VLAN)

交换机配置实例(DHCP.VLAN) 1. 端口地址配置 int g0/0/1 ip add 172.16.131.5 255.255.255.0 2. 静态路由配置 目的IP 掩码 下一跳 0.0.0.0 0.0.0.0 X.X.X.X ip route-static 0.0.0.0 0.0.0.0 192.168.88.1 这条路由是所有的访问下一跳都是88.1 ip route-static 10.10.100.0 255.255.255.0 10.10.101.1 这条路由是10.10.

rabbitMQ第三篇:采用不同的交换机规则

在上一篇我们都是采用发送信息到队列然后队列把信息在发送到消费者,其实实际情况并非如此,rabbitMQ其实真正的思想是生产者不发送任何信息到队列,甚至不知道信息将发送到哪个队列.相反生产者只能发送信息到交换机,交换机接收到生产者的信息,然后按照规则把它推送到对列中,交换机是如何做处理他接收到的信息,并怎么样发送到特定的队列,那么这一篇主要是讲解交换机的规则. 一:发布/订阅 在上一篇说到的队列都指定了名称,但是现在我们不需要这么做,我们需要所有的日志信息,而不只是其中的一个.如果要做这样的队列,

RabbitMQ交换机、RabbitMQ整合springCloud

目标 1.交换机 2.RabbitMQ整合springCloud 交换机 蓝色区域===生产者 红色区域===Server:又称Broker,接受客户端的连接,实现AMQP实体服务 绿色区域===消费者 黄色区域===就是我们的交换机以及队列 由生产者投递信息到RabbitMQ Server里面某一个交换机对应的队列中,消费者则是从对应的队列中获取信息 交换机属性: Name:交换机名称 Type:交换机类型 direct.topic.fanout.headers Durability:是否需要

Swift难点-继承中的构造规则实例详解

一.两种构造器-指定构造器和便利构造器 指定构造器:类中必备的构造器,为所有的属性赋初值.(有些子类可能不需要显示声明,因为默认从基类继承了) 便利构造器:类中的辅助构造器,通过调用指定构造器为属性赋初值.(仅在必要的时候声明) 举例 [plain] view plaincopy class Food { var name: String init(name: String) { self.name = name } convenience init() { self.init(name: "[

nginx 正则及rewrite常用规则实例

一.正则表达式匹配,其中:* ~ 为区分大小写匹配* ~* 为不区分大小写匹配* !~和!~*分别为区分大小写不匹配及不区分大小写不匹配二.文件及目录匹配,其中:* -f和!-f用来判断是否存在文件* -d和!-d用来判断是否存在目录* -e和!-e用来判断是否存在文件或目录* -x和!-x用来判断文件是否可执行三.rewrite指令的最后一项参数为flag标记,flag标记有:1.last    相当于apache里面的[L]标记,表示rewrite.2.break本条规则匹配完成后,终止匹配

Swift难点-继承中的构造规则实例具体解释

关于继承中的构造规则是一个难点. 假设有问题,请留言问我. 我的Swift新手教程专栏 http://blog.csdn.net/column/details/swfitexperience.html 为什么要有构造器:为类中自身和继承来的存储属性赋初值. 一.两种构造器-指定构造器和便利构造器 指定构造器:类中必备的构造器.为全部的属性赋初值.(有些子类可能不须要显示声明,由于默认从基类继承了) 便利构造器:类中的辅助构造器,通过调用指定构造器为属性赋初值.(仅在必要的时候声明) 举例 cla

Rabbitmq交换机(exchange)类型解释

1.topic 将路由键和某模式进行匹配.此时队列需要绑定要一个模式上.符号"#"匹配一个或多个词,符号"*"匹配不多不少一个词.因此"abc.#"能够匹配到"abc.def.ghi",但是"abc.*" 只会匹配到"abc.def". 任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey中指定话题的Queue上 1.这种模式较为复杂,简单来说,就是每个队列都