1.pom文件
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.yml配置文件
spring: #MQ配置 rabbitmq: addresses: 127.0.0.1 port: 5672 username: adminmq password: 123456 publisher-confirms: true publisher-returns: true template: retry: enabled: true mandatory: true listener: simple: acknowledge-mode: manual #并发消费者初始化值 concurrency: 10 #并发消费者的最大值 max-concurrency: 20 #每个消费者每次监听时可拉取处理的消息数量 prefetch: 5 direct: retry: enabled: true max-attempts: 1
3.消费者代码(手动确认)
/** * msgByte:报文头加报文体 * channel和message 消息确认机制 * queuesToDeclare = @Queue("${queueropertie.queue-name}") *///点对点//@RabbitListener([email protected](QueueAndExchangeProperties.afsendfirQueue))//发布订阅@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = "${queueropertie.exchange}",durable = "true",type = "direct"), value = @Queue(value = "${queueropertie.queue-name}",durable = "true"), key = "${queueropertie.exchangekey}" )) @RabbitHandler public void monitoringMethod(byte[] msgByte, Channel channel, Message message) throws IOException { Map<String, Object> logMap = new ConcurrentHashMap<>(); try { //消息确认 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); monotoringInsertDB(msgByte,new ConcurrentHashMap<>()); } catch (Exception e) { //失败后消息被确认 channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); commonRabbitService.insertLogError(logMap, ERROR_104, e.getMessage()); LOGGER.error("mq接收消息失败",e); } }
4.生产者
@Autowired RabbitTemplate rabbitTemplate; //发布订阅 rabbitTemplate.convertAndSend(QueueAndExchangeProperties.afMQExchange, QueueAndExchangeProperties.afIcf, msgStr.getBytes());//点对点//rabbitTemplate.convertAndSend(QueueAndExchangeProperties.afsendsecQueue,msgStr);
原文地址:https://www.cnblogs.com/yuhongqiang/p/11497258.html
时间: 2024-11-10 15:54:00