基于RabbitMQ实现分布式延时任务调度

一.分布式延时任务

传统做法是将延时任务插入数据库,使用定时去扫描,比对任务是否到期,到期则执行并设置任务状态为完成。这种做法在分布式环境下还需要对定时扫描做特殊处理(加分布式锁)避免任务被重复执行。

然而使用RabbitMQ实现延时任务可以天然解决分布式环境下重复执行的问题(利用mq中消息只会被一个消费者消费这一特性可以让延时任务只会被一个消费者执行)。基于RabbitMQ做延时任务的核心是利用RabbitMQ的消息到期转发特性。发送消息时设置消息到期时间,等消息到期未被消费时会将消息转发到一个新的队列,新队列的消费者收到消息后再处理,利用这种时间差特性实现任务的延时触发。

二.准备RabbitMQ并设置延时任务用到的相关队列

1.安装erlang和RabbitMQ(注意erlang与RabbitMQ的版本对应关系)

2.开启rabbitmq_management

打开RabbitMQ Command Prompt输入命令:rabbitmq-plugins enable rabbitmq_management

3.创建两个Exchange

创建一个Exchange用于添加延时任务,相关配置如下

再创建一个Exchange用于接收到期的延时任务,相关配置如下

4.创建两个Queue

创建第一个Queue,用于添加延时任务,相关配置如下

上面配置创建了一个队列q1,设置到期消息被转移的目的地Exchange(dlx)和Route key(dlx_rk)

接下来配置q1绑定的Exchange为ExQ1,Route key为send

再创建第二个Queue,用于接收队列q1中到期被转移的任务,相关配置如下

并绑定到Exchange:dlx,Route key:dlx_rk

通过上面两个Exchange和两个Queue的配置,让RabbitMQ支持q1中的消息到期后转移到q2中。所以业务上我们只用将延时任务发送到q1,让任务到期触发执行的业务代码去监听(消费)q2。这样基本上就实现了分布式环境下延时任务的创建以及到期调度触发执行。

三.具体代码实现

1.创建简单maven项目,添加如下依赖

    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.7.3</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.58</version>
        </dependency>
    </dependencies>

2.封装用到的RabbitMQ操作

 1 import com.rabbitmq.client.*;
 2
 3 import java.io.IOException;
 4 import java.util.concurrent.TimeoutException;
 5
 6 /**
 7  * 1.连接RabbitMQ
 8  * 2.添加延时任务
 9  * 3.消费延时任务
10  */
11 public class RabbitMQUtil {
12
13     private static Connection conn;
14     private static Channel channel;
15
16     /**
17      * 初始化RabbitMQ连接与channel
18      */
19     static {
20         ConnectionFactory factory = new ConnectionFactory();
21         factory.setUsername("guest");
22         factory.setPassword("guest");
23         factory.setVirtualHost("/");
24         factory.setHost("localhost");
25         factory.setPort(5672);
26
27         try {
28             conn = factory.newConnection();
29             channel = conn.createChannel();
30         } catch (IOException e) {
31             System.out.println("获取RabbitMQ连接失败");
32         } catch (TimeoutException e) {
33             System.out.println("获取RabbitMQ连接超时");
34         }
35     }
36
37 //    public static void close() throws IOException, TimeoutException {
38 //        if (Objects.nonNull(channel))
39 //            channel.close();
40 //        if (Objects.nonNull(conn))
41 //            conn.close();
42 //    }
43
44     /**
45      * 向指定exchange下route key发送延时任务
46      * @param msg 延时任务JSON bytes
47      * @param exchangeName
48      * @param routingKey
49      * @param expiration 延时时间
50      */
51     public static void addTask(byte[] msg, String exchangeName, String routingKey, int expiration) {
52         try {
53             channel.basicPublish(exchangeName, routingKey,
54                     new AMQP.BasicProperties.Builder()
55                             .expiration(String.valueOf(expiration))
56                             .build(), msg);
57         } catch (IOException e) {
58             e.printStackTrace();
59         }
60     }
61
62     /**
63      * 消费指定queue的消息(延时任务)
64      * @param queueName
65      * @param handler 任务处理器
66      * @param consumerTag 消费者标签(多个消费者同时消息同一queue时可以使用consumerTag作区分)
67      */
68     public static void bindConsumer(String queueName, DemoTaskHandler handler, String consumerTag) {
69         try {
70             channel.basicConsume(queueName, false, consumerTag,
71                     new DefaultConsumer(channel) {
72                         @Override
73                         public void handleDelivery(String consumerTag,
74                                                    Envelope envelope,
75                                                    AMQP.BasicProperties properties,
76                                                    byte[] body)
77                                 throws IOException {
78                             long deliveryTag = envelope.getDeliveryTag();
79                             // (process the message components here ...)
80                             handler.execute(body, consumerTag);
81                             channel.basicAck(deliveryTag, false); // 应答,告知queue成功收到消息
82                         }
83                     });
84         } catch (IOException e) {
85             e.printStackTrace();
86         }
87     }
88
89 }

