消息中间件——RabbitMQ(六)理解Exchange交换机核心概念!

前言

来了解RabbitMQ一个重要的概念:Exchange交换机

1. Exchange概念

  • Exchange:接收消息,并根据路由键转发消息所绑定的队列。

蓝色框:客户端发送消息至交换机,通过路由键路由至指定的队列。
黄色框:交换机和队列通过路由键有一个绑定的关系。
绿色框:消费端通过监听队列来接收消息。

2. 交换机属性

Name:交换机名称
Type:交换机类型——direct、topic、fanout、headers、sharding(此篇不讲)
Durability:是否需要持久化,true为持久化
Auto Delete:当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange
Internal:当前Exchange是否用于RabbitMQ内部使用,默认为false
Arguments:扩展参数,用于扩展AMQP协议自定制化使用

3. Direct Exchange(直连)

  • 所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue

注意:Direct模式可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何绑定(binding)操作,消息传递时,RouteKey必须完全匹配才会被队列接收,否则该消息会被抛弃。

重点:routing key与队列queues 的key保持一致,即可以路由到对应的queue中。

3.1 代码演示

生产端:


/**
 *
* @ClassName: Producer4DirectExchange
* @Description: 生产者
* @author Coder编程
* @date2019年7月19日 下午22:15:52
*
 */
public class Producer4DirectExchange {

    public static void main(String[] args) throws Exception {

        //1创建ConnectionFactory
        Connection connection = ConnectionUtils.getConnection();
        //2创建Channel
        Channel channel = connection.createChannel();
        //3 声明
        String exchangeName = "test_direct_exchange";
        String routingKey = "test.direct";
        //4 发送
        String msg = "Coder编程 Hello World RabbitMQ 4  Direct Exchange Message ... ";
        channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());
    }
}

消费端:


/**
 *
* @ClassName: Consumer4DirectExchange
* @Description: 消费者
* @author Coder编程
* @date2019年7月19日 下午22:18:52
*
 */
public class Consumer4DirectExchange {

    public static void main(String[] args) throws Exception {

        //创建ConnectionFactory
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //声明
        String exchangeName = "test_direct_exchange";
        String exchangeType = "direct";
        String queueName = "test_direct_queue";
        String routingKey = "test.direct";

        //表示声明了一个交换机
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        //表示声明了一个队列
        channel.queueDeclare(queueName, false, false, false, null);
        //建立一个绑定关系:
        channel.queueBind(queueName, exchangeName, routingKey);

        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);
        //循环获取消息
        while(true){
            //获取消息,如果没有消息,这一步将会一直阻塞
            Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到消息:" + msg);
        }
    }
}

测试结果:

注意需要routingKey保持一致。可以自己尝试修改routingkey,是否能收到消息。

4. Topic Exchange

  • 所有发送到Topic Exchange的消息被转发到所有管线RouteKey中指定Topic的Queue上
  • Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个Topic

    注意:可以使用通配符进行模糊匹配
    符号 "#" 匹配一个或多个词
    符号 "" 匹配不多不少一个词
    例如:"log.#" 能够匹配到 "log.info.oa"
    "log.
    " 只会匹配到 "log.error"

在一堆消息中,每个不同的队列只关心自己需要的消息。

4.1 代码演示

生产端:


/**
 *
* @ClassName: Producer4TopicExchange
* @Description: 生产者
* @author Coder编程
* @date2019年7月19日 下午22:32:41
*
 */
public class Producer4TopicExchange {

    public static void main(String[] args) throws Exception {

        //1创建ConnectionFactory
        Connection connection = ConnectionUtils.getConnection();
        //2创建Channel
        Channel channel = connection.createChannel();
        //3声明
        String exchangeName = "test_topic_exchange";
        String routingKey1 = "user.save";
        String routingKey2 = "user.update";
        String routingKey3 = "user.delete.abc";
        //4发送
        String msg = "Coder编程 Hello World RabbitMQ 4 Topic Exchange Message ...";
        channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());
        channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());
        channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes());
        channel.close();
        connection.close();
    }
}

消费端:


/**
 *
* @ClassName: Consumer4TopicExchange
* @Description: 消费者
* @author Coder编程
* @date2019年7月19日 下午22:37:12
*
 */
public class Consumer4TopicExchange {

    public static void main(String[] args) throws Exception {

        //创建ConnectionFactory
        Connection connection = ConnectionUtils.getConnection();

        Channel channel = connection.createChannel();
        // 声明
        String exchangeName = "test_topic_exchange";
        String exchangeType = "topic";
        String queueName = "test_topic_queue";
        //String routingKey = "user.*";
        String routingKey = "user.*";
        // 1 声明交换机
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        // 2 声明队列
        channel.queueDeclare(queueName, false, false, false, null);
        // 3 建立交换机和队列的绑定关系:
        channel.queueBind(queueName, exchangeName, routingKey);

        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);
        //循环获取消息
        while(true){
            //获取消息,如果没有消息,这一步将会一直阻塞
            Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到消息:" + msg);
        }
    }
}

