分布式 延时任务解决方案

在开发中,往往会遇到一些关于延时任务的需求。例如

  • 生成订单30分钟未支付,则自动取消
  • 生成订单60秒后,给用户发短信

对上述的任务,我们给一个专业的名字来形容,那就是延时任务。那么这里就会产生一个问题,这个延时任务和定时任务的区别究竟在哪里呢?一共有如下几点区别

  1. 定时任务有明确的触发时间,延时任务没有
  2. 定时任务有执行周期,而延时任务在某事件触发后一段时间内执行,没有执行周期
  3. 定时任务一般执行的是批处理操作是多个任务,而延时任务一般是单个任务

下面,我们以判断订单是否超时为例,进行方案分析

redis缓存

- 思路一

利用redis的zset,zset是一个有序集合,每一个元素(member)都关联了一个score,通过score排序来取集合中的值

添加元素:ZADD key score member [[score member] [score member] …]

按顺序查询元素:ZRANGE key start stop [WITHSCORES]

查询元素score:ZSCORE key member

移除元素:ZREM key member [member …]

测试如下

# 添加单个元素

redis> ZADD page_rank 10 google.com

(integer) 1

# 添加多个元素

redis> ZADD page_rank 9 baidu.com 8 bing.com

(integer) 2

redis> ZRANGE page_rank 0 -1 WITHSCORES

1) "bing.com"

2) "8"

3) "baidu.com"

4) "9"

5) "google.com"

6) "10"

# 查询元素的score值

redis> ZSCORE page_rank bing.com

"8"

# 移除单个元素

redis> ZREM page_rank google.com

(integer) 1

redis> ZRANGE page_rank 0 -1 WITHSCORES

1) "bing.com"

2) "8"

3) "baidu.com"

4) "9"

那么如何实现呢?我们将订单超时时间戳与订单号分别设置为score和member,系统扫描第一个元素判断是否超时,具体如下图所示

实现一

package com.rjzheng.delay4;

import java.util.Calendar;

import java.util.Set;

import redis.clients.jedis.Jedis;

import redis.clients.jedis.JedisPool;

import redis.clients.jedis.Tuple;

