SpringBoot集成RabbitMQ(注解+手动确认)

1.pom文件

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.yml配置文件

spring:
 #MQ配置
  rabbitmq:
    addresses: 127.0.0.1
    port: 5672
    username: adminmq
    password: 123456
    publisher-confirms: true
    publisher-returns: true
    template:
      retry:
        enabled: true
      mandatory: true
    listener:
      simple:
        acknowledge-mode: manual
        #并发消费者初始化值
        concurrency: 10
        #并发消费者的最大值
        max-concurrency: 20
        #每个消费者每次监听时可拉取处理的消息数量
        prefetch: 5

      direct:
        retry:
          enabled: true
          max-attempts: 1

3.消费者代码(手动确认)

/** * msgByte:报文头加报文体 * channel和message  消息确认机制 * queuesToDeclare = @Queue("${queueropertie.queue-name}") *///点对点//@RabbitListener([email protected](QueueAndExchangeProperties.afsendfirQueue))//发布订阅@RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = "${queueropertie.exchange}",durable = "true",type = "direct"),
            value = @Queue(value = "${queueropertie.queue-name}",durable = "true"),
            key = "${queueropertie.exchangekey}"
    ))
    @RabbitHandler
    public void monitoringMethod(byte[] msgByte, Channel channel, Message message) throws IOException {
        Map<String, Object> logMap = new ConcurrentHashMap<>();
        try {
            //消息确认
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            monotoringInsertDB(msgByte,new ConcurrentHashMap<>());
        } catch (Exception e) {
            //失败后消息被确认
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            commonRabbitService.insertLogError(logMap, ERROR_104, e.getMessage());
            LOGGER.error("mq接收消息失败",e);
        }
    }

4.生产者

@Autowired
RabbitTemplate rabbitTemplate;
//发布订阅
rabbitTemplate.convertAndSend(QueueAndExchangeProperties.afMQExchange, QueueAndExchangeProperties.afIcf, msgStr.getBytes());//点对点//rabbitTemplate.convertAndSend(QueueAndExchangeProperties.afsendsecQueue,msgStr);


原文地址:https://www.cnblogs.com/yuhongqiang/p/11497258.html

时间: 2024-11-10 15:54:00

SpringBoot集成RabbitMQ(注解+手动确认)的相关文章

Java SpringBoot集成RabbitMq实战和总结

目录 交换器.队列.绑定的声明 关于消息序列化 同一个队列多消费类型 注解将消息和消息头注入消费者方法 关于消费者确认 关于发送者确认模式 消费消息.死信队列和RetryTemplate RPC模式的消息(不常用) 关于消费模型 关于RabbitMq客户端的线程模型 在公司里一直在用RabbitMQ,由于api已经封装的很简单,关于RabbitMQ本身还有封装的实现没有了解,最近在看RabbitMQ实战这本书,结合网上的一些例子和spring文档,实现了RabbitMQ和spring的集成,对着

SpringBoot集成rabbitmq(一)

前言 Rabbitmq是一个开源的消息代理软件,是AMQP协议的实现.核心作用就是创建消息队列,异步发送和接收消息.通常用来在高并发中处理削峰填谷.延迟处理.解耦系统之间的强耦合.处理秒杀订单.  入门rabbitmq之前主要是想了解下秒杀排队订单入库后,异步通知客户端秒杀结果. 基础知识 1.基本概念(角色) 了解rabbitmq之前先要了解3个基本概念:生产者.消费者.代理(队列). rabbitmq在生产者和代理中间做了一层抽象.这样消息生产者和队列就没有直接联系,在中间加入了一层交换器(

SpringBoot 集成RabbitMQ

1.application.yml 配置 spring:rabbitmq: host: localhost port: 5672 listener: simple: acknowledge-mode: manual // 手动签发 prefetch: 1 retry: enabled: true initial-interval: 60000 # 第一次和第二次尝试发布或传递消息间隔: 1分钟 max-attempts: 8 max-interval: 7200000 # 最大重试时间间隔:2小

Spring-boot集成RabbitMQ踩过的坑

1.java.net.SocketException: socket closed 官方文档已经说明,新建user和guest的账户是没有远程登录的权限的 需要对登录所用账户授权 解决方法: rabbitmqctl set_permissions -p /${user_name} user_admin '.*' '.*' '.*' 2. An unexpected connection driver error occured 报错如下 [AMQP Connection 192.168.71.1

springboot集成RabbitMQ,Eclipse开发集成RabbitMq,IDEA集成RabbitMQ报错 socket close

java.net.SocketException: socket closed    at java.net.SocketInputStream.socketRead0(Native Method)    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)    at java.net.SocketInputStream.read(SocketInputStream.java:170)    at java.n

springBoot(24):集成rabbitmq

注意:springboot支持的amqp规范的中间件只有rabbitmq 第一步:添加依赖 <!-- amqp --> <dependency>     <groupId>org.springframework.boot</groupId>     <artifactId>spring-boot-starter-amqp</artifactId> </dependency> 第二步:配置application.yml sp

使用rabbitmq手动确认消息的,定时获取队列消息实现

描述问题 最近项目中因为有些数据,需要推送到第三方系统中,因为数据会一直增加,并且需要与第三方系统做相关交互. 相关业务 本着不影响线上运行效率的思想,我们将增加的消息放入rabbitmq,使用另一个应用获取消费,因为数据只是推送,并且业务的数据有15分钟左右的更新策略,对实时性不是很高所以我们需要一个定时任务来主动链接rabbit去消费,然后将数据以网络方式传送 相关分析 网络上大致出现了相关的解决办法,但由于实现相关数据丢失及处理.性能和效率等相关基础业务的工作量,望而却步...... 还好

springboot + shiro 权限注解、请求乱码解决、统一异常处理

springboot + shiro 权限注解.请求乱码解决.统一异常处理 前篇 后台权限管理系统 相关: spring boot + mybatis + layui + shiro后台权限管理系统 springboot + shiro之登录人数限制.登录判断重定向.session时间设置 springboot + shiro 动态更新用户信息 基于前篇,新增功能: 新增shiro权限注解: 请求乱码问题解决: 统一异常处理. 源码已集成到项目中: github源码: https://githu

RabbitMQ的消息确认机制

一:确认种类 RabbitMQ的消息确认有两种. 一种是消息发送确认.这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递.发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列. 第二种是消费接收确认.这种是确认消费者是否成功消费了队列中的消息. 二:消息发送确认 (1)ConfirmCallback 通过实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调. 使用该功能需要开启确认,spring-boot中配置如下: spr