一.分布式延时任务
传统做法是将延时任务插入数据库,使用定时去扫描,比对任务是否到期,到期则执行并设置任务状态为完成。这种做法在分布式环境下还需要对定时扫描做特殊处理(加分布式锁)避免任务被重复执行。
然而使用RabbitMQ实现延时任务可以天然解决分布式环境下重复执行的问题(利用mq中消息只会被一个消费者消费这一特性可以让延时任务只会被一个消费者执行)。基于RabbitMQ做延时任务的核心是利用RabbitMQ的消息到期转发特性。发送消息时设置消息到期时间,等消息到期未被消费时会将消息转发到一个新的队列,新队列的消费者收到消息后再处理,利用这种时间差特性实现任务的延时触发。
二.准备RabbitMQ并设置延时任务用到的相关队列
1.安装erlang和RabbitMQ(注意erlang与RabbitMQ的版本对应关系)
2.开启rabbitmq_management
打开RabbitMQ Command Prompt输入命令:rabbitmq-plugins enable rabbitmq_management
3.创建两个Exchange
创建一个Exchange用于添加延时任务,相关配置如下
再创建一个Exchange用于接收到期的延时任务,相关配置如下
4.创建两个Queue
创建第一个Queue,用于添加延时任务,相关配置如下
上面配置创建了一个队列q1,设置到期消息被转移的目的地Exchange(dlx)和Route key(dlx_rk)
接下来配置q1绑定的Exchange为ExQ1,Route key为send
再创建第二个Queue,用于接收队列q1中到期被转移的任务,相关配置如下
并绑定到Exchange:dlx,Route key:dlx_rk
通过上面两个Exchange和两个Queue的配置,让RabbitMQ支持q1中的消息到期后转移到q2中。所以业务上我们只用将延时任务发送到q1,让任务到期触发执行的业务代码去监听(消费)q2。这样基本上就实现了分布式环境下延时任务的创建以及到期调度触发执行。
三.具体代码实现
1.创建简单maven项目,添加如下依赖
<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.7.3</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.58</version> </dependency> </dependencies>
2.封装用到的RabbitMQ操作
1 import com.rabbitmq.client.*; 2 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 /** 7 * 1.连接RabbitMQ 8 * 2.添加延时任务 9 * 3.消费延时任务 10 */ 11 public class RabbitMQUtil { 12 13 private static Connection conn; 14 private static Channel channel; 15 16 /** 17 * 初始化RabbitMQ连接与channel 18 */ 19 static { 20 ConnectionFactory factory = new ConnectionFactory(); 21 factory.setUsername("guest"); 22 factory.setPassword("guest"); 23 factory.setVirtualHost("/"); 24 factory.setHost("localhost"); 25 factory.setPort(5672); 26 27 try { 28 conn = factory.newConnection(); 29 channel = conn.createChannel(); 30 } catch (IOException e) { 31 System.out.println("获取RabbitMQ连接失败"); 32 } catch (TimeoutException e) { 33 System.out.println("获取RabbitMQ连接超时"); 34 } 35 } 36 37 // public static void close() throws IOException, TimeoutException { 38 // if (Objects.nonNull(channel)) 39 // channel.close(); 40 // if (Objects.nonNull(conn)) 41 // conn.close(); 42 // } 43 44 /** 45 * 向指定exchange下route key发送延时任务 46 * @param msg 延时任务JSON bytes 47 * @param exchangeName 48 * @param routingKey 49 * @param expiration 延时时间 50 */ 51 public static void addTask(byte[] msg, String exchangeName, String routingKey, int expiration) { 52 try { 53 channel.basicPublish(exchangeName, routingKey, 54 new AMQP.BasicProperties.Builder() 55 .expiration(String.valueOf(expiration)) 56 .build(), msg); 57 } catch (IOException e) { 58 e.printStackTrace(); 59 } 60 } 61 62 /** 63 * 消费指定queue的消息(延时任务) 64 * @param queueName 65 * @param handler 任务处理器 66 * @param consumerTag 消费者标签(多个消费者同时消息同一queue时可以使用consumerTag作区分) 67 */ 68 public static void bindConsumer(String queueName, DemoTaskHandler handler, String consumerTag) { 69 try { 70 channel.basicConsume(queueName, false, consumerTag, 71 new DefaultConsumer(channel) { 72 @Override 73 public void handleDelivery(String consumerTag, 74 Envelope envelope, 75 AMQP.BasicProperties properties, 76 byte[] body) 77 throws IOException { 78 long deliveryTag = envelope.getDeliveryTag(); 79 // (process the message components here ...) 80 handler.execute(body, consumerTag); 81 channel.basicAck(deliveryTag, false); // 应答,告知queue成功收到消息 82 } 83 }); 84 } catch (IOException e) { 85 e.printStackTrace(); 86 } 87 } 88 89 }
3.模拟延时任务POJO
1 import java.io.Serializable; 2 3 public class DemoTask implements Serializable { 4 5 private int id; 6 7 public int getId() { 8 return id; 9 } 10 11 public void setId(int id) { 12 this.id = id; 13 } 14 }
4.延时任务处理器
1 import com.alibaba.fastjson.JSON; 2 3 public class DemoTaskHandler { 4 5 public void execute(byte[] body, String consumerTag) { 6 DemoTask task = JSON.parseObject(new String(body), DemoTask.class); 7 System.out.println(consumerTag + "收到延时任务id:" + task.getId() + " 并处理完毕"); 8 } 9 }
5.设计一个主程序往q1队列发送延时任务
1 import com.alibaba.fastjson.JSON; 2 3 import java.util.Scanner; 4 5 public class Producer { 6 7 public static void main(String[] args) { 8 // 添加延时任务 9 System.out.println("按下键盘添加延时任务"); 10 Scanner sc = new Scanner(System.in); 11 int i = 1; 12 while (sc.hasNextLine()) { 13 sc.nextLine(); 14 DemoTask bo = new DemoTask(); 15 bo.setId(i++); 16 RabbitMQUtil.addTask(JSON.toJSONString(bo).getBytes(), 17 "ExQ1", 18 "send", 19 10000); 20 System.out.println("成功添加一个延时任务"); 21 } 22 } 23 24 }
6.创建两个消费者(处理延时任务的业务)消费延时任务,模拟分布式环境
1 public class Consumer1 { 2 3 public static void main(String[] args) { 4 // 模拟分布式环境,处理到期的延时任务 5 RabbitMQUtil.bindConsumer("q2", 6 new DemoTaskHandler(), 7 "consumer1"); 8 9 } 10 11 }
1 public class Consumer2 { 2 3 public static void main(String[] args) { 4 // 模拟分布式环境,处理到期的延时任务 5 RabbitMQUtil.bindConsumer("q2", 6 new DemoTaskHandler(), 7 "consumer2"); 8 } 9 }
7.运行Producer,Consumer1,Consumer2观察结果
通过观察发现,每次发送一个延时任务后,过10秒会被consumer1或者consumer2消费,以上就基本实现了分布式延时任务调度。
原文地址:https://www.cnblogs.com/yhcjhun/p/11727906.html