RabbitMQ延迟队列简单示例

简介

延迟队列存储的消息是不希望被消费者立刻拿到的,而是等待特定时间后,消费者才能拿到这个消息进行消费。使用场景比较多,例如订单限时30分钟内支付,否则取消,再如分布式环境中每隔一段时间重复执行某操作。

下面举一个简单的例子,例子大概意思是分别在首次发送消息后的10秒、40秒、100秒后重新读取到消息。为了直观,不使用RabbitMQ其他多余的特性。

准备工作

在Centos7下安装RabbitMQ,版本为3.6.12单机版本(非集群),IP是127.0.0.1,端口是15672,使用web管理页面或者rabbitmqctl提前准备好相关的用户、exchange和queue。

用户有producer(密码同用户名)、consumer(密码同用户名)。

Default exchange是RabbitMQ预定义的,名称为空字符串,自动绑定到每个queue,类型为direct,routingKey等于queue的名称。

三个死信交换器(队列中的消息过期后会被发送到该队列的死信交换器)10sDeadLetterExchange、30sDeadLetterExchange、60sDeadLetterExchange。

三个死信队列,10sDeadLetterQueue、30sDeadLetterQueue、60sDeadLetterQueue分别与三个死信交换器绑定。

三个队列,10sDelayQueue、30sDelayQueue、60sDelayQueue,分别设置过期时间为10秒、30秒、60秒,并设置对应的死信交换器。

示例

使用java代码创建一个生产者和三个消费者。生产者往Default exchange发送消息,routingKey为10sDelayQueue。三个消费者分别订阅三个死信队列。先启动三个消费者,再启动生产者。消费者会持续订阅死信队列,需要手动关闭连接。

Rabbitmq10sDeadLetterQueueConsumer消费者代码如下:

import com.rabbitmq.client.*;

import java.io.IOException;

import java.text.SimpleDateFormat;

import java.util.Date;

import java.util.UUID;

import java.util.concurrent.TimeoutException;

public class Rabbitmq10sDeadLetterQueueConsumer {

public static void main(String[] args) throws IOException, TimeoutException {

ConnectionFactory connectionFactory = new ConnectionFactory();

connectionFactory.setVirtualHost("/");

connectionFactory.setUsername("consumer");

connectionFactory.setPassword("consumer");

connectionFactory.setHost("127.0.0.1");

connectionFactory.setPort(15672);

Connection connection = connectionFactory.newConnection();

Channel channel = connection.createChannel();

channel.basicQos(64);

channel.basicConsume("10sDeadLetterQueue", false, UUID.randomUUID().toString(),

new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

System.out.println("第一次重试时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));

System.out.println("consumerTag:" + consumerTag);

System.out.println("envelope:" + envelope.toString());

System.out.println("basicProperties:" + properties.toString());

System.out.println("message:" + new String(body, "utf-8"));

channel.basicAck(envelope.getDeliveryTag(), false);

channel.basicPublish("", "30sDelayQueue",

new AMQP.BasicProperties().builder().deliveryMode(2).build(),

body);

}

});

}

}

Rabbitmq30sDeadLetterQueueConsumer消费者代码如下:

import com.rabbitmq.client.*;

import java.io.IOException;

import java.text.SimpleDateFormat;

import java.util.Date;

import java.util.UUID;

import java.util.concurrent.TimeoutException;

public class Rabbitmq30sDeadLetterQueueConsumer {

public static void main(String[] args) throws IOException, TimeoutException {

ConnectionFactory connectionFactory = new ConnectionFactory();

connectionFactory.setVirtualHost("/");

connectionFactory.setUsername("consumer");

connectionFactory.setPassword("consumer");

connectionFactory.setHost("127.0.0.1");

connectionFactory.setPort(15672);

Connection connection = connectionFactory.newConnection();

Channel channel = connection.createChannel();

channel.basicQos(64);

channel.basicConsume("30sDeadLetterQueue", false, UUID.randomUUID().toString(),

new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

System.out.println("第二次重试时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));

System.out.println("consumerTag:" + consumerTag);

System.out.println("envelope:" + envelope.toString());

System.out.println("basicProperties:" + properties.toString());

System.out.println("message:" + new String(body, "utf-8"));

channel.basicAck(envelope.getDeliveryTag(), false);

channel.basicPublish("", "60sDelayQueue",

new AMQP.BasicProperties().builder().deliveryMode(2).build(),

body);

}

});

}

}

Rabbitmq60sDeadLetterQueueConsumer消费者代码如下:

import com.rabbitmq.client.*;

import java.io.IOException;

import java.text.SimpleDateFormat;

import java.util.Date;

import java.util.UUID;

import java.util.concurrent.TimeoutException;

