一篇详解Redis -- 延时队列

Redis的 list 数据结构常用来作为 异步消息队列 使用,使用 rpush/lpush 操作 入队 ,使用 lpop/rpop 来操作 出队

> rpush my-queue apple banana pear
(integer) 3
> llen my-queue
(integer) 3
> lpop my-queue
"apple"
> llen my-queue
(integer) 2
> lpop my-queue
"banana"
> llen my-queue
(integer) 1
> lpop my-queue
"pear"
> llen my-queue
(integer) 0
> lpop my-queue
(nil)


空队列

  1. 如果队列为空,客户端会陷入 pop的死循环 , 空轮询 不仅拉高了 客户端的CPU , Redis的QPS 也会被拉高
  2. 如果空轮询的客户端有几十个, Redis的慢查询 也会显著增加,可以尝试让客户端线程 sleep 1s
  3. 但睡眠会导致消息的 延迟增大 ,可以使用 blpop/brpop (blocking, 阻塞读 )
  4. 阻塞读在队列没有数据时,会立即进入 休眠 状态,一旦有数据到来,会立即被 唤醒 , 消息延迟几乎为0


空闲连接

  1. 如果线程一直阻塞在那里,Redis的客户端连接就成了 闲置连接
  2. 闲置过久, 服务器 一般会 主动断开 连接, 减少闲置的资源占用 ,此时 blpop/brpop 会 抛出异常


锁冲突处理

  1. 分布式锁 加锁失败 的处理策略
  2. 直接抛出异常 ,通知用户稍后重试
  3. sleep 后再重试
  4. 将请求转移到 延时队列 ,过一会重试
  5. 抛出异常
  6. 这种方式比较适合由 用户直接发起 的请求
  7. sleep
  8. sleep会 阻塞 当前的消息处理线程,从而导致队列的后续消息处理出现 延迟
  9. 如果 碰撞比较频繁 ,sleep方案不合适
  10. 延时队列
  11. 比较适合异步消息处理的场景,通过将当前冲突的请求转移到另一个队列 延后处理 来 避免冲突


延时队列

  1. 可以通过Redis的 zset 来实现延时队列
  2. 将消息序列化成一个字符串作为zet的 value ,将该消息的 到期处理时间 作为 score
  3. 然后 多线程轮询 zset获取 到期的任务 进行处理
  4. 多线程是为了保障 可用性 ,但同时要考虑 并发安全 ,确保 任务不能被多次执行

public class RedisDelayingQueue<T> {
 @Data
 @AllArgsConstructor
 @NoArgsConstructor
 private static class TaskItem<T> {
 private String id;
 private T msg;
 }
 private Type taskType = new TypeReference<TaskItem<T>>() {
 }.getType();
 private Jedis jedis;
 private String queueKey;
 public RedisDelayingQueue(Jedis jedis, String queueKey) {
 this.jedis = jedis;
 this.queueKey = queueKey;
 }
 public void delay(T msg) {
 TaskItem<T> task = new TaskItem<>(UUID.randomUUID().toString(), msg);
 jedis.zadd(queueKey, System.currentTimeMillis() + 5000, JSON.toJSONString(task));
 }
 public void loop() {
 // 可以进一步优化,通过Lua脚本将zrangeByScore和zrem统一挪到Redis服务端进行原子化操作,减少抢夺失败出现的资源浪费
 while (!Thread.interrupted()) {
 // 只取一条
 Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
 if (values.isEmpty()) {
 try {
 Thread.sleep(500);
 } catch (InterruptedException e) {
 break;
 }
 continue;
 }
 String s = values.iterator().next();
 if (jedis.zrem(queueKey, s) > 0) {
 // zrem是多线程多进程争夺任务的关键
 TaskItem<T> task = JSON.parseObject(s, taskType);
 this.handleMsg(task.msg);
 }
 }
 }
 private void handleMsg(T msg) {
 try {
 System.out.println(msg);
 } catch (Throwable ignored) {
 // 一定要捕获异常,避免因为个别任务处理问题导致循环异常退出
 }
 }
 public static void main(String[] args) {
 final RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(new Jedis("localhost", 16379), "q-demo");
 Thread producer = new Thread() {
 @Override
 public void run() {
 for (int i = 0; i < 10; i++) {
 queue.delay("zhongmingmao" + i);
 }
 }
 };
 Thread consumer = new Thread() {
 @Override
 public void run() {
 queue.loop();
 }
 };
 producer.start();
 consumer.start();
 try {
 producer.join();
 Thread.sleep(6000);
 consumer.interrupt();
 consumer.join();
 } catch (InterruptedException ignored) {
 }
 }
}

原文地址:https://blog.51cto.com/14528283/2448034

时间: 2024-10-11 06:51:24

一篇详解Redis -- 延时队列的相关文章

Redis学习——详解Redis配置文件(三)

一.Redis脚本简介 在我们介绍Redis的配置文件之前,我们先来说一下Redis安装完成后生成的几个可执行文件: redis-server .redis-cli .redis-benchmark .redis-stat .redis-check-dump. redis-cgecj-aof : redis-server:Redis 服务器的daemon启动程序. redis-cli:Redis 命令行执行工具.当然,你也可以用telnet根据其纯文本协议来操作. redis-benchmark

