Routing(路由模式)

  • Routing(路由模式)

    生产者将消息发送到direct交换器,在绑定队列和交换器的时候有一个路由key,生产者发送的消息会指定一个路由key,那么消息只会发送到相应key相同的队列,接着监听该队列的消费者消费信息.

    • Direct exchange

      会把消息路由到那些binding key与routing key完全匹配的Queue中
      我们可以看到绑定了两个队列的exchange X。第一个队列binding key 为orange,第二个binding key为两个,一个binding key为black,另一个binding key为green。
      使用routing key为orange发布到交换机的消息 将被路由到队列Q1。routing key为black 或green的消息将转到Q2。所有其他消息将被丢弃。

      Multiple Bindings

      用相同的binding key 绑定多个队列,可以使用binding key 为black在X和Q1与Q2之间添加绑定。在这种情况下,exchange的行为将类似于扇出,并将消息广播到所有匹配的队列。routing key为black的消息将同时传递给 Q1和Q2

    • 下面代码实现生产者和消费者的Direct模式

      生产者代码:

      public class DirectEmitLog {
      
          private static final String EXCHANGE_NAME = "direct_logs";
      
          public static void main(String[] args) throws Exception {
              //获取连接
              Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
      
              Channel channel = connection.createChannel();
      
              //创建队列
              channel.queueDeclare("direct_loge",false,false,false,null);
              //声明交换机,
              channel.exchangeDeclare(EXCHANGE_NAME, "direct");
      
              String message="hello";
              //发送消息
      
      //        for (int i = 0; i < 10; i++) {
      //            String message = " message" + i;
      //            System.out.println("[send]:" + message);
                  //发送消息
                  channel.basicPublish(EXCHANGE_NAME, "err", null, message.getBytes("utf-8"));
      
              //}
              channel.close();
              connection.close();
      
          }
      
      }

      消费者代码:

      public class DirectRecv {
      
          private final static String QUEUE_NAME = "direct_loge";
          private static final String EXCHANGE_NAME = "direct_logs";
          public static void main(String[] args) throws Exception {
              //获取连接
              Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
      
              //声明通道
              Channel channel = connection.createChannel();
      
              channel.exchangeDeclare(EXCHANGE_NAME, "direct");
      
              //声明队列队列
              channel.queueDeclare(QUEUE_NAME, false, false, false, null);
              channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "err");
      
              DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                  String message = new String(delivery.getBody(), "UTF-8");
      
                  System.out.println(" [x] Received '" + message + "'");
                  try {
                      doWork(message);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  } finally {
                      System.out.println(" [x] Done");
                      //channel.basicAck();
                      //channel.basicNack();
                  }
              };
              boolean autoAck = true; // acknowledgment is covered below
              channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
              });
      
      //        DeliverCallback deliverCallback = new DeliverCallback(){
      //            @Override
      //            public void handle(String consumerTag, Delivery delivery) throws IOException {
      //                String message = new String(delivery.getBody(), "UTF-8");
      //                System.out.println(" [x] Received '" + message + "'");
      //            }
      //        };
      //
      //        channel.basicConsume(QUEUE_NAME, true, deliverCallback, new CancelCallback(){
      //            @Override
      //            public void handle(String consumerTag) throws IOException {
      //
      //            }
      //        });
      
          }
      
          private static void doWork(String task) throws InterruptedException {
              for (char ch : task.toCharArray()) {
                  if (ch == '.') Thread.sleep(1000);
              }
          }
      
      }
    • SpringBoot相关代码:
      @SpringBootApplication
      @EnableScheduling
      public class RabbitAmqpTutorialsApplication {
      
          public static void main(String[] args) throws Exception {
              SpringApplication.run(RabbitAmqpTutorialsApplication.class, args);
          }
      
      }
      @Configuration
      public class Tut4Config {
      
          @Bean
          public DirectExchange direct() {
              return new DirectExchange("tut.direct");
          }
      
          private static class ReceiverConfig {
      
              @Bean
              public Queue autoDeleteQueue1() {
                  return new AnonymousQueue();
              }
      
              @Bean
              public Queue autoDeleteQueue2() {
                  return new AnonymousQueue();
              }
      
              @Bean
              public Binding binding1a(DirectExchange direct,
                                       Queue autoDeleteQueue1) {
                  return BindingBuilder.bind(autoDeleteQueue1)
                          .to(direct)
                          .with("orange");
              }
      
              @Bean
              public Binding binding1b(DirectExchange direct,
                                       Queue autoDeleteQueue1) {
                  return BindingBuilder.bind(autoDeleteQueue1)
                          .to(direct)
                          .with("black");
              }
      
              @Bean
              public Binding binding2a(DirectExchange direct,
                                       Queue autoDeleteQueue2) {
                  return BindingBuilder.bind(autoDeleteQueue2)
                          .to(direct)
                          .with("green");
              }
      
              @Bean
              public Binding binding2b(DirectExchange direct,
                                       Queue autoDeleteQueue2) {
                  return BindingBuilder.bind(autoDeleteQueue2)
                          .to(direct)
                          .with("black");
              }
      
              @Bean
              public Tut4Receiver receiver() {
                  return new Tut4Receiver();
              }
          }
      
          @Bean
          public Tut4Sender sender() {
              return new Tut4Sender();
          }
      
      }
      public class Tut4Receiver {
      
          @RabbitListener(queues = "#{autoDeleteQueue1.name}")
          public void receive1(String in) throws InterruptedException {
              receive(in, 1);
          }
      
          @RabbitListener(queues = "#{autoDeleteQueue2.name}")
          public void receive2(String in) throws InterruptedException {
              receive(in, 2);
          }
      
          public void receive(String in, int receiver) throws InterruptedException {
              StopWatch watch = new StopWatch();
              watch.start();
              System.out.println("instance " + receiver + " [x] Received '" + in + "'");
              doWork(in);
              watch.stop();
              System.out.println("instance " + receiver + " [x] Done in " +
                      watch.getTotalTimeSeconds() + "s");
          }
      
          private void doWork(String in) throws InterruptedException {
              for (char ch : in.toCharArray()) {
                  if (ch == '.') {
                      Thread.sleep(1000);
                  }
              }
          }
      
      }
      public class Tut4Sender {
      
          @Autowired
          private RabbitTemplate template;
      
          @Autowired
          private DirectExchange direct;
      
          AtomicInteger index = new AtomicInteger(0);
      
          AtomicInteger count = new AtomicInteger(0);
      
          private final String[] keys = {"orange", "black", "green"};
      
          @Scheduled(fixedDelay = 1000, initialDelay = 500)
          public void send() {
              StringBuilder builder = new StringBuilder("Hello to ");
              if (this.index.incrementAndGet() == 3) {
                  this.index.set(0);
              }
              String key = keys[this.index.get()];
              builder.append(key).append(' ');
              builder.append(this.count.get());
              String message = builder.toString();
              template.convertAndSend(direct.getName(), key, message);
              System.out.println(" [x] Sent '" + message + "'");
          }
      
      }

      相关代码链接: https://github.com/albert-liu435/springmq