public class Rabbitmq60sDeadLetterQueueConsumer {

public static void main(String[] args) throws IOException, TimeoutException {

ConnectionFactory connectionFactory = new ConnectionFactory();

connectionFactory.setVirtualHost("/");

connectionFactory.setUsername("consumer");

connectionFactory.setPassword("consumer");

connectionFactory.setHost("127.0.0.1");

connectionFactory.setPort(15672);

Connection connection = connectionFactory.newConnection();

Channel channel = connection.createChannel();

channel.basicQos(64);

channel.basicConsume("60sDeadLetterQueue", false, UUID.randomUUID().toString(),

new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

System.out.println("第三次重试时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));

System.out.println("consumerTag:" + consumerTag);

System.out.println("envelope:" + envelope.toString());

System.out.println("basicProperties:" + properties.toString());

System.out.println("message:" + new String(body, "utf-8"));

channel.basicAck(envelope.getDeliveryTag(), false);

System.out.println("执行完成");

}

});

}

}

生产者代码如下:

import com.rabbitmq.client.AMQP;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

import java.text.SimpleDateFormat;

import java.util.Date;

import java.util.UUID;

import java.util.concurrent.TimeoutException;

public class RabbitmqDelayQueueProducer {

public static void main(String[] args) throws IOException, TimeoutException {

Connection connection = null;

Channel channel = null;

try {

ConnectionFactory connectionFactory = new ConnectionFactory();

connectionFactory.setVirtualHost("/");

connectionFactory.setUsername("producer");

connectionFactory.setPassword("producer");

connectionFactory.setHost("127.0.0.1");

connectionFactory.setPort(15672);

connection = connectionFactory.newConnection();

channel = connection.createChannel();

channel.basicPublish("", "10sDelayQueue",

new AMQP.BasicProperties().builder().deliveryMode(2).build(),

("测试延迟队列" + UUID.randomUUID().toString()).getBytes("utf-8"));

System.out.println("发送时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));

} finally {

if (channel != null) {

channel.close();

}

if (connection != null) {

connection.close();

}

}

}

}

生产者打印:

发送时间:2019-08-25 10:08:29

Rabbitmq10sDeadLetterQueueConsumer消费者打印:

第一次重试时间:2019-08-25 10:08:39

consumerTag:83787685-28a9-4ae8-b2bc-b89b90f14b68

envelope:Envelope(deliveryTag=1, redeliver=false, exchange=10sDeadLetterExchange, routingKey=10sDelayQueue)

basicProperties:#contentHeader<basic>(content-type=null, content-encoding=null, headers={x-death=[{reason=expired, count=1, exchange=, time=Sun Aug 25 10:08:37 CST 2019, routing-keys=[10sDelayQueue], queue=10sDelayQueue}]}, delivery-mode=2, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)

message:测试延迟队列f862c616-a99a-4b65-9482-d74d5aee0814

Rabbitmq30sDeadLetterQueueConsumer消费者打印:

第二次重试时间:2019-08-25 10:09:09

consumerTag:27bb0e0e-07fe-49b6-8c0f-06fbc32cd784

envelope:Envelope(deliveryTag=1, redeliver=false, exchange=30sDeadLetterExchange, routingKey=30sDelayQueue)

basicProperties:#contentHeader<basic>(content-type=null, content-encoding=null, headers={x-death=[{reason=expired, count=1, exchange=, time=Sun Aug 25 10:09:07 CST 2019, routing-keys=[30sDelayQueue], queue=30sDelayQueue}]}, delivery-mode=2, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)

message:测试延迟队列f862c616-a99a-4b65-9482-d74d5aee0814

Rabbitmq60sDeadLetterQueueConsumer消费者打印:

第三次重试时间:2019-08-25 10:10:09

consumerTag:8bfdc795-443d-4940-bf88-1e6f14e7b530

envelope:Envelope(deliveryTag=1, redeliver=false, exchange=60sDeadLetterExchange, routingKey=60sDelayQueue)

basicProperties:#contentHeader<basic>(content-type=null, content-encoding=null, headers={x-death=[{reason=expired, count=1, exchange=, time=Sun Aug 25 10:10:07 CST 2019, routing-keys=[60sDelayQueue], queue=60sDelayQueue}]}, delivery-mode=2, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)

message:测试延迟队列f862c616-a99a-4b65-9482-d74d5aee0814

执行完成

原文地址:https://www.cnblogs.com/gjb724332682/p/11407296.html

时间: 2024-11-05 23:35:26

RabbitMQ延迟队列简单示例的相关文章

PHP静态延迟绑定简单示例

没怎么用过这个新特性,其实也不算新啦,试试吧,现在静态类的继承很方便了 <?php class A { protected static $def = '123456'; public static function test() { echo get_class(new static); } public static function test2() { echo static::$def; } } class B extends A { protected static $def = '4

C# RabbitMQ延迟队列功能实战项目演练

一.需求背景 当用户在商城上进行下单支付,我们假设如果8小时没有进行支付,那么就后台自动对该笔交易的状态修改为订单关闭取消,同时给用户发送一份邮件提醒.那么我们应用程序如何实现这样的需求场景呢?在之前的<C# Redis缓存过期实现延迟通知实战演练>分享课程中阿笨最后总结的时候说过Redis Pub/Sub是一种并不可靠地消息机制,他不会做信息的存储,只是在线转发,那么肯定也没有ack确认机制,另外只有订阅段监听时才会转发!我们是否有更好的方式去实现呢?今天给大家分享的比较好的解决方案就是通过

SpringBoot:RabbitMQ 延迟队列

SpringBoot 是为了简化 Spring 应用的创建.运行.调试.部署等一系列问题而诞生的产物,自动装配的特性让我们可以更好的关注业务本身而不是外部的XML配置,我们只需遵循规范,引入相关的依赖就可以轻易的搭建出一个 WEB 工程 初探RabbitMQ消息队列中介绍了RabbitMQ的简单用法,顺带提及了下延迟队列的作用.所谓延时消息就是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费. 延迟队列 延迟队列能做什么? 订单业务: 在电商/点餐

【RabbitMQ】一文带你搞定RabbitMQ延迟队列

本文口味:鱼香肉丝? ?预计阅读:10分钟 一.说明 在上一篇中,介绍了RabbitMQ中的死信队列是什么,何时使用以及如何使用RabbitMQ的死信队列.相信通过上一篇的学习,对于死信队列已经有了更多的了解,这一篇的内容也跟死信队列息息相关,如果你还不了解死信队列,那么建议你先进行上一篇文章的阅读. 这一篇里,我们将继续介绍RabbitMQ的高级特性,通过本篇的学习,你将收获: 什么是延时队列 延时队列使用场景 RabbitMQ中的TTL 如何利用RabbitMQ来实现延时队列 二.本文大纲

C#实现rabbitmq 延迟队列功能

最近在研究rabbitmq,项目中有这样一个场景:在用户要支付订单的时候,如果超过30分钟未支付,会把订单关掉.当然我们可以做一个定时任务,每个一段时间来扫描未支付的订单,如果该订单超过支付时间就关闭,但是在数据量小的时候并没有什么大的问题,但是数据量一大轮训数据库的方式就会变得特别耗资源.当面对千万级.上亿级数据量时,本身写入的IO就比较高,导致长时间查询或者根本就查不出来,更别说分库分表以后了.除此之外,还有优先级队列,基于优先级队列的JDK延迟队列,时间轮等方式.但如果系统的架构中本身就有

实现rabbitmq 延迟队列功能

最近在研究rabbitmq,项目中有这样一个场景:在用户要支付订单的时候,如果超过30分钟未支付,会把订单关掉.当然我们可以做一个定时任务,每个一段时间来扫描未支付的订单,如果该订单超过支付时间就关闭,但是在数据量小的时候并没有什么大的问题,但是数据量一大轮训数据库的方式就会变得特别耗资源.当面对千万级.上亿级数据量时,本身写入的IO就比较高,导致长时间查询或者根本就查不出来,更别说分库分表以后了.除此之外,还有优先级队列,基于优先级队列的JDK延迟队列,时间轮等方式.但如果系统的架构中本身就有

RabbitMQ 延迟队列实现订单支付结果异步阶梯性通知

在第三方支付中,例如支付宝.或者微信,对于订单请求,第三方支付系统采用的是消息同步返回.异步通知+主动补偿查询的补偿机制. 由于互联网通信的不可靠性,例如双方网络.服务器.应用等因素的影响,不管是同步返回.异步通知.主动查询报文都可能出现超时无响应.报文丢失等情况,所以像支付业务,对结果的通知一般采用几种方案结合的补偿机制,不能完全依赖某一种机制. 例如一个支付结果的通知,一方面会在支付页面跳转时候返回支付结果(一般只用作前端展示使用,非最终状态),同时会采用后台异步通知机制(有前台.后台通知的

Spring Boot(十四)RabbitMQ延迟队列

一.前言 延迟队列的使用场景:1.未按时支付的订单,30分钟过期之后取消订单:2.给活跃度比较低的用户间隔N天之后推送消息,提高活跃度:3.过1分钟给新注册会员的用户,发送注册邮件等. 实现延迟队列的方式有两种: 通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能: 使用rabbitmq-delayed-message-exchange插件实现延迟功能: 注意: 延迟插件rabbitmq-delayed-message-exchange是在RabbitMQ 3.5.7及以上

rabbitmq延迟队列demo

工程结构: 定义jar包依赖的版本,版本很重要,rabbit依赖spring,必须一致,否则报错: <properties> <springframework.version>4.2.7.RELEASE</springframework.version> <spring-rabbit.version>1.6.1.RELEASE</spring-rabbit.version> <junit.version>4.12</junit.