public class AppTest {

private static final String ADDR = "127.0.0.1";

private static final int PORT = 6379;

private static JedisPool jedisPool = new JedisPool(ADDR, PORT);

public static Jedis getJedis() {

return jedisPool.getResource();

}

//生产者,生成5个订单放进去

public void productionDelayMessage(){

for(int i=0;i<5;i++){

//延迟3秒

Calendar cal1 = Calendar.getInstance();

cal1.add(Calendar.SECOND, 3);

int second3later = (int) (cal1.getTimeInMillis() / 1000);

AppTest.getJedis().zadd("OrderId", second3later,"OID0000001"+i);

System.out.println(System.currentTimeMillis()+"ms:redis生成了一个订单任务:订单ID为"+"OID0000001"+i);

}

}

//消费者,取订单

public void consumerDelayMessage(){

Jedis jedis = AppTest.getJedis();

while(true){

Set<Tuple> items = jedis.zrangeWithScores("OrderId", 0, 1);

if(items == null || items.isEmpty()){

System.out.println("当前没有等待的任务");

try {

Thread.sleep(500);

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

continue;

}

int  score = (int) ((Tuple)items.toArray()[0]).getScore();

Calendar cal = Calendar.getInstance();

int nowSecond = (int) (cal.getTimeInMillis() / 1000);

if(nowSecond >= score){

String orderId = ((Tuple)items.toArray()[0]).getElement();

jedis.zrem("OrderId", orderId);

System.out.println(System.currentTimeMillis() +"ms:redis消费了一个任务:消费的订单OrderId为"+orderId);

}

}

}

public static void main(String[] args) {

AppTest appTest =new AppTest();

appTest.productionDelayMessage();

appTest.consumerDelayMessage();

}

}

此时对应输出如下

可以看到,几乎都是3秒之后,消费订单。

然而,这一版存在一个致命的硬伤,在高并发条件下,多消费者会取到同一个订单号,我们上测试代码ThreadTest

package com.rjzheng.delay4;

import java.util.concurrent.CountDownLatch;

public class ThreadTest {

private static final int threadNum = 10;

private static CountDownLatch cdl = new CountDownLatch(threadNum);

static class DelayMessage implements Runnable{

public void run() {

try {

cdl.await();

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

AppTest appTest =new AppTest();

appTest.consumerDelayMessage();

}

}

public static void main(String[] args) {

AppTest appTest =new AppTest();

appTest.productionDelayMessage();

for(int i=0;i<threadNum;i++){

new Thread(new DelayMessage()).start();

cdl.countDown();

}

}

}

输出如下所示

显然,出现了多个线程消费同一个资源的情况。

解决方案

(1)用分布式锁,但是用分布式锁,性能下降了,该方案不细说。

(2)对ZREM的返回值进行判断,只有大于0的时候,才消费数据,于是将consumerDelayMessage()方法里的

if(nowSecond >= score){

String orderId = ((Tuple)items.toArray()[0]).getElement();

jedis.zrem("OrderId", orderId);

System.out.println(System.currentTimeMillis()+"ms:redis消费了一个任务:消费的订单OrderId为"+orderId);

}

修改为

if(nowSecond >= score){

String orderId = ((Tuple)items.toArray()[0]).getElement();

Long num = jedis.zrem("OrderId", orderId);

if( num != null && num>0){

System.out.println(System.currentTimeMillis()+"ms:redis消费了一个任务:消费的订单OrderId为"+orderId);

}

}

在这种修改后,重新运行ThreadTest类,发现输出正常了

- 思路二

该方案使用redis的Keyspace Notifications,中文翻译就是键空间机制,就是利用该机制可以在key失效之后,提供一个回调,实际上是redis会给客户端发送一个消息。是需要redis版本2.8以上。

实现二

在redis.conf中,加入一条配置

notify-keyspace-events Ex

运行代码如下

package com.rjzheng.delay5;

import redis.clients.jedis.Jedis;

import redis.clients.jedis.JedisPool;

import redis.clients.jedis.JedisPubSub;

public class RedisTest {

private static final String ADDR = "127.0.0.1";

private static final int PORT = 6379;

private static JedisPool jedis = new JedisPool(ADDR, PORT);

private static RedisSub sub = new RedisSub();

public static void init() {

new Thread(new Runnable() {

public void run() {

jedis.getResource().subscribe(sub, "[email protected]__:expired");

}

}).start();

}

public static void main(String[] args) throws InterruptedException {

init();

for(int i =0;i<10;i++){

String orderId = "OID000000"+i;

jedis.getResource().setex(orderId, 3, orderId);

System.out.println(System.currentTimeMillis()+"ms:"+orderId+"订单生成");

}

}

static class RedisSub extends JedisPubSub {

<a href=‘http://www.jobbole.com/members/wx610506454‘>@Override</a>

public void onMessage(String channel, String message) {

System.out.println(System.currentTimeMillis()+"ms:"+message+"订单取消");

}

}

}

输出如下

可以明显看到3秒过后,订单取消了

ps:redis的pub/sub机制存在一个硬伤,官网内容如下

原:Because Redis Pub/Sub is fire and forget currently there is no way to use this feature if your application demands reliable notification of events, that is, if your Pub/Sub client disconnects, and reconnects later, all the events delivered during the time the client was disconnected are lost.

翻: Redis的发布/订阅目前是即发即弃(fire and forget)模式的,因此无法实现事件的可靠通知。也就是说,如果发布/订阅的客户端断链之后又重连,则在客户端断链期间的所有事件都丢失了。
因此,方案二不是太推荐。当然,如果你对可靠性要求不高,可以使用。

优缺点

优点:(1)由于使用Redis作为消息通道,消息都存储在Redis中。如果发送程序或者任务处理程序挂了,重启之后,还有重新处理数据的可能性。
(2)做集群扩展相当方便
(3)时间准确度高

缺点:(1)需要额外进行redis维护

(5)使用消息队列

我们可以采用rabbitMQ的延时队列。RabbitMQ具有以下两个特性,可以实现延迟队列

