Spring Boot (十三): Spring Boot 整合 RabbitMQ

1. 前言

RabbitMQ 是一个消息队列,说到消息队列,大家可能多多少少有听过,它主要的功能是用来实现应用服务的异步与解耦,同时也能起到削峰填谷、消息分发的作用。

消息队列在比较主要的一个作用是用来做应用服务的解耦,消息从消息的生产者传递到消息队列,消费者从消息队列中获取消息并进行消费,生产者不需要管是谁在消费消息,消费者也无需关注消息是由谁来生产的。在分布式的系统中,消息队列也会被用在其他地方,比如分布式事务的支持,代表如阿里开源的 RocketMQ 。

当然,我们本篇文章的主角还是 RabbitMQ 。

2. RabbitMQ 介绍

RabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 RabbitMQ 主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。

AMQP,即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

RabbitMQ 是一个开源的 AMQP 实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

3. 概念介绍

在普通的消息队列的设计中,一般会有这么几个概念:生产者、消费者和我们的队列。但是在 RabbitMQ 中,中间增加了一层,叫交换机(Exchange),这样,消息的投递就不由生产者来决定投递至哪个队列,而消息是直接投递至交换机的,由交换机根据调度策略来决定这个消息需要投递到哪个队列。如图:

  • 左侧的 P 代表消息的生产者
  • 紫色的 X 代表交换机
  • 右侧红色的代表队列

4. 交换机(Exchange)

那么为什么我们需要 Exchange 而不是直接将消息发送至队列呢?

AMQP 协议中的核心思想就是生产者和消费者的解耦,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由 Exchange 来接收,然后 Exchange 按照特定的策略转发到 Queue 进行存储。

Exchange 收到消息时,他是如何知道需要发送至哪些 Queue 呢?这里就需要了解 Binding 和 RoutingKey 的概念:

Binding 表示 Exchange 与 Queue 之间的关系,我们也可以简单的认为队列对该交换机上的消息感兴趣,绑定可以附带一个额外的参数 RoutingKey。Exchange 就是根据这个 RoutingKey 和当前 Exchange 所有绑定的 Binding 做匹配,如果满足匹配,就往 Exchange 所绑定的 Queue 发送消息,这样就解决了我们向 RabbitMQ 发送一次消息,可以分发到不同的 Queue。RoutingKey 的意义依赖于交换机的类型。

下面就来了解一下 Exchange 的三种主要类型:Fanout、Direct 和 Topic。

4.1 Direct Exchange

Direct Exchange 是 RabbitMQ 默认的 Exchange,完全根据 RoutingKey 来路由消息。设置 Exchange 和 Queue 的 Binding 时需指定 RoutingKey(一般为 Queue Name),发消息时也指定一样的 RoutingKey,消息就会被路由到对应的Queue。

4.2 Topic Exchange

Topic Exchange 和 Direct Exchange 类似,也需要通过 RoutingKey 来路由消息,区别在于Direct Exchange 对 RoutingKey 是精确匹配,而 Topic Exchange 支持模糊匹配。分别支持 *# 通配符,* 表示匹配一个单词, # 则表示匹配没有或者多个单词。

4.3 Headers Exchange

Headers Exchange 会忽略 RoutingKey 而根据消息中的 Headers 和创建绑定关系时指定的 Arguments 来匹配决定路由到哪些 Queue。

Headers Exchange 的性能比较差,而且 Direct Exchange 完全可以代替它,所以不建议使用。

4.4 Default Exchange

Default Exchange 是一种特殊的 Direct Exchange。当你手动创建一个队列时,后台会自动将这个队列绑定到一个名称为空的 Direct Exchange 上,绑定 RoutingKey 与队列名称相同。有了这个默认的交换机和绑定,使我们只关心队列这一层即可,这个比较适合做一些简单的应用。

5. Spring Boot 整合 RabbitMQ

Spring Boot 整合 RabbitMQ 非常简单,如果只是简单的使用配置非常少,Spring Boot 提供了 spring-boot-starter-amqp 项目对消息各种支持。

5.1 简单使用

引入依赖

代码清单:spring-boot-rabbitmq/pom.xml
***

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置文件 application.yml 如下:

代码清单:spring-boot-rabbitmq/src/main/resources/application.yml
***

server:
  port: 8080
spring:
  application:
    name: spring-boot-rabbitmq
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: admin

队列配置

代码清单:spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/config/QueueConfig.java
***

@Configuration
public class QueueConfig {
    @Bean
    public Queue simpleQueue() {
        return new Queue("simple");
    }

    @Bean
    public Queue simpleOneToMany() {
        return new Queue("simpleOneToMany");
    }
}

消息提供者

代码清单:spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/simple/SimpleSend.java
***

