消息中间件——RabbitMQ(十)RabbitMQ整合SpringBoot实战!(全)


前言

1. SpringBoot整合配置详解

  • publisher-confirms,实现一个监听器用于监听Broker端给我们返回的确认请求:RabbitTemplate.ConfirmCallback
  • publisher-returns,保证消息对Broker端是可达的,如果出现路由键不可达的情况,则使用监听器对不可达的消息进行后续的处理,保证消息的路由成功:RabbitTemplate.ReturnCallback

注意一点,在发送消息的时候对template进行配置mandatory=true保证监听有效
生产端还可以配置其他属性,比如发送重试,超时时间,次数,间隔等

2. 代码演示

2.1 生产端

2.1.1 新建项目springboot-producer

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.cp</groupId>
    <artifactId>springboot-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>springboot-producer</name>
    <description>springboot-producer</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

RabbitSender.java 消息生产者


@Component
public class RabbitSender {

    //自动注入RabbitTemplate模板类
    @Autowired
    private RabbitTemplate rabbitTemplate;  

    //回调函数: confirm确认
    final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.err.println("correlationData: " + correlationData);
            System.err.println("ack: " + ack);
            if(!ack){
                //可以进行日志记录、异常处理、补偿处理等
                System.err.println("异常处理....");
            }else {
                //更新数据库,可靠性投递机制
            }
        }
    };

    //回调函数: return返回
    final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
                String exchange, String routingKey) {
            System.err.println("return exchange: " + exchange + ", routingKey: "
                + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
        }
    };

    //发送消息方法调用: 构建Message消息
    public void send(Object message, Map<String, Object> properties) throws Exception {
        MessageHeaders mhs = new MessageHeaders(properties);
        Message msg = MessageBuilder.createMessage(message, mhs);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        //id + 时间戳 全局唯一  用于ack保证唯一一条消息,这边做测试写死一个。但是在做补偿策略的时候,必须保证这是全局唯一的消息
        CorrelationData correlationData = new CorrelationData("1234567890");
        rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
    }

}

application.properties


spring.rabbitmq.addresses=localhost:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/vhost_cp
spring.rabbitmq.connection-timeout=15000

spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true

2.1.2 操作管控台

添加Exchange

添加Queue

Exchange绑定Queue

修改routingKey,springboot改为spring,则进入的是returnCallback方法

这时候我们发现报错了

correlationData: CorrelationData [id=1234567890]
ack: false
异常处理....

2.1.3 解决ack为false问题

这是由于我们在测试方法中进行测试,当测试方法结束,rabbitmq相关的资源也就关闭了,虽然我们的消息发送出去,但异步的ConfirmCallback却由于资源关闭而出现了上面的问题。
加入Thread.sleep()即可解决。


@Test
public void testSender1() throws Exception {
     Map<String, Object> properties = new HashMap<>();
     properties.put("number", "12345");
     properties.put("send_time", simpleDateFormat.format(new Date()));
     rabbitSender.send("Hello RabbitMQ For Spring Boot!", properties);
     Thread.sleep(2000);
}

成功解决~

2.2 消费端

消费端核心配置:

签收模式-手工签收

spring.rabbitmq.listener.simple.acknowledge-mode=manual

设置监听限制:最大10,默认5

spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10

  • 首先配置手工确认模式,用于ACK的手工处理,这样我们可以保证消息的可靠性送达,或者再消费端消费失败的时候可以做到重回队列(不建议)、根据业务记录日志等处理。
  • 可以设置消费端的监听个数和最大个数,用于监控消费端的并发情况

@RabbitListener注解使用

  • 消费端监听@RabbitListener注解,这个对于在实际工作中非常的好用
  • @RabbitListener是一个组合注解,里面可以注解配置
  • @QueueBinding、@Queue、@Exchange直接通过这个组合注解一次性搞定消费端交换机、队列、绑定、路由、并且配置监听功能等。

比如在方法onMessage上加@RabbitListener注解,同时需要加另外一个注解@RabbitHandler,代码被消费者监听。

建立绑定,在Value上写上队列,设置Exchange,是否持久化,设置Exchange的类型、表达式设置为true以及路由key。通过这种简单的方式,就可以完成之前很复杂的代码逻辑。同时建议将配置放入到配置文件中,动态获取。如果mq中没有相应的队列、Exchange等,注解声明也可以创建它们,大家可以自行测试!

2.2.1 新建项目springboot-consumer

pom.xml



<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.cp</groupId>
    <artifactId>springboot-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>springboot-consumer</name>
    <description>springboot-consumer</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

RabbitReceiver.java 消息生产者


@Component
public class RabbitReceiver {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue-1",
            durable="true"),
            exchange = @Exchange(value = "exchange-1",
            durable="true",
            type= "topic",
            ignoreDeclarationExceptions = "true"),
            key = "springboot.*"
            )
    )
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消费端Payload: " + message.getPayload());
        Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK,获取deliveryTag
        channel.basicAck(deliveryTag, false);
    }
}