测试结果:

注意一个问题:需要进行解绑

5. Fanout Exchange

  • 不处理路由键,只需要简单的将队里绑定到交换机上
  • 发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
  • Fanout交换机转发消息是最快的

5.1 代码演示

生产端:



/**
 *
* @ClassName: Producer4FanoutExchange
* @Description: 生产者
* @author Coder编程
* @date2019年7月19日 下午23:01:16
*
 */
public class Producer4FanoutExchange {

    public static void main(String[] args) throws Exception {

        //1创建ConnectionFactory
        Connection connection = ConnectionUtils.getConnection();
        //2 创建Channel
        Channel channel = connection.createChannel();
        //3 声明
        String exchangeName = "test_fanout_exchange";
        //4 发送
        for(int i = 0; i < 10; i ++) {
            String msg = "Coder 编程  Hello World RabbitMQ 4 FANOUT Exchange Message ...";
            channel.basicPublish(exchangeName, "", null , msg.getBytes());
        }
        channel.close();
        connection.close();
    }

}

消费端:


/**
 *
* @ClassName: Consumer4FanoutExchange
* @Description: 消费者
* @author Coder编程
* @date2019年7月19日 下午23:21:18
*
 */
public class Consumer4FanoutExchange {

    public static void main(String[] args) throws Exception {

        //创建ConnectionFactory
        Connection connection = ConnectionUtils.getConnection();

        Channel channel = connection.createChannel();
        // 声明
        String exchangeName = "test_fanout_exchange";
        String exchangeType = "fanout";
        String queueName = "test_fanout_queue";
        String routingKey = ""; //不设置路由键
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);
        //循环获取消息
        while(true){
            //获取消息,如果没有消息,这一步将会一直阻塞
            Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到消息:" + msg);
        }
    }
}

测试结果:


6. 其他

6.1 Bingding —— 绑定

  • Exchange和Exchange、Queue之间的连接关系
  • Bingding可以包含RoutingKey或者参数

6.2 Queue——消息队列

  • 消息队列,实际存储消息数据
  • Durability:是否持久化,Durable:是 ,Transient:否
  • Auto delete:如选yes,代表当最后一个监听被移除之后,该Queue会自动被删除。

6.3 Message——消息

  • 服务器与应用程序之间传送的数据
  • 本质上就是一段数据,由Properties和Payload(Body)组成
  • 常用属性:delivery mode、headers(自定义属性)

6.4 其他属性

content_type、content_encoding、priority

correlation_id、reply_to、expiration、message_id

timestamp、type、user_id、app_id、cluster_id

6.5 Virtual Host虚拟主机

  • 虚拟地址,用于进行逻辑隔离,最上层的消息路由
  • 一个Virtual Host里面可以有若干个Exchange和Queue
  • 同一个Virtual Host里面不能有相同名称的Exchange或Queue

7. 总结

RabbitMQ的概念、安装与使用、管控台操作、结合RabbitMQ的特性、Exchange、Queue、Binding
、RoutingKey、Message进行核销API的讲解,通过本章的学习,希望大家对RabbitMQ有一个初步的认识。

文末

欢迎关注个人微信公众号:Coder编程
获取最新原创技术文章和免费学习资料,更有大量精品思维导图、面试资料、PMP备考资料等你来领,方便你随时随地学习技术知识!
新建了一个qq群:315211365,欢迎大家进群交流一起学习。谢谢了!也可以介绍给身边有需要的朋友。

文章收录至
Github: https://github.com/CoderMerlin/coder-programming
Gitee: https://gitee.com/573059382/coder-programming
欢迎关注并star~

参考文章:

《RabbitMQ消息中间件精讲》

推荐文章:

消息中间件——RabbitMQ(三)理解RabbitMQ核心概念和AMQP协议!

消息中间件——RabbitMQ(四)命令行与管控台的基本操作!

消息中间件——RabbitMQ(五)快速入门生产者与消费者,SpringBoot整合RabbitMQ!

原文地址:https://www.cnblogs.com/coder-programming/p/11396179.html

时间: 2024-08-23 21:18:44

消息中间件——RabbitMQ(六)理解Exchange交换机核心概念!的相关文章

理解maven的核心概念

原文链接:http://www.cnblogs.com/holbrook/archive/2012/12/24/2830519.html 好久没进行java方面的开发了,最近又完成了一个java相关的任务,顺便重新体会了 maven 这一利器. 在使用过程中发现以前对maven的理解不够深入,借此机会重新梳理了一下maven的核心概念.相信理解了这些核心概念, 即使长时间不使用,以后再重新上手也会非常容易. 本文以类图的方式,介绍maven核心的12个概念以及相互之间的关系. Table of

