1. Redis处理过期事件方式
1.1. Redis处理过期key方式
Redis key过期的方式有二:被动方式和主动方式
当clients试图访问设置了过期时间且已过期的key时,为主动过期方式。
但仅是这样是不够的,以为可能存在一些key永远不会被再次访问到,这些设置了过期时间的key也是需要在过期后被删除的。因此,Redis会周期性的随机测试一批设置了过期时间的key并进行处理。测试到的已过期的key将被删除。典型的方式为,Redis每秒做10次如下的步骤:
1.随机测试100个设置了过期时间的key
2.删除所有发现的已过期的key
3.若删除的key超过25个则重复步骤1
此为主动方式
1.2. Redis处理过期事件
@Test public void testRedisEvent() { Jedis jedis = newJedis("192.168.92.134", 8000); jedis.psubscribe(newJedisPubSub() { public voidonPMessage(String pattern, String channel, Stringmessage) { System.err.println(pattern); System.err.println(channel); System.err.println(message); } },"[email protected]*__:expired"); }
当监听过期key时,则redis将此key通过pub/sub来发送事件。
由于codis不支持的命令包含了pub/sub,则codis不再支持此监听事件。
2. 解决方案
2.1. 采用java的DelayQueue解决
DelayQueue介绍:Delayed 元素的一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部 是延迟期满后保存时间最长的 Delayed元素。如果延迟都还没有期满,则队列没有头部,并且 poll将返回 null。当一个元素的 getDelay(TimeUnit.NANOSECONDS)方法返回一个小于等于 0 的值时,将发生到期。即使无法使用 take或 poll移除未到期的元素,也不会将这些元素作为正常元素对待。例如,size方法同时返回到期和未到期元素的计数。此队列不允许使用 null 元素。
代码如下:
public class RedisEvent<K, V> implements Runnable{ public DelayQueue<DelayItem<Pair<K, V>>> q = new DelayQueue<DelayItem<Pair<K, V>>>(); public void run() { while (true){ DelayItem<Pair<K, V>> delayItem = null; try { delayItem = q.take(); if (delayItem != null) { // 超时对象处理 Pair<K, V> pair = delayItem.getItem(); System.out.println("key : " + pair.first + ", value : " + pair.second); } } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { RedisEvent<String, String> re = new RedisEvent<String, String>(); //过期key,value Pair<String, String> pair1 = new Pair<String, String>("key1", "value1"); //2秒过期 DelayItem<Pair<String,String>> d1 = new DelayItem<Pair<String,String>>(pair1, TimeUnit.SECONDS.toNanos(2)); re.q.add(d1); System.out.println(re.q.poll()); Thread.sleep(1000 * 1); System.out.println(re.q.poll()); //2秒后可以拿到数据 Thread.sleep(1000 * 2); DelayItem<Pair<String,String>> delayItem = re.q.poll(); System.out.println(delayItem); Pair<String, String> pair =delayItem.getItem(); System.out.println("key : " + pair.first + ", value : " + pair.second); System.out.println(re.q.poll()); }
输出结果:
null null [email protected] key : key1, value : value1 null
此方案优缺点:
优点:自己实现,简单易用,方便修改、扩展
缺点:当过期事件过多时且过期时间过长,需要DelayQueue存放过多key,消耗jvm内存
2.2. 使用第三方queue队列实现
需要queue支持delay job,目前了解的有:Beanstalkd,sidekiq
Beanstalkd介绍:
Beanstalk,一个高性能、轻量级的分布式内存队列系统,最初设计的目的是想通过后台异步执行耗时的任务来降低高容量Web应用系统的页面访问延迟,支持过有9.5 million用户的Facebook Causes应用。
Beanstalkd设计里面的核心概念:
◆ job
一个需要异步处理的任务,是Beanstalkd中的基本单元,需要放在一个tube中。
◆ tube
一个有名的任务队列,用来存储统一类型的job,是producer和consumer操作的对象。
◆ producer
Job的生产者,通过put命令来将一个job放到一个tube中。
◆ consumer
Job的消费者,通过reserve/release/bury/delete命令来获取job或改变job的状态。
Beanstalkd中一个job的生命周期如图2所示。一个job有READY, RESERVED, DELAYED, BURIED四种状态。当producer直接put一个job时,job就处于READY状态,等待consumer来处理,如果选择延迟 put,job就先到DELAYED状态,等待时间过后才迁移到READY状态。consumer获取了当前READY的job后,该job的状态就迁移到RESERVED,这样其他的consumer就不能再操作该job。当consumer完成该job后,可以选择delete, release或者bury操作;delete之后,job从系统消亡,之后不能再获取;release操作可以重新把该job状态迁移回READY(也 可以延迟该状态迁移操作),使其他的consumer可以继续获取和执行该job;有意思的是bury操作,可以把该job休眠,等到需要的时候,再将休 眠的job kick回READY状态,也可以deleteBURIED状态的job。正是有这些有趣的操作和状态,才可以基于此做出很多意思的应用,比如要实现一个循环队列,就可以将RESERVED状态的 job休眠掉,等没有READY状态的job时再将BURIED状态的job一次性kick回READY状态。
优点:不需要自己实现,使用第三方queue
缺点:
1. Beanstalkd目前没有提供主从同步+故障切换机制,在应用中有可能成为单点的风险。在实际应用中,可以使用数据库为job提供持久化存储。
2. 需要花费成本学习第三方queue
2.3. 使用数据库实现
将过期key,value放入数据库中,使用线程定时扫描数据库。
启动三个线程:
线程1:根据过期时间,扫描状态为delay且过期时间已到的事件,将状态改为ready
线程2:处理状态为ready的事件,将ready状态改为done
线程3:删除done状态的事件
优点:数据持久化,不会丢失事件数据
缺点:线程定时扫描,过期事件存在延迟处理
2.4. 使用redis实现(推荐)
由于codis不支持pub/sub,则重新添加redis服务,此服务只用来做过期事件处理
优点:实现方式与原来相同,不需要修改任何代码
缺点:需要添加机器来做redis服务,主备需采用keepalive等处理