springboot项目整合rabbitMq涉及消息的发送确认,消息的消费确认机制

1.引入maven依赖
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>2.在application.yml的配置:
spring:  rabbitmq:    host: 106.52.82.241    port: 5672    username: yang    password: Yangxiaohui227    virtual-host: /    publisher-confirms: true #消息发送后,如果发送成功到队列,则会回调成功信息    publisher-returns: true  #消息发送后,如果发送失败,则会返回失败信息信息    listener:  #加了2下面2个属性,消费消息的时候,就必须发送ack确认,不然消息永远还在队列中      direct:        acknowledge-mode: manual      simple:        acknowledge-mode: manual
//为了统一管理所有的Mq消息,建一个类存储常量,消息的设计都基本会涉及(队列(queue),交换机(exchange),路由键(route)三个值)
public class RabbitMqConstant {

    //下单发送消息 队列名,交换机名,路由键的配置
    public final static String SHOP_ORDER_CREATE_EXCHANGE="shop.order.create.exchange";
    public final static String SHOP_ORDER_CREATE_ROUTE="shop.order.create.route";
    public final static String SHOP_ORDER_CREATE_QUEUE="shop.order.create.queue";
}


package com.example.demo.mq;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//该类是mq最重要的一个类,所有队列的创建,交换机的创建,队列和交换机的绑定都在这里实现
@Configuration
public class RabbitMqConfig {
    private final static Logger log = LoggerFactory.getLogger(RabbitMqConfig.class);
    @Autowired
    private CachingConnectionFactory connectionFactory;

    @Autowired
    private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;

    /**
     * 单一消费者
     *
     * @return
     */

    @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainer() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(1);
        factory.setPrefetchCount(1);
        factory.setTxSize(1);
        return factory;
    }

    /**
     * 多个消费者
     *
     * @return
     */
    @Bean(name = "multiListenerContainer")
    public SimpleRabbitListenerContainerFactory multiListenerContainer() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factoryConfigurer.configure(factory, connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setConcurrentConsumers(20);
        factory.setMaxConcurrentConsumers(20);
        factory.setPrefetchCount(20);
        return factory;
    }

    /**
     * 模板的初始化配置
     *
     * @return
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean sucess, String cause) {
                if (sucess) {
                    log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, sucess, cause);
                }

            }
        });
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.warn("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
            }
        });
        return rabbitTemplate;
    }

    //消息的创建设计三个步骤:队列的创建,交换机创建(direct类型,topic类型,fanout类型),队列和交换机的通过路由键的绑定

    //--------- 下单消息配置
    //队列
    @Bean
    public Queue shopOrderCreateQueue() {
        return new Queue(RabbitMqConstant.SHOP_ORDER_CREATE_QUEUE, true);
    }

    //Direct交换机(一对一关系,一个direct交换机只能绑定一个队列,当有2个相同消费者时,如项目部署2台机,只有一个消费者能消费,)
    @Bean
    DirectExchange shopOrderCreateExchange() {
        return new DirectExchange(RabbitMqConstant.SHOP_ORDER_CREATE_EXCHANGE);
    }

    //绑定
    @Bean
    Binding bindShopOrderCreateQueue() {
        return BindingBuilder.bind(shopOrderCreateQueue()).to(shopOrderCreateExchange()).with(RabbitMqConstant.SHOP_ORDER_CREATE_ROUTE);
    }
}

 
import com.alibaba.fastjson.JSON;
import com.example.demo.domain.ShopOrderMast;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
//专门用一个类作为消息的生产者
@Service
public class ShopMessagePublisher {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendCreateOrderMessage(ShopOrderMast orderMast){
        CorrelationData correlationData=new CorrelationData(); //该参数可以传,可以不传,不传时,correlationData的id值默认是null,消息发送成功后,在RabbitMqConfig类的rabbitTemplate类的confirm方法会接收到该值
        correlationData.setId(orderMast.getCodOrderId());
        String msg = JSON.toJSONString(orderMast);
        //convertAndSend该方法有非常多的重构方法,找到适合自己的业务方法就行了,这里我用的是其中一个,发送时指定exchange和route值,这样就会发到对应的队列去了
        rabbitTemplate.convertAndSend(RabbitMqConstant.SHOP_ORDER_CREATE_EXCHANGE,RabbitMqConstant.SHOP_ORDER_CREATE_ROUTE,msg,correlationData);

    }
}



//所有的消费都写在一个消费类中
@Service
public class ShopMessageComsumer {
    //监听下单消息
    @RabbitListener(queues =RabbitMqConstant.SHOP_ORDER_CREATE_QUEUE)
    public void createOrderMesaageComsumer(String msg, Channel channel, Message message) {
        try {
                //消息可以通过msg获取也可以通过message的body属性获取
                System.out.println("开始消费了");
                ShopOrderMast shopOrderMast = JSON.parseObject(msg, ShopOrderMast.class);

            /**
             * 因为我在application.yml那里配置了消息手工确认也就是传说中的ack,所以消息消费后必须发送确认给mq
             * 很多人不理解ack(消息消费确认),以为这个确认是告诉消息发送者的,这个是错的,这个ack是告诉mq服务器,
             * 消息已经被我消费了,你可以删除它了
             * 如果没有发送basicAck的后果是:每次重启服务,你都会接收到该消息
             * 如果你不想用确认机制,就去掉application.yml的acknowledge-mode: manual配置,该配置默认
             * 是自动确认auto,去掉后,下面的channel.basicAck就不用写了
             *
             */
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

            } catch (Exception e) {
            try {
                //出现异常,告诉mq抛弃该消息
                channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
                e.printStackTrace();
            } catch (IOException e1) {
                e1.printStackTrace();
            }

        }

    }
}//这里我发送了一条消息,orderId我设置为555556666666,在消息发送时,存到了CorrelationData对象中,因此,发送成功后,在confirm方法可以接收到该值了//消息发送成功后,在控制台会看到有成功的回调信息,也就是回调了rabbitTemplate的:
confirm(CorrelationData correlationData, boolean sucess, String cause)