@Component
public class SimpleSend {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send() {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String message = "Hello Spring Boot " + simpleDateFormat.format(new Date());
        amqpTemplate.convertAndSend("simple", message);
        logger.info("消息推送成功!");
    }
}

消息消费者

代码清单:spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/simple/SimpleReceive.java
***

@Component
@RabbitListener(queues = "simple")
public class SimpleReceive {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @RabbitHandler
    public void process(String message) {
        logger.info("Receive :{}", message);
    }

}

测试

代码清单:spring-boot-rabbitmq/src/test/java/com/springboot/springbootrabbitmq/DemoApplicationTests.java
***

@RunWith(SpringRunner.class)
@SpringBootTest
public class DemoApplicationTests {

    @Autowired
    SimpleSend simpleSend;

    @Test
    public void simpleSend() {
        simpleSend.send();
    }

}

5.2 一对多使用

如果有一个消息的生产者,有 N 个消息的消费者,会发生什么呢?

对上面的代码稍作改动,增加一个消息的消费者。

测试代码如下:

@Test
public void simpleOneSend() {
    for (int i = 0; i < 100; i ++) {
        simpleManySend.send(i);
    }
}

测试可以看到结果是两个消费者平均的消费了生产者生产的消息。

5.3 多对多使用

我们再增加一个消息的生产者,测试代码如下:

@Test
public void simpleManySend() {
    for (int i = 0; i < 100; i ++) {
        simpleManySend.send(i);
        simpleManySend1.send(i);
    }
}

测试可以看到结果是两个消费者平均的消费了两个生产者生产的消息。

5.4 Topic Exchange

首先还是先配置 Topic ,配置代码如下:

代码清单:spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/config/TopicConfig.java
***

@Configuration
public class TopicConfig {

    private final String message = "topic.message";
    private final String messages = "topic.messages";

    @Bean
    public Queue queueMessage() {
        return new Queue(this.message);
    }

    @Bean
    public Queue queueMessages() {
        return new Queue(this.messages);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("topicExchange");
    }

    @Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    @Bean
    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }
}

这里队列 queueMessages 可以同时匹配两个 route_key ,而队列 queueMessage 只能匹配 topic.message 。

消息的生产者代码如下:

代码清单:spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/topic/TopicSend.java
***

@Component
public class TopicSend {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send1() {
        String message = "message 1";
        logger.info("send:{}", message);
        rabbitTemplate.convertAndSend("topicExchange", "topic.message", message);
    }

    public void send2() {
        String message = "message 2";
        logger.info("send:{}", message);
        rabbitTemplate.convertAndSend("topicExchange", "topic.messages", message);
    }
}

调用 send1() 消息会由 Exchange 同时转发到两个队列, 而调用 send2() 则只会转发至 receive2 。

5.5 Fanout Exchange

Fanout 就是我们熟悉的广播模式或者订阅模式,给 Fanout 交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。

Fanout 配置如下:

代码清单:spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/config/FanoutConfig.java
***

@Configuration
public class FanoutConfig {
    @Bean
    public Queue MessageA() {
        return new Queue("fanout.A");
    }

    @Bean
    public Queue MessageB() {
        return new Queue("fanout.B");
    }