application.properties

spring.rabbitmq.addresses=localhost:5672
spring.rabbitmq.username=user_cp
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/vhost_cp
spring.rabbitmq.connection-timeout=15000

spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10

运行Application,查看之前在生产端发送的消息,是否能被消费。

打印结果

这里之前由于我测试的时候多发了消息,所以消费的时候会有这么多。

3. 优化代码

  • 自定义Java对象消息
  • @RabbitListener注解中的配置改为动态配置

@Payload:指定具体的消息体Body。
@Headers: 获取Headers。

3.1 消费端优化

1、先定义一个Order对象


public class Order implements Serializable {

    private String id;
    private String name;

    public Order() {
    }
    public Order(String id, String name) {
        super();
        this.id = id;
        this.name = name;
    }
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
}

注意:我们在传输对象的时候,必须序列化。否则会传输失败。

2、RabbitReceiver添加监听


/**
     *
     *  spring.rabbitmq.listener.order.queue.name=queue-2
        spring.rabbitmq.listener.order.queue.durable=true
        spring.rabbitmq.listener.order.exchange.name=exchange-2
        spring.rabbitmq.listener.order.exchange.durable=true
        spring.rabbitmq.listener.order.exchange.type=topic
        spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
        spring.rabbitmq.listener.order.key=springboot.*
     * @param order
     * @param channel
     * @param headers
     * @throws Exception
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}",
            durable="${spring.rabbitmq.listener.order.queue.durable}"),
            exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}",
            durable="${spring.rabbitmq.listener.order.exchange.durable}",
            type= "${spring.rabbitmq.listener.order.exchange.type}",
            ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
            key = "${spring.rabbitmq.listener.order.key}"
            )
    )
    @RabbitHandler
    public void onOrderMessage(@Payload com.cp.springboot.entity.Order order,
            Channel channel,
            @Headers Map<String, Object> headers) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消费端order: " + order.getId());
        Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK
        channel.basicAck(deliveryTag, false);
    }

已经将配置写入到了application.properties中,进行动态获取。也可以像我们公司一样放入到配置中心当中。例如:携程开源配置中心Apollo

3、application.properties


spring.rabbitmq.listener.order.queue.name=queue-2
spring.rabbitmq.listener.order.queue.durable=true
spring.rabbitmq.listener.order.exchange.name=exchange-2
spring.rabbitmq.listener.order.exchange.durable=true
spring.rabbitmq.listener.order.exchange.type=topic
spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
spring.rabbitmq.listener.order.key=springboot.*

3.2 生产端优化

1、同样是一个Order对象,必须跟消费端的保持一致。

2、RabbitSender添加发送消息


//发送消息方法调用: 构建自定义对象消息
public void sendOrder(Order order) throws Exception {
    rabbitTemplate.setConfirmCallback(confirmCallback);
    rabbitTemplate.setReturnCallback(returnCallback);
    //id + 时间戳 全局唯一
    CorrelationData correlationData = new CorrelationData("0987654321");
    rabbitTemplate.convertAndSend("exchange-2", "springboot.def", order, correlationData);
}

3、添加测试方法


@Test
public void testSender2() throws Exception {
     Order order = new Order("001", "第一个订单");
     rabbitSender.sendOrder(order);
     //防止资源提前关闭,ConfirmCallback异步回调失败
     Thread.sleep(2000);
}

4.测试

运行testSender2()方法。

生产端打印消息

消费端打印消息

至此,RabbitMQ整合SpringBoot完毕,在实际工作中,使用场景也是差不多的。

文末

欢迎关注个人微信公众号:Coder编程
获取最新原创技术文章和免费学习资料,更有大量精品思维导图、面试资料、PMP备考资料等你来领,方便你随时随地学习技术知识!
新建了一个qq群:315211365,欢迎大家进群交流一起学习。谢谢了!也可以介绍给身边有需要的朋友。

文章收录至
Github: https://github.com/CoderMerlin/coder-programming
Gitee: https://gitee.com/573059382/coder-programming
欢迎关注并star~

参考文章:

《RabbitMQ消息中间件精讲》

推荐文章:

消息中间件——RabbitMQ(七)高级特性全在这里!(上)

消息中间件——RabbitMQ(八)高级特性全在这里!(下)

消息中间件——RabbitMQ(九)RabbitMQ整合Spring AMQP实战!(全)

原文地址:https://www.cnblogs.com/coder-programming/p/11602910.html

时间: 2024-07-29 04:56:37

消息中间件——RabbitMQ(十)RabbitMQ整合SpringBoot实战!(全)的相关文章

SpringBoot(二十六)整合Redis之共享Session

集群现在越来越常见,当我们项目搭建了集群,就会产生session共享问题.因为session是保存在服务器上面的.那么解决这一问题,大致有三个方案,1.通过nginx的负载均衡其中一种ip绑定来实现(通过ip绑定服务器其中一台,就没有集群概念了):2.通过cookie备份session实现(因为cookie数据保存在客户端,不推荐;3.通过redis备份session实现(推荐); 学习本章节之前,建议依次阅读以下文章,更好的串联全文内容,如已掌握以下列出知识点,请跳过: SpringBoot(

重磅发布-SpringBoot实战实现分布式锁视频教程

概要介绍:历经一个月的时间,我录制的分布式锁实战之SpringBoot实战实现系列完整视频教程终于出世了!在本课程中,我分享介绍了分布式锁出现的背景.实现方式以及将其应用到实际的业务场景中,包括"重复提交"."CRM系统销售人员抢单",并采用当前相当流行的微服务SpringBoot来搭建项目实战实现分布式锁. 课程学习:目前博主已将分布式锁实现以及实际业务场景实战的要点整理成课程,感兴趣的童鞋可以前往学习:http://edu.51cto.com/course/15

ASP.NET MVC5微信公众平台整合开发实战教程

<ASP.NET MVC5&微信公众平台整合开发实战(响应式布局.JQuery Mobile,Windows Azure.微信核心开发)> 课程讲师:57Code 课程分类:ASP.NET MVC 适合人群:中级 课时数量:29课时 用到技术:深入MVC开发模式.C#核心语言特性.C#核心语言特性(二).视图引擎Razor 涉及项目:体育商店.微信公众平台开发 咨询QQ:1337192913(小公子) 1.1.1.背景分析 庞大的微信用户数是微信公众平台重要性的根本 微信用户的真实性使

apollo客户端springboot实战(四)

1. apollo客户端springboot实战(四) 1.1. 前言 ??经过前几张入门学习,基本已经完成了apollo环境的搭建和简单客户端例子,但我们现在流行的通常是springboot的客户端,所以这章还是来学习下springboot客户端如何和apollo整合 ??接下来我来改造我自己的项目,我本来的项目接入的是spring config配置管理中心,读的git上的配置,它没有管理界面,功能也比较单一,所以我打算替换成apollo 1.2. 配置改动 添加配置类,这个@EnableAp

从零开始学习jQuery (十) jQueryUI常用功能实战

原文:从零开始学习jQuery (十) jQueryUI常用功能实战 本系列文章导航 从零开始学习jQuery (一) 开天辟地入门篇 从零开始学习jQuery (二) 万能的选择器 从零开始学习jQuery (三) 管理jQuery包装集 从零开始学习jQuery (四) 使用jQuery操作元素的属性与样式 从零开始学习jQuery (五) 事件与事件对象 从零开始学习jQuery (六) jQuery中的Ajax 从零开始学习jQuery (七) jQuery动画-让页面动起来! 从零开始

dubbo入门学习(三)-----dubbo整合springboot

springboot节省了大量的精力去配置各种bean,因此通过一个简单的demo来整合springboot与dubbo 一.创建boot-user-service-provider 本篇博文基于上篇中的dubbo项目,整体工程如下: 1.pom.xml <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"

HTML5端云整合:智能端应用与云端服务整合开发实战

课程简介:       作为Web与移动开发的新标准的HTML5/JavaScript/CSS3,已经纷纷被手机应该开发采用,这不但让UI极具弹性,而且也更容易与云计算整合. 本课程是云计算与智能终端时代的HTML5开发的一站式解决方案,专为企业内训和公开课制作,是完整覆盖HTML5时代开发人员所需使用的360度的技术解决方案,内容细致入微: 课程目标: 全面解析以HTML5+JavaScript来开发应用: 使用JavaScript开发云计算: 定制浏览器并具备开放html5浏览器的能力: 学

基于J2EE平台下SSH2+JBPM4.4+ExtJs4.1 框架整合&项目实战

基于J2EE平台下SSH2+JBPM4.4+ExtJs4.1 框架整合&项目实战 推荐给大家一套企业实战项目开发的教程. 课程包含了以下主要的技术面:前端采用Extjs4.x:后台使用目前中小型开发较为常用的SSH2作为框架,囊括了JBPM4.4工作流引擎. 课程经由基础学习向高阶跨进,一步一步介绍了前后台的关系,以及前后台的整合.JBPM的整合等等,都是属于目前较为重要的技术点. 咨询QQ:1609173918 课程大纲: 第1讲:课程概要_Extjs入门: 第2讲:Grid与Store的应用

第二十五章 springboot + hystrixdashboard

注意: hystrix基本使用:第十九章 springboot + hystrix(1) hystrix计数原理:附6 hystrix metrics and monitor 一.hystrixdashboard 作用: 监控各个hystrixcommand的各种值. 通过dashboards的实时监控来动态修改配置,直到满意为止 仪表盘: 二.启动hystrix 1.下载standalone-hystrix-dashboard-1.5.3-all.jar https://github.com/