U-Mail邮件服务器详解邮件延时

在快节奏的信息社会,我们要求邮件"又好又快"的传送,即保证高安全性之外,还需要及时传 输.但是由于国际互联网环境的复杂特殊性及企业邮件系统自身原因,邮件延迟送达的现象很常见,这往往给企业业务拓展带来不利影响,毕竟商机转瞬即逝,谁能 抢先一步抓住,谁就能独占鳌头守住财富.那么,邮件延迟是什么原因呢?有没有好的解决方案?据统计,目前对于这个问题针对措施做得最好的是U-Mail邮 件服务器,小编特意请来U-Mail何工程师详解一番. 问:邮件延时对于企业,尤其是外贸类公司来说,既常见,同时负

【详解】消息队列和线程关系

1.进程-线程-消息队列 简单的来说,什么是进程?什么是线程?打个比方,你的程序要执行,操作系统就会把你的exe文件加载到内存中,那就生成一个进程了(当然还包含分配到的资源等):对于线程,你可以理解成是一个程序里的不同部分,这有点类似函数,所不同的是各个线程是同时执行的. 例如,你的主线程创建了另一个副线程,那么这两个线程是同时在工作的,不存在调用 - 返回的概念. 一个进程里可以有多个线程在执行,称为执行实例. shining:因为线程的资源是从进程资源中分配出来的,因此同一个进程中的多个线程

PHP函数篇详解十进制、二进制、八进制和十六进制转换函数说明

PHP函数篇详解十进制.二进制.八进制和十六进制转换函数说明 作者: 字体:[增加 减小] 类型:转载 中文字符编码研究系列第一期,PHP函数篇详解十进制.二进制.八进制和十六进制互相转换函数说明,主要掌握各进制转换的方法,以应用于实际开发 一,十进制(decimal system)转换函数说明 1,十进制转二进制 decbin() 函数,如下实例 echo decbin(12); //输出 1100 echo decbin(26); //输出 11010 decbin (PHP 3, PHP

走向DBA[MSSQL篇] 详解游标

原文:走向DBA[MSSQL篇] 详解游标 前篇回顾:上一篇虫子介绍了一些不常用的数据过滤方式,本篇详细介绍下游标. 概念 简单点说游标的作用就是存储一个结果集,并根据语法将这个结果集的数据逐条处理. 观点 正因为游标可以将结果集一条条取出处理,所以会增加服务器的负担.再者使用游标的效率远远没有使用默认的结果集效率高,在默认结果集中,从客户端发送到服务器的唯一一个数据包是包含需执行语句的数据包.而在使用服务器游标时,每一个FETCH语句都必须从客户端发送到服务器,然后在服务器中将它解析并编译为执

详解redis持久化

我们的Redis必须使用数据持久化吗?如果我们的Redis服务器只作为缓存使用,Redis中存储的所有数据都是从其他地方同步过来的备份,那么就没必要开启数据持久化的选项.Redis提供了将数据定期自动持久化至硬盘的能力,包括RDB和AOF两种方案,两种方案分别有其长处和短板,可以配合起来同时运行,确保数据的稳定性. vRedis 持久化 Redis 提供了多种不同级别的持久化方式: RDB 持久化可以在指定的时间间隔内生成数据集的时间点快照(point-in-time snapshot). AO

详解 Redis 应用场景及应用实例

Redis是一个开源的使用ANSI C语言编写.支持网络.可基于内存亦可持久化的日志型.Key-Value数据库,并提供多种语言的API.从2010年3月15日起,Redis的开发工作由VMware主持. 1. MySql+Memcached架构的问题 实际MySQL是适合进行海量数据存储的,通过Memcached将热点数据加载到cache,加速访问,很多公司都曾经使用过这样的架构,但随着业务数据量的不断增加,和访问量的持续增长,我们遇到了很多问题: 1.MySQL需要不断进行拆库拆表,Memc

轻松搞定高并发:详解Redis的五种数据类型及应用场景分析!

一.Redis基本概念介绍和特性 1.1 Redis基本概念介绍 1.Redis是远程的,有客户端和服务端,我们一般说的是服务端: 2.Redis是基于内存的,所以比基于硬盘的MySQL要快很多,但非常吃内存 3.Redis是非关系型数据库.本质上也是数据库,但MySQL关系型数据库存储时必须定义数据词典,而Redis则不需要. 1.2 Redis 和 Memcached比较 Redis数据类型都支持push/pop.add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的

重温JSP/Servlet技术之Jsp基础篇(详解二)

cookie 今天就讲cookie,因为在课堂上我没有听懂,所以,如其说是博客,不如说是我的复习笔记,哈哈 首先先发布一张Cookie原理图 1.什么是 Cookie  “cookie 是存储于访问者的计算机中的变量.每当同一台计算机通过浏览器请求某个页面时,就会发送这个 cookie.你可以使用 Jsp 来创建和取回 cookie 的值.”  cookie 是访问过的网站创建的文件,用于存储浏览信息,例如个人资料信息. 从Jsp的角度看,cookie 就是一些字符串信息.这些信息存放在客户端的