//上面测试的下单消息是direct类型消息的,现在创建一个topic消息

//RabbitMqConstant新增topic的配置信息
//下单topic消息:路由键的名字 星号* 代表多个字符,#号代表一个字符
    //topic交换机,发送消息时,发送到指定shop.order.create.topic.exchange和shop.order.create.topic.route中
    public final static String SHOP_ORDER_CREATE_TOPIC_EXCHANGE="shop.order.create.topic.exchange";
    public final static String SHOP_ORDER_CREATE_TOPIC_TOUTE="shop.order.create.topic.route";

    //队列1,通过shop.order.create.topic.*与交换机绑定
    public final static String SHOP_ORDER_CREATE_TOPIC_ROUTE_ONE="shop.order.create.topic.*";
    public final static String SHOP_ORDER_CREATE_TOPIC_QUEUE_ONE="shop.order.create.topic.queue.one";

    //队列2 通过shop.order.create.topic.*与交换机绑定shop.order.create.topic.#
    public final static String SHOP_ORDER_CREATE_TOPIC_ROUTE_TWO="shop.order.create.topic.#";
    public final static String SHOP_ORDER_CREATE_TOPIC_QUEUE_TWO="shop.order.create.topic.queue.two";
//在RabbitMqConfig新增topic队列的基本信息
 //--------- 下单消息配置
    //队列
    @Bean
    public Queue shopOrderCreateQueue() {
        return new Queue(RabbitMqConstant.SHOP_ORDER_CREATE_QUEUE, true);
    }

    //Direct交换机(一对一关系,一个direct交换机只能绑定一个队列,当有2个相同消费者时,如项目部署2台机,只有一个消费者能消费,)
    @Bean
    DirectExchange shopOrderCreateExchange() {
        return new DirectExchange(RabbitMqConstant.SHOP_ORDER_CREATE_EXCHANGE);
    }

    //绑定
    @Bean
    Binding bindShopOrderCreateQueue() {
        return BindingBuilder.bind(shopOrderCreateQueue()).to(shopOrderCreateExchange()).with(RabbitMqConstant.SHOP_ORDER_CREATE_ROUTE);
    }

    //-------------------------下单TOPIC消息的创建

    //创建TOPIC交换机
    @Bean
    TopicExchange shopOrderCreateTopicExchange() {
        return new TopicExchange(RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_EXCHANGE);
    }
    //---------------------------//队列1使用自己的route和交换机绑定
    //创建队列1
    @Bean
    public Queue shopOrderCreateQueueOne() {
        return new Queue(RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_QUEUE_ONE, true);
    }
    //绑定
    @Bean
    Binding bindShopOrderCreateQueueOne() {
        return BindingBuilder.bind(shopOrderCreateQueueOne()).to(shopOrderCreateTopicExchange()).with(RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_ROUTE_ONE);
    }

    //---------------------------//队列2用自己的route和交换机绑定

    //创建队列2
    @Bean
    public Queue shopOrderCreateQueueTWO() {
        return new Queue(RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_QUEUE_TWO, true);
    }

    //绑定
    @Bean
    Binding bindShopOrderCreateQueueTWO() {
        return BindingBuilder.bind(shopOrderCreateQueueTWO()).to(shopOrderCreateTopicExchange()).with(RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_ROUTE_TWO);
    }