原文地址:https://www.cnblogs.com/haizhilangzi/p/12301713.html

时间: 2024-10-17 21:20:21

Routing(路由模式)的相关文章

routing路由模式

一:介绍 1.模式 2.应用场景 如果exchangge与队列中的key相同,消息就发送过去. 这个就是需要将交换机与队列增加key. 3.路由类型 上节课的订阅模式中的路由类型是Fanout. 这篇文章的路由类型是Direct. 二:程序 1.生产者 原文地址:https://www.cnblogs.com/juncaoit/p/8613077.html

python使用rabbitMQ介绍四(路由模式)

一.模式介绍 路由模式,与发布-订阅模式一样,消息发送到exchange中,消费者把队列绑定到exchange上. 这种模式在exchange上添加添加了一个路由键(routing-key),生产者发布消息的时候添加路由键(routing-key),消费者绑定队列到交换机时添加键值(routing-key),这样就可以接收到对应的消息. 路由模式的direct exchange. 队列模型: 与发布-订阅模式不同的是,每个消费者队列接收的消息不同,根据消息的routing-key把消息发送到不同

.NET/ASP.NET Routing路由(深入解析路由系统架构原理)

出处:http://www.cnblogs.com/wangiqngpei557/ 阅读目录: 1.开篇介绍 2.ASP.NET Routing 路由对象模型的位置 3.ASP.NET Routing 路由对象模型的入口 4.ASP.NET Routing 路由对象模型的内部结构 4.1UrlRoutingModule 对象内部结构 4.2RouteBase.Route.RouteCollection.RouteTable 路由核心对象模型 4.3RouteValueDictionary.Rou