    @Bean
    public Queue MessageC() {
        return new Queue("fanout.C");
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    @Bean
    Binding bindingExchangeA(Queue MessageA, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(MessageA).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeB(Queue MessageB, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(MessageB).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeC(Queue MessageC, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(MessageC).to(fanoutExchange);
    }
}

消息生产者代码如下:

代码清单:spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/fanout/FanoutSend.java
***

@Component
public class FanoutSend {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String message = "Hello FanoutSend.";
        logger.info("send:{}", message);
        this.rabbitTemplate.convertAndSend("fanoutExchange","", message);
    }
}

测试代码如下:

代码清单:spring-boot-rabbitmq/src/test/java/com/springboot/springbootrabbitmq/DemoApplicationTests.java
***

@Test
public void fanoutSend() {
    fanoutSend.send();
}

测试结果为绑定到 fanout 交换机上面的队列都收到了消息。

6. 示例代码

示例代码-Github

示例代码-Gitee

7. 参考

http://www.ityouknow.com/springboot/2016/11/30/spring-boot-rabbitMQ.html

https://blog.csdn.net/y4x5M0nivSrJaY3X92c/article/details/80416996

原文地址:https://www.cnblogs.com/babycomeon/p/11675313.html

时间: 2024-10-01 14:06:05

Spring Boot (十三): Spring Boot 整合 RabbitMQ的相关文章

Spring学习十三----------Spring AOP的基本概念

? 版权声明:本文为博主原创文章,转载请注明出处 什么是AOP -面向切面编程,通过预编译方式和运行期动态代理实现程序功能的统一维护的一种技术 -主要的功能是:日志记录.性能统计.安全控制.事务处理.异常处理等 AOP实现方式 -预编译 -AspectJ -运行期动态代理(JDK动态代理.CGLib动态代理) -Spring AOP.Jboss AOP AOP相关概念 -切面(Aspect)   一个关注点的模块化,这个关注点可能会横切多个对象 -连接点(Joinpoint)   程序执行过程中

spring boot 1.5.4 整合 druid(十三)

上一篇:spring boot 1.5.4 整合 mybatis(十二) 1      集成druid连接池 spring boot集成druid项目mybatis-spring-boot源码地址: https://git.oschina.net/wyait/springboot1.5.4.git 1.1  druid简介 Druid是阿里巴巴开源的一个项目.,整个项目由数据库连接池.插件框架和SQL解析器组成.该项目主要是为了扩展JDBC的一些限制,可以让程序员实现一些特殊的需求,比如向密钥服

Spring Boot (5) 整合 RabbitMQ

一.前言 RabbitMQ是实现了AMQP(高级消息队列协议)的开源消息中间件,RabbitMQ服务器是用Erlang(面向并发的编程语言)编写的. RabbitMQ官网下载地址:https://www.rabbitmq.com/download.html Docker部署则执行如下命令即可 # RABBITMQ_DEFAULT_USER:用户名,RABBITMQ_DEFAULT_PASS:密码 这里修改为自己的即可 docker run -d --name rabbitmq -p 5672:5

spring boot 1.5.4 整合 mybatis(十二)

上一篇:spring boot 1.5.4 整合log4j2(十一) Spring Boot集成Mybatis 更多更详细的配置参考文件:application.properties和<SpringBoot之application配置详解>(新版本新增属性缺失)  或参考官网http://projects.spring.io/spring-boot/ Spring Boot集成Mybatis有两种方式: 方式一:传统的引入外部资源配置的方式,方便对mybatis的控制: 方式二:mybatis

spring boot 整合spring Data JPA+Spring Security+Thymeleaf框架(上)

最近上班太忙所以耽搁了给大家分享实战springboot 框架的使用. 下面是spring boot 整合多个框架的使用. 首先是准备工作要做好. 第一  导入框架所需的包,我们用的事maven 进行对包的管理. 以上的举例是本人的H5DS的真实的后台管理项目,这个项目正在盛情融资中,各位多多捧点人场.关注一下软件发展的动态,说不定以后就是您的生活不可或缺的软件哟. 点击打开链接.闲话少说.现在切入正题. 第二,写点配置文件 第三,spring data -设计一个简单的po关系,这里需要下载一

Spring Boot学习记录(三)--整合Mybatis

Spring Boot学习记录(三)–整合Mybatis 标签(空格分隔): spring-boot 控制器,视图解析器前面两篇都已弄好,这一篇学习持久层框架整合. 1.数据源配置 数据源使用druid,maven引入相关依赖,包括spring-jdbc依赖,mysql依赖 1.转换问题 配置的过程要学会为什么这样配置,而不是只学会了配置.这里我们可以和以前的配置方式对比: 以前版本 <!--配置数据库连接池Druid--> <bean id="dataSource"

spring boot 1.5.4 整合log4j2(十一)

上一篇:spring boot 1.5.4 定时任务和异步调用(十) Spring Boot整合log4j2 spring boot整合log4j2项目spring-boot-jsp源码: https://git.oschina.net/wyait/springboot1.5.4.git 1.1  log4j2概要 对于我们开发人员来说,日志记录往往不被重视.在生产环境中,日志是查找问题来源的重要依据.日志可记录程序运行时产生的错误信息.状态信息.调试信息和执行时间信息等多种多样的信息.可以在程

spring boot与jdbcTemplate的整合案例2

简单入门了spring boot后,接下来写写跟数据库打交道的案例.博文采用spring的jdbcTemplate工具类与数据库打交道. 下面是搭建的springbootJDBC的项目的总体架构图: <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www

Spring Kafka和Spring Boot整合实现消息发送与消费简单案例

本文主要分享下Spring Boot和Spring Kafka如何配置整合,实现发送和接收来自Spring Kafka的消息. 先前我已经分享了Kafka的基本介绍与集群环境搭建方法.关于Kafka的介绍请阅读Apache Kafka简介与安装(一),关于Kafka安装请阅读Apache Kafka安装,关于Kafka集群环境搭建请阅读Apache Kafka集群环境搭建 .这里关于服务器环境搭建不在赘述. Spring Kafka整合Spring Boot创建生产者客户端案例 创建一个kafk