//消息的发送方新增
  //发送TOPIC消息
    public void sendCreateOrderTOPICMessage(ShopOrderMast orderMast){
        CorrelationData correlationData=new CorrelationData(); //该参数可以传,可以不传,不传时,correlationData的id值默认是null,消息发送成功后,在RabbitMqConfig类的rabbitTemplate类的confirm方法会接收到该值
        correlationData.setId(orderMast.getCodOrderId());
        String msg = JSON.toJSONString(orderMast);
        //消息发送使用公共route而不是某个队列自己的route
        rabbitTemplate.convertAndSend(RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_EXCHANGE,RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_TOUTE,msg,correlationData);

    }
//消息的消费方新增
//消费者1
    @RabbitListener(queues =RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_QUEUE_ONE)
    public void createOrderMesaageComsumerOne(String msg, Channel channel, Message message) {
        try {
                //消息可以通过msg获取也可以通过message对象的body值获取
                System.out.println("我是消费者1");
                ShopOrderMast shopOrderMast = JSON.parseObject(msg, ShopOrderMast.class);

            /**
             * 因为我在application.yml那里配置了消息手工确认也就是传说中的ack,所以消息消费后必须发送确认给mq
             * 很多人不理解ack(消息消费确认),以为这个确认是告诉消息发送者的,这个是错的,这个ack是告诉mq服务器,
             * 消息已经被我消费了,你可以删除它了
             * 如果没有发送basicAck的后果是:每次重启服务,你都会接收到该消息
             * 如果你不想用确认机制,就去掉application.yml的acknowledge-mode: manual配置,该配置默认
             * 是自动确认auto,去掉后,下面的channel.basicAck就不用写了
             *
             */
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

            } catch (Exception e) {
            try {
                //出现异常,告诉mq抛弃该消息
                channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
                e.printStackTrace();
            } catch (IOException e1) {
                e1.printStackTrace();
            }

        }

    }
    //消费者2
    @RabbitListener(queues =RabbitMqConstant.SHOP_ORDER_CREATE_TOPIC_QUEUE_TWO)
    public void createOrderMesaageComsumerTWO(String msg, Channel channel, Message message) {
        try {
                //消息可以通过msg获取也可以通过message对象的body值获取
                System.out.println("我是消费者2");
                ShopOrderMast shopOrderMast = JSON.parseObject(msg, ShopOrderMast.class);

            /**
             * 因为我在application.yml那里配置了消息手工确认也就是传说中的ack,所以消息消费后必须发送确认给mq
             * 很多人不理解ack(消息消费确认),以为这个确认是告诉消息发送者的,这个是错的,这个ack是告诉mq服务器,
             * 消息已经被我消费了,你可以删除它了
             * 如果没有发送basicAck的后果是:每次重启服务,你都会接收到该消息
             * 如果你不想用确认机制,就去掉application.yml的acknowledge-mode: manual配置,该配置默认
             * 是自动确认auto,去掉后,下面的channel.basicAck就不用写了
             *
             */
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

            } catch (Exception e) {
            try {
                //出现异常,告诉mq抛弃该消息
                channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
                e.printStackTrace();
            } catch (IOException e1) {
                e1.printStackTrace();
            }

        }

    }

//测试结果:

//如何在rabbitMq管理页面查看没有还没被消费的消息信息:

通过界面发送Mq消息,场景,如日志发现某条消息没有发送,可以在这里发送回去:

原文地址:https://www.cnblogs.com/yangxiaohui227/p/11331086.html

时间: 2024-10-09 00:55:25

springboot项目整合rabbitMq涉及消息的发送确认,消息的消费确认机制的相关文章

微信公众平台-接收消息与发送被动消息

接收消息代码如下(包含回复消息调用): /// <summary> /// 接收用户消息 /// iftrue /// 2014-07-08 /// </summary> public class Receive { public delegate Models.Send_Msg delegate_SendMsg(string msgType); public delegate void delegate_RececiveHandler(Models.Receive_Msg mod