【转】.NET/ASP.NET Routing路由(深入解析路由系统架构原理)

阅读目录: 1.开篇介绍 2.ASP.NET Routing 路由对象模型的位置 3.ASP.NET Routing 路由对象模型的入口 4.ASP.NET Routing 路由对象模型的内部结构 4.1]UrlRoutingModule 对象内部结构 4.2]RouteBase.Route.RouteCollection.RouteTable 路由核心对象模型 4.3]RouteValueDictionary.RouteData.RequestContext 路由数据对象模型 4.4]IRou

d-link DI-7300 DI-7200 开启路由模式

最近做个项目需要把两个网段组起来 使用DI-7300 DI-7200 开启路由模式 添加静态路由后发现没法ping通 检查了静态路由没啥问题 打了客服才知道原来需要关闭 网络安全--网络攻击防御--内网病毒防御 还要在上网行为里面 添加一条允许的规则 如果你写的静态路由没问题的话 这时候应该是可以互通了

CentOS 7 之 hostapd 路由模式配置

这篇是 linux 下使用 hostapd 实现无线接入点 AP 模式的另一种实现方式:hostapd 路由模式配置. 对于软硬件的基本配置及 hostapd 安装在<CentOS 7 之 hostapd AP模式配置>的前半部分内容中有说明,可以先看看那篇,再看本文. hostapd 的AP模式配置需要的有线网卡和无线网卡进行桥接,那路由模式配置主要就是将无线网卡的数据通过有线网卡进行伪装.转发两个方面,也就不再需要将有线和无线网卡进行桥接. 配置这种路由模式就类似一台普通的无线路由器,有线

Routing路由

Routing路由 新版Routing功能介绍 在ASP.NET 5和MVC6中,Routing功能被全部重写了,虽然用法有些类似,但和之前的Routing原理完全不太一样了,该Routing框架不仅可以支持MVC和Web API,还支持一般的ASP.NET5程序.新版的改变有如下几个部分. 首先,Routing系统是基于ASP.NET 5的,是一个独立于MVC的路由框架,而不是基于MVC的.MVC只是在上面扩展了一个快捷方式而已. 其次,在ASP.NET 5中,MVC和Web API控制器没有

AngularJS学习---Routing(路由) &amp; Multiple Views(多个视图) step 7

1.切换分支到step7,并启动项目 git checkout step-7 npm start 2.需求: 在步骤7之前,应用只给我们的用户提供了一个简单的界面(一张所有手机的列表),并且所有的模板代码位于index.html文件中.下一步是增加一个能够显示我们列表中每一部手机详细信息的页面.可以先看一下step6和7的代码区别 . 为了增加详细信息视图,我们可以拓展index.html来同时包含两个视图的模板代码,但是这样会很快给我们带来巨大的麻烦.相反,我们要把index.html模板转变

HA主备路由模式的原理

HA是High Availability缩写,即高可用性 ,可防止网络中由于单个防火墙的设备故障或网络故障导致网络中断,保证网络服务的连续性和安全强度.目前,ha功能已经是防火墙内一个重要组成部分.        主备模式(Active-standby):在一个冗余组中,有两台防火墙,一台处于主状态.在这个状态下,防火墙响应ARP请求,并且转发网络流量:另一台处于备份状态,该防火墙不响应ARP请求,也不转发网络流量.主备之间同步状态信息,当主墙down机或网线故障时,进行主备切换.