RabbitMQ 核心概念及与 Spring Boot 2 的整合

RabbitMQ 简介 RabbitMQ 是什么 RabbitMQ 是一个用 Erlang 编写的开源的消息队列中间件,它实现了 AMQP 协议(其实还实现了 MTQQ 等消息协议).和其他两个主流的消息队列中间件 Kafka 和 RocketMQ 相比,拥有更低的延迟.更高的稳定性.更完备的功能.更完善的文档支持以及较活跃的开源社区支持,但是在吞吐量上和分布式扩展能力上逊色一些. AMQP 是什么 AMQP(Advanced Message Queuing Protocol),高级消息队列协议

Effective Objective-C 2.0 — 第二章 对象、消息、运行期 - 第六条:理解“属性”这一概念

开发者通过对象来 存储并传递数据. 在对象之间传递数据并执行任务的过程就叫做“消息传递”. 这两条特性的工作原理? Objective-C运行期环境(Objective-C runtime) ,提供了使得对象之间能够传递消息的重要函数,并且包含创建类实例所用的全部逻辑. 第六条:理解“属性”这一概念 property:

领域驱动设计(DDD)部分核心概念的个人理解(转)

领域驱动设计(DDD)是一种基于模型驱动的软件设计方式.它以领域为核心,分析领域中的问题,通过建立一个领域模型来有效的解决领域中的核心的复杂问题.Eric Ivans为领域驱动设计提出了大量的最佳实践和经验技巧.只有对领域的不断深入认识,才能得到一个解决领域核心问题的领域模型.如果一个应用的复杂性不是在技术方面的,而是在领域本身,即领域内的业务很复杂,那这种应用,使用领域驱动设计的价值就越大. 领域驱动开发也是一种敏捷开发过程(极限编程,XP),强调迭代开发.在迭代过程中,强调开发人员与领域专家

(三)理解Maven核心概念

转载自: http://www.cnblogs.com/holbrook/archive/2012/12/24/2830519.html 本文以类图的方式,介绍maven核心的12个概念以及相互之间的关系. Table of Contents 1 maven管理的目标:工程(Project) 1.1 工程依赖关系 1.2 工程聚合关系 2 maven的核心:生命周期和阶段 3 功能实现:插件和Goal 4 仓库(Repository) 5 小结 1 maven管理的目标:工程(Project)

十分钟带你理解Kubernetes核心概念

本文将会简单介绍Kubernetes的核心概念.因为这些定义可以在Kubernetes的文档中找到,所以文章也会避免用大段的枯燥的文字介绍.相反,我们会使用一些图表(其中一些是动画)和示例来解释这些概念.我们发现一些概念(比如Service)如果没有图表的辅助就很难全面地理解.在合适的地方我们也会提供Kubernetes文档的链接以便读者深入学习.这就开始吧. 什么是Kubernetes? Kubernetes(k8s)是自动化容器操作的开源平台,这些操作包括部署,调度和节点集群间扩展.如果你曾

Elasticsearch学习笔记(六)核心概念和分片shard机制

一.核心概念 1.近实时(Near Realtime NRT) (1)从写入数据到数据可以被搜索到有一个小延迟(大概1秒): (2)基于es执行搜索和分析可以达到秒级 2.集群(Cluster) 一个集群下有多个节点.集群名称,默认是elasticsearch 3.节点(Node) 集群中的一个节点,节点也有一个名称(默认是随机分配的),节点名称很重要(在执行运维管理操作的时        候),默认节点会去加入一个名称为"elasticsearch"的集群,如果直接启动一堆节点,那么

十次方项目第五天(消息中间件RabbitMQ)

学习目标: 能够说出消息队列的应用场景以及RabbitMQ的主要概念 完成RabbitMQ安装以及RabbitMQ三种模式的入门案例 完成用户注册,能够将消息发送给RabbitMQ 完成短信微服务,能够接收消息并调用阿里云通信完成短信发送 1 RabbitMQ简介 1.1消息队列中间件简介 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量 削锋等问题实现高性能,高可用,可伸缩和最终一致性[架构] 使用较多的消息队列有 ActiveMQ,RabbitMQ,ZeroMQ,Ka

PHP操作RabbitMQ的类 exchange、queue、route kye、bind

RabbitMQ是常见的消息中间件.也许是还是不够了解的缘故,感觉功能还好吧. 讲到队列,大家脑子里第一印象是下边这样的. P生产者推送消息-->队列-->C消费者取出消息 结构很简单,但是RabbitMQ应该是为了丰富的功能吧,把“队列”拆分了. 分成了:exchange(交换机)和queue(队列)两个部分 同时说明: 生产者推送消息只推到exchange,不知道会进入哪个queue. exchange通过一个route key与queue绑定,这时才会知道消息具体落到了哪个queue里.