3.模拟延时任务POJO

 1 import java.io.Serializable;
 2
 3 public class DemoTask implements Serializable {
 4
 5     private int id;
 6
 7     public int getId() {
 8         return id;
 9     }
10
11     public void setId(int id) {
12         this.id = id;
13     }
14 }

4.延时任务处理器

1 import com.alibaba.fastjson.JSON;
2
3 public class DemoTaskHandler {
4
5     public void execute(byte[] body, String consumerTag) {
6         DemoTask task = JSON.parseObject(new String(body), DemoTask.class);
7         System.out.println(consumerTag + "收到延时任务id:" + task.getId() + " 并处理完毕");
8     }
9 }

5.设计一个主程序往q1队列发送延时任务

 1 import com.alibaba.fastjson.JSON;
 2
 3 import java.util.Scanner;
 4
 5 public class Producer {
 6
 7     public static void main(String[] args) {
 8         // 添加延时任务
 9         System.out.println("按下键盘添加延时任务");
10         Scanner sc = new Scanner(System.in);
11         int i = 1;
12         while (sc.hasNextLine()) {
13             sc.nextLine();
14             DemoTask bo = new DemoTask();
15             bo.setId(i++);
16             RabbitMQUtil.addTask(JSON.toJSONString(bo).getBytes(),
17                     "ExQ1",
18                     "send",
19                     10000);
20             System.out.println("成功添加一个延时任务");
21         }
22     }
23
24 }

6.创建两个消费者(处理延时任务的业务)消费延时任务,模拟分布式环境

 1 public class Consumer1 {
 2
 3     public static void main(String[] args) {
 4         // 模拟分布式环境,处理到期的延时任务
 5         RabbitMQUtil.bindConsumer("q2",
 6                 new DemoTaskHandler(),
 7                 "consumer1");
 8
 9     }
10
11 }
1 public class Consumer2 {
2
3     public static void main(String[] args) {
4         // 模拟分布式环境,处理到期的延时任务
5         RabbitMQUtil.bindConsumer("q2",
6                 new DemoTaskHandler(),
7                 "consumer2");
8     }
9 }

7.运行Producer,Consumer1,Consumer2观察结果

通过观察发现,每次发送一个延时任务后,过10秒会被consumer1或者consumer2消费,以上就基本实现了分布式延时任务调度。

原文地址:https://www.cnblogs.com/yhcjhun/p/11727906.html

时间: 2024-10-09 14:55:36

基于RabbitMQ实现分布式延时任务调度的相关文章

一次基于etcd的分布式锁自动延时失败问题的排查

今天在测试基于etcd的分布式锁过程中,在测试获取锁后,释放之前超出TTL时长的情况下自动延长TTL这部分功能,在延长指定key的TTL时总是返回404错误信息,在对目标KEY更新TTL时目标KEY已不存在. 最终问题排查为ETCD集群3个节点之间的系统时间不一致,因为TTL延长是在KEY创建后单独一个监听线程中进行,在TTL过半之后会更新TTL,因此可能出现更新TTL之前,由于集群中时间超前的节点将目标KEY删除,导致更新TTL时找不到目标KEY的错误. 同步集群所有节点系统时间后问题排除:

一个基于RabbitMQ的可复用的事务消息方案

原文:一个基于RabbitMQ的可复用的事务消息方案 前提# 分布式事务是微服务实践中一个比较棘手的问题,在笔者所实施的微服务实践方案中,都采用了折中或者规避强一致性的方案.参考Ebay多年前提出的本地消息表方案,基于RabbitMQ和MySQL(JDBC)做了轻量级的封装,实现了低入侵性的事务消息模块.本文的内容就是详细分析整个方案的设计思路和实施.环境依赖如下: JDK1.8+ spring-boot-start-web:2.x.x.spring-boot-start-jdbc:2.x.x.

【教程分享】基于Greenplum Hadoop分布式平台的大数据解决方案及商业应用案例剖析