  • RabbitMQ可以针对Queue和Message设置 x-message-tt,来控制消息的生存时间,如果超时,则消息变为dead letter
  • lRabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,用来控制队列内出现了deadletter,则按照这两个参数重新路由。
    结合以上两个特性,就可以模拟出延迟消息的功能,具体的,我改天再写一篇文章,这里再讲下去,篇幅太长。

优缺点

优点: 高效,可以利用rabbitmq的分布式特性轻易的进行横向扩展,消息支持持久化增加了可靠性。

缺点:本身的易用度要依赖于rabbitMq的运维.因为要引用rabbitMq,所以复杂度和成本变高

原文链接: https://mp.weixin.qq.com/s/4RMT427vnsRezfV_s7RVGA

原文地址:https://www.cnblogs.com/wangdaijun/p/9498498.html

时间: 2024-07-30 18:03:08

分布式 延时任务解决方案的相关文章

分布式事务,EventBus 解决方案:CAP【中文文档】(转)

出处:http://www.cnblogs.com/savorboard/p/cap-document.html 前言 很多同学想对CAP的机制以及用法等想有一个详细的了解,所以花了将近两周时间写了这份中文的CAP文档,对 CAP 还不知道的同学可以先看一下这篇文章. 本文档为 CAP 文献(Wiki),本文献同时提供中文和英文版本,英文版本目前还在翻译中,会放到Github Wiki 中. 目录 前言 1.Getting Started 1.1 介绍 1.2 应用场景 1.3 Quick St

设计----【分布式事务】分布式事务和解决方案

一.前言 分布式事务是企业集成中的一个技术难点,也是每一个分布式系统架构中都会涉及到的一个东西,特别是在微服务架构中,几乎可以说是无法避免,本文就分布式事务来简单聊一下. 二.数据库事务 在说分布式事务之前,我们先从数据库事务说起. 数据库事务可能大家都很熟悉,在开发过程中也会经常使用到.但是即使如此,可能对于一些细节问题,很多人仍然不清楚.比如很多人都知道数据库事务的几个特性:原子性(Atomicity ).一致性( Consistency ).隔离性或独立性( Isolation)和持久性(

分布式事务的解决方案

分布式事务是什么: 分布式事务是指事务的参与者.支持事务的服务器.资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上. 为什么会产生分布式事务: 当我们的单个数据库的性能产生瓶颈的时候,我们可能会对数据库进行分区,这里所说的分区指的是物理分区,分区之后可能不同的库就处于不同的服务器上了,这个时候单个数据库的ACID已经不能适应这种情况了,而在这种ACID的集群环境下,再想保证集群的ACID几乎是很难达到,或者即使能达到那么效率和性能会大幅下降,最为关键的是再很难扩展新的分区了,这个时

分布式事务一致性解决方案分析

一.从数据一致性谈起 一致性问题,"万恶之源"是数据冗余和分布并通过网络交互+网络异常是常态. 1.数据一致性的情形 主库.从库和缓存数据一致性,相同数据冗余,关系数据库,为保证关据库的高可用和高性能,一般会采用主从(备)架构并引入缓存.其中数据不一致性存在于数据冗余的时间窗口内.常用的解决方案见数据库之架构. 多副本数据之间的数据一致性,相同数据副本,大数据领域,一份数据会有多个副本并存储到不同的节点上.客户端可以访问任何一个节点进行读写操作.常用的解决方案是基于Paxos.ZAB.

cap理论与分布式事务的解决方案

现在很火的微服务架构所设计的系统是分布式系统.分布式系统有一个著名的CAP理论,即一个分布式系统要同时满足一致性(Consistency).可用性(Availablility)和分区容错(Partition Tolerance)三个特性是一件不可能的事情. CAP理论的简介 CAP理论是由Eric Brewer在2000年的PODC会议上提出的,该理论在两年后被证明成立. CAP理论告诉架构师不要妄想设计出同时满足三者的系统,应该有所取舍,设计出适合业务的系统. 一致性(Consistency)

基于RabbitMQ实现分布式延时任务调度

一.分布式延时任务 传统做法是将延时任务插入数据库,使用定时去扫描,比对任务是否到期,到期则执行并设置任务状态为完成.这种做法在分布式环境下还需要对定时扫描做特殊处理(加分布式锁)避免任务被重复执行. 然而使用RabbitMQ实现延时任务可以天然解决分布式环境下重复执行的问题(利用mq中消息只会被一个消费者消费这一特性可以让延时任务只会被一个消费者执行).基于RabbitMQ做延时任务的核心是利用RabbitMQ的消息到期转发特性.发送消息时设置消息到期时间,等消息到期未被消费时会将消息转发到一

[转帖]分布式事务之解决方案(XA和2PC)

分布式事务之解决方案(XA和2PC) https://zhuanlan.zhihu.com/p/93459200 3. 分布式事务解决方案之2PC(两阶段提交) 针对不同的分布式场景业界常见的解决方案有2PC.TCC.可靠消息最终一致性.最大努力通知这几种. 3.1. 什么是2PC 2PC即两阶段提交协议,是将整个事务流程分为两个阶段,准备阶段(Prepare phase).提交阶段(commit phase),2是指两阶段,P是指准备阶段,C是提交阶段.举例 :张三和李四好久不见,老友约起聚餐

分布式任务调度的解决方案

简介 随着系统规模的发展,定时任务数量日益增多,任务也变得越来越复杂,尤其是在分布式环境下,存在多个业务系统,每个业务系统都有定时任务的需求,如果都在自身系统中调度,一方面增加业务系统的复杂度,另一方面也不方便管理,因此需要有一个任务平台对分散的任务进行统一管理调度,基于目前的情况,任务平台需要支持以下几个方面: 1.任务统一管理,提供图形化界面对任务进行配置和调度. 2.任务并发控制,同一个任务在同一时间只能允许一个执行. 3.任务弹性扩容,可根据繁忙情况动态增减服务器分摊压力,对大任务进行分

传统Webform 跨域调用 MVC 4 Web API实现分布式 无法解析 解决方案

前言: 正好在大概7月1日学习一份kendo ui的资料时发现一共5个章节,前3个章节都是用来讲Web API了,既然都已经看了索性就尝试一下,也不做深层次了解了,于是查阅了一些资料尝试写一个基于MVC 4 的WebAPI Demo. 正文: 下文简略介绍服务端的部分实现: 创建一个MVC4 基本项目,也就是不需要自己创建一些基本的注册绑定了. 直接新建一个Controller以及一个Model,如下所示: UserModel.cs 1 public class UserModel 2 { 3