springboot 项目==基于websocket的服务端推送消息。

1.创建springboot项目,首先我们还是先引入依赖 <!-- webSocket begin--><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId></dependency><!-- webSocket end--> 2.创建配置类 

RabbitMQ事务和Confirm发送方消息确认——深入解读

引言 根据前面的知识(深入了解RabbitMQ工作原理及简单使用.Rabbit的几种工作模式介绍与实践)我们知道,如果要保证消息的可靠性,需要对消息进行持久化处理,然而消息持久化除了需要代码的设置之外,还有一个重要步骤是至关重要的,那就是保证你的消息顺利进入Broker(代理服务器),如图所示: 正常情况下,如果消息经过交换器进入队列就可以完成消息的持久化,但如果消息在没有到达broker之前出现意外,那就造成消息丢失,有没有办法可以解决这个问题? RabbitMQ有两种方式来解决这个问题: 通

SpringBoot整合RabbitMQ之发送接收消息实战

实战前言 前几篇文章中,我们介绍了SpringBoot整合RabbitMQ的配置以及实战了Spring的事件驱动模型,这两篇文章对于我们后续实战RabbitMQ其他知识要点将起到奠基的作用的.特别是Spring的事件驱动模型,当我们全篇实战完毕RabbitMQ并大概了解一下RabbitMQ相关组件的源码时,会发现其中的ApplicationEvent.ApplicationListener.ApplicationEventPublisher跟RabbitMQ的Message.Listener.R

SpringBoot和Mycat动态数据源项目整合

SpringBoot项目整合动态数据源(读写分离) 1.配置多个数据源,根据业务需求访问不同的数据,指定对应的策略:增加,删除,修改操作访问对应数据,查询访问对应数据,不同数据库做好的数据一致性的处理.由于此方法相对易懂,简单,不做过多介绍. 2. 动态切换数据源,根据配置的文件,业务动态切换访问的数据库:此方案通过Spring的AOP,AspactJ来实现动态织入,通过编程继承实现Spring中的AbstractRoutingDataSource,来实现数据库访问的动态切换,不仅可以方便扩展,

使用Maven插件构建SpringBoot项目,生成Docker镜像push到DockerHub上

一个用于构建和推送Docker镜像的Maven插件. 使用Maven插件构建Docker镜像,将Docker镜像push到DockerHub上,或者私有仓库,上一篇文章是手写Dockerfile,这篇文章借助开源插件docker-maven-plugin 进行操作 以下操作.默认你已经阅读过我上一篇文章: Docker 部署 SpringBoot 项目整合 Redis 镜像做访问计数Demo http://www.ymq.io/2018/01/11/Docker-deploy-spring-bo

Java秒杀系统实战系列~整合RabbitMQ实现消息异步发送

摘要: 本篇博文是“Java秒杀系统实战系列文章”的第八篇,在这篇文章中我们将整合消息中间件RabbitMQ,包括添加依赖.加入配置信息以及自定义注入相关操作组件,比如RabbitTemplate等等,最终初步实现消息的发送和接收,并在下一篇章将其与邮件服务整合,实现“用户秒杀成功发送邮件通知消息”的功能! 内容: 对于消息中间件RabbitMQ,想必各位小伙伴没有用过.也该有听过,它是一款目前市面上应用相当广泛的消息中间件,可以实现消息异步通信.业务服务模块解耦.接口限流.消息分发等功能,在微

RocketMQ 整合SpringBoot发送事务消息

环境 jdk: 8u22rocketmq: rocketmq-all-4.5.2-bin-releasespringboot: 2.1.6.RELEASErocketmq-springboot: 2.0.3 发送流程(事务消息) Rocket发送事务消息:1.由producer发送prepare(半消息)给MQ的broker2.prepare消息发送成功以后执行本地业务(本地事务),根据本地事务执行结果手动返回相应状态(RocketMQLocalTransactionState.COMMIT.R

springboot学习笔记-6 springboot整合RabbitMQ

一 RabbitMQ的介绍 RabbitMQ是消息中间件的一种,消息中间件即分布式系统中完成消息的发送和接收的基础软件.这些软件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿里巴巴公司的,现已经转让给apache). 消息中间件的工作过程可以用生产者消费者模型来表示.即,生产者不断的向消息队列发送信息,而消费者从消息队列中消费信息.具体过程如下: 从上图可看出,对于消息队列来说,生产者,消息队列,消费者是最重要的三个概念,生产者发消息到消息队列中去,消费者监听指定的消息