基于Greenplum Hadoop分布式平台的大数据解决方案及商业应用案例剖析 课程讲师:迪伦 课程分类:Java 适合人群:高级 课时数量:96课时 用到技术:MapReduce.HDFS.Map-Reduce.Hive.Sqoop 涉及项目:Greenplum Hadoop大数据分析平台 更新程度:完毕 对这个课程有兴趣的朋友可以加我的QQ2059055336和我联系 下载地址:链接:   pan.baidu.com/s/1nthYpKH 密码: niyi 随着云计算.大数据迅速发展,亟需

分布式的任务调度框架

[niubi-job——一个分布式的任务调度框架]----niubi-job这下更牛逼了! niubi-job迎来第一次重大优化 niubi-job是一款专门针对定时任务所设计的分布式任务调度框架,它可以进行动态发布任务,并且有超高的可用性保证. 有多少人半夜被叫起来查BUG,结果差到最后发现,是因为某个定时任务挂了导致出了问题? 有了niubi-job,你再也不用担心这个问题! 又有多少人因为要发布一个新的定时任务,为了不影响线上的运行,只能等到半夜再去发布应用? 有了niubi-job,你可

基于请求的分布式互斥算法

一个悲剧的文章,研究的东西确实比较老,但是因为这些研究,让我对分布式的底层的关系有了更加清晰的认识,也算是不枉此功. 下面贴出来核心的部分. 引言 分布式系统中的一组进程可能会同时访问一个资源或者同时执行一个给定的函数,我们称这些资源或者函数为临界区(Critical Section),若不加控制的话,会造成资源或者环境的不一致的现象.保证任何给定时刻只允许一个进程或者给定的进程去执行临界区的算法称为互斥算法.互斥也可以称为并发控制. 这个问题最早由Dijkstra[1]在1965年提出.互斥可

分布式 延时任务解决方案

在开发中,往往会遇到一些关于延时任务的需求.例如 生成订单30分钟未支付,则自动取消 生成订单60秒后,给用户发短信 对上述的任务,我们给一个专业的名字来形容,那就是延时任务.那么这里就会产生一个问题,这个延时任务和定时任务的区别究竟在哪里呢?一共有如下几点区别 定时任务有明确的触发时间,延时任务没有 定时任务有执行周期,而延时任务在某事件触发后一段时间内执行,没有执行周期 定时任务一般执行的是批处理操作是多个任务,而延时任务一般是单个任务 下面,我们以判断订单是否超时为例,进行方案分析 red

个推基于 Zipkin 的分布式链路追踪实践

作者:个推应用平台基础架构高级研发工程师 阿飞 01业务背景 随着微服务架构的流行,系统变得越来越复杂,单体的系统被拆成很多个模块,各个模块通过轻量级的通信协议进行通讯,相互协作,共同实现系统功能. 单体架构时,一个请求的调用链路很清晰,一般由负载均衡器将用户请求转发到后端服务,由后端服务进行业务处理,需要的数据从外部的存储中获取,处理完请求后,再经由负载均衡器返回给用户. 而在微服务架构中,一个请求往往需要多个模块共同协作处理,不同模块可能还依赖于不同的外部存储,各个模块的实现技术还不尽相同,

基于ZooKeeper的分布式Session实现(转)

1.   认识ZooKeeper ZooKeeper—— “动物园管理员”.动物园里当然有好多的动物,游客可以根据动物园提供的向导图到不同的场馆观赏各种类型的动物,而不是像走在原始丛林里,心惊胆颤的被动 物所观赏.为了让各种不同的动物呆在它们应该呆的地方,而不是相互串门,或是相互厮杀,就需要动物园管理员按照动物的各种习性加以分类和管理,这样我们才 能更加放心安全的观赏动物.回到我们企业级应用系统中,随着信息化水平的不断提高,我们的企业级系统变得越来越庞大臃肿,性能急剧下降,客户抱怨频频.拆 分系

基于Redis的分布式锁到底安全吗(上)?

网上有关Redis分布式锁的文章可谓多如牛毛了,不信的话你可以拿关键词"Redis 分布式锁"随便到哪个搜索引擎上去搜索一下就知道了.这些文章的思路大体相近,给出的实现算法也看似合乎逻辑,但当我们着手去实现它们的时候,却发现如果你越是仔细推敲,疑虑也就越来越多. 实际上,大概在一年以前,关于Redis分布式锁的安全性问题,在分布式系统专家Martin Kleppmann和Redis的作者antirez之间就发生过一场争论.由于对这个问题一直以来比较关注,所以我前些日子仔细阅读了与这场争