【redis】spring boot利用redis的Keyspace Notifications实现消息通知

前言

需求:当redis中的某个key失效的时候,把失效时的value写入数据库。

github: https://github.com/vergilyn/RedisSamples

1、修改redis.conf

安装的redis服务默认是: notify-keyspace-events "",修改成 notify-keyspace-events Ex;

位置:redis安装目下的redis.windows-service.conf 或 redis.windows.conf。(具体看redis服务加载的哪个配置, 貌似要redis 2.8+才支持)

可以在redis.conf中找到对应的描述

# K    键空间通知,以[email protected]<db>__为前缀
# E    键事件通知,以[email protected]<db>__为前缀
# g    del , expipre , rename 等类型无关的通用命令的通知, ...
# $    String命令
# l    List命令
# s    Set命令
# h    Hash命令
# z    有序集合命令
# x    过期事件(每次key过期时生成)
# e    驱逐事件(当key在内存满了被清除时生成)
# A    g$lshzxe的别名,因此”AKE”意味着所有的事件

2、通过JedisPubSub实现

省略spring boot配置,完整代码见github。

/**
 * key过期事件推送到topic中只有key,无value,因为一旦过期,value就不存在了。
 */
@Component
public class JedisExpiredListener extends JedisPubSub {
    /** 参考redis目录下redis.conf中的"EVENT NOTIFICATION", redis默认的db{0, 15}一共16个数据库
     * K    Keyspace events, published with [email protected]<db>__ prefix.
     * E    Keyevent events, published with [email protected]<db>__ prefix.
     *
     */
    public final static String LISTENER_PATTERN = "[email protected]*__:expired";

    /**
     * 虽然能注入,但貌似在listener-class中jedis无法使用(无法建立连接到redis),exception message:

* "only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context"
*/
@Autowired
private Jedis jedis;

/**
* 初始化按表达式的方式订阅时候的处理
*/
@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
System.out.print("onPSubscribe >> ");
System.out.println(String.format("pattern: %s, subscribedChannels: %d", pattern, subscribedChannels));
}

/**
* 取得按表达式的方式订阅的消息后的处理
*/
@Override
public void onPMessage(String pattern, String channel, String message) {
System.out.print("onPMessage >> ");
System.out.println(String.format("key: %s, pattern: %s, channel: %s", message, pattern, channel));
}

/**
* 取得订阅的消息后的处理
*/
@Override
public void onMessage(String channel, String message) {
super.onMessage(channel, message);
}

/**
* 初始化订阅时候的处理
*/
@Override
public void onSubscribe(String channel, int subscribedChannels) {
super.onSubscribe(channel, subscribedChannels);
}

/**
* 取消订阅时候的处理
*/
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
super.onUnsubscribe(channel, subscribedChannels);
}

/**
* 取消按表达式的方式订阅时候的处理
*/
@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
super.onPUnsubscribe(pattern, subscribedChannels);
}
}

@RunWith(SpringRunner.class)
@SpringBootTest(classes=JedisExpiredApplication.class)
public class JedisExpiredApplicationTest {
    @Autowired
    private Jedis jedis;
    @Autowired
    private JedisExpiredListener expiredListener;
    @Before
    public void before() throws Exception {
        jedis.flushAll();

        jedis.set(JedisConfig.DEFAULE_KEY,"123321");
        System.out.println(JedisConfig.DEFAULE_KEY + " = " + jedis.get(JedisConfig.DEFAULE_KEY));
        System.out.println("set expired 5s");
        jedis.expire(JedisConfig.DEFAULE_KEY,5);
    }

    @Test
    public void testPSubscribe(){
        /* psubscribe是一个阻塞的方法,在取消订阅该频道前,会一直阻塞在这,只有当取消了订阅才会执行下面的other code
         * 可以onMessage/onPMessage里面收到消息后,调用了unsubscribe()/onPUnsubscribe(); 来取消订阅,这样才会执行后面的other code
         */
        jedis.psubscribe(expiredListener,JedisExpiredListener.LISTENER_PATTERN);

        // other code
    }
}

输出结果:

vkey = 123321
   set expired 5s
   onPSubscribe >> pattern: [email protected]*__:expired, subscribedChannels: 1
   onPMessage >> key: vkey, pattern: [email protected]*__:expired, channel: [email protected]__:expired

3、通过实现添加MessageListener

省略spring boot的redis配置。

@SpringBootApplication
public class RedisExpiredApplication implements CommandLineRunner{
    @Autowired
    private RedisTemplate redisTemplate;
    @Autowired
    private RedisExpiredListener expiredListener;

    /**
     * 解决redisTemplate的key/value乱码问题:
     *  <br/> <a href="http://www.zhimengzhe.com/shujuku/other/192111.html">http://www.zhimengzhe.com/shujuku/other/192111.html</a>
     *  <br/> <a href="http://blog.csdn.net/tianyaleixiaowu/article/details/70595073">http://blog.csdn.net/tianyaleixiaowu/article/details/70595073</a>
     * @return
     */
    @Bean("redis")
    @Primary
    public RedisTemplate redisTemplate(){
        RedisSerializer<String> stringSerializer = new StringRedisSerializer();
        redisTemplate.setKeySerializer(stringSerializer);
        redisTemplate.setValueSerializer(stringSerializer);
        redisTemplate.setHashKeySerializer(stringSerializer);
        redisTemplate.setHashValueSerializer(stringSerializer);
        return redisTemplate;
    }

    @Bean
    public RedisMessageListenerContainer listenerContainer(RedisConnectionFactory redisConnection, Executor executor){
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        // 设置Redis的连接工厂
        container.setConnectionFactory(redisConnection);
        // 设置监听使用的线程池
//        container.setTaskExecutor(executor);
        // 设置监听的Topic: PatternTopic/ChannelTopic
        Topic topic = new PatternTopic(RedisExpiredListener.LISTENER_PATTERN);
        // 设置监听器
        container.addMessageListener(new RedisExpiredListener(), topic);
        return container;
    }

    @Bean
    public Executor executor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(100);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("V-Thread");

        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }

    public static void main(String[] args) {
        SpringApplication.run(RedisExpiredApplication.class,args);
    }

    @Override
    public void run(String... strings) throws Exception {
        redisTemplate.opsForValue().set("vkey", "vergilyn", 5, TimeUnit.SECONDS);
        System.out.println("init : set vkey vergilyn ex 5");
        System.out.println("thread sleep: 10s");
        Thread.sleep(10 * 1000);
        System.out.println("thread recover: get vkey = " + redisTemplate.opsForValue().get("vkey"));
    }
}
@Component
public class RedisExpiredListener implements MessageListener {
    public final static String LISTENER_PATTERN = "__key*__:*";

    /**
     * 客户端监听订阅的topic,当有消息的时候,会触发该方法;
     * 并不能得到value, 只能得到key。
     * 姑且理解为: redis服务在key失效时(或失效后)通知到java服务某个key失效了, 那么在java中不可能得到这个redis-key对应的redis-value。
     * 
     * 解决方案:
     *  创建copy/shadow key, 例如 set vkey "vergilyn"; 对应copykey: set copykey:vkey "" ex 10;
     *  真正的key是"vkey"(业务中使用), 失效触发key是"copykey:vkey"(其value为空字符为了减少内存空间消耗)。
     *  当"copykey:vkey"触发失效时, 从"vkey"得到失效时的值, 并在逻辑处理完后"del vkey"
     *
     * 缺陷:
     *  1: 存在多余的key; (copykey/shadowkey)
     *  2: 不严谨, 假设copykey在 12:00:00失效, 通知在12:10:00收到, 这间隔的10min内程序修改了key, 得到的并不是 失效时的value.
     *  (第1点影响不大; 第2点貌似redis本身的Pub/Sub就不是严谨的, 失效后还存在value的修改, 应该在设计/逻辑上杜绝)
     *  当"copykey:vkey"触发失效时, 从"vkey"得到失效时的值, 并在逻辑处理完后"del vkey"
     *
     */
    @Override
    public void onMessage(Message message, byte[] bytes) {
        byte[] body = message.getBody();// 建议使用: valueSerializer
        byte[] channel = message.getChannel();
        System.out.print("onMessage >> " );
        System.out.println(String.format("channel: %s, body: %s, bytes: %s"
                ,new String(channel), new String(body), new String(bytes)));
    }

}

输出结果:

init : set vkey vergilyn ex 5
   thread sleep: 10s
   onMessage >> channel: [email protected]__:expired, body: vkey, bytes: __key*__:*
   thread recover: get vkey = null

4、问题

1) 不管是JedisPubSub,还是MessageListener都不可能得到value。

个人理解:在12:00:00,java推送给redis一条命令”set vkey vergilyn ex 10”。此时redis服务已经完整的知道了这个key的失效时间,在12:00:10时redis服务把”vkey”失效。

然后通知到java(即回调到JedisPubSub/MessageListener),此时不可能在java中通过”vkey”得到其value。

(最简单的测试,在Listener中打断点,然后通过redis-cli.exe命令查看,“vkey”已经不存在了,但Listener才进入到message()方法)

2) redis的expire不是严格的即时执行

摘自 http://redisdoc.com/topic/notification.html

Redis 使用以下两种方式删除过期的键:

  • 当一个键被访问时,程序会对这个键进行检查,如果键已经过期,那么该键将被删除。

  • 底层系统会在后台渐进地查找并删除那些过期的键,从而处理那些已经过期、但是不会被访问到的键。

当过期键被以上两个程序的任意一个发现、 并且将键从数据库中删除时, Redis 会产生一个 expired 通知。

Redis 并不保证生存时间(TTL)变为 0 的键会立即被删除: 如果程序没有访问这个过期键, 或者带有生存时间的键非常多的话, 那么在键的生存时间变为 0 , 直到键真正被删除这中间, 可能会有一段比较显著的时间间隔。

因此, Redis 产生 expired 通知的时间为过期键被删除的时候, 而不是键的生存时间变为 0 的时候。

如果业务无法容忍从过期到删除中间的时间间隔,那么就只有用其他的方式了。

3) 如何在expire回调中得到expire key的value

参考:https://stackoverflow.com/questions/26406303/redis-key-expire-notification-with-jedis

set vkey somevalue
set shadowkey:vkey "" ex 10

相当于每个key都有对应的一个shadowkey,”shadowkey”只是用来设置expire时间,”key”才保存value及参与业务逻辑。

所以当”shadowkey”失效通知到listener时,程序中可以通过”key”得到其value,并在逻辑处理完时”del key”。

(“shadowkey”的value为null或空字符串,目的是为了节约内存空间。)

缺陷:

    1. 多余了很多无效的 shadowkey;

2. 数据不严谨。假设copykey在 12:00:00失效, 通知在12:10:00收到, 这间隔的10min内程序修改了key, 得到的并不是 失效时的value.

相对来说,第1点无关紧要,只是暂时多了一些辅助用的key,但会被程序自己清理掉,不用再去维护,或一直存在于redis缓存中。

第2点,更多的是设计逻辑有缺陷,可以把失效时间定的更长,保证在”那个间隔”内不可能出现失效key的修改。

4)  特别

摘自 http://blog.csdn.net/gqtcgq/article/details/50808729

Redis的发布/订阅目前是即发即弃(fire and forget)模式的,因此无法实现事件的可靠通知。也就是说,如果发布/订阅的客户端断链之后又重连,则在客户端断链期间的所有事件都丢失了。

未来计划支持事件的可靠通知,但是这可能会通过让订阅与发布功能本身变得更可靠来实现,也可能会在Lua脚本中对消息的订阅与发布进行监听,从而实现类似将事件推入到列表这样的操作。

参考:

redis设置键的生存时间或过期时间

Redis Key expire notification with Jedis

(以下的文章都讲的差不多)

JAVA实现redis超时失效key 的监听触发

spring boot-使用redis的Keyspace Notifications实现定时任务队列

redis 超时失效key 的监听触发

Redis键空间通知(keyspace notifications)

时间: 2025-01-25 07:18:10

【redis】spring boot利用redis的Keyspace Notifications实现消息通知的相关文章

Spring Boot使用redis实现数据缓存

基于Spring Boot 1.5.2.RELEASE版本,一方面验证与Redis的集成方法,另外了解使用方法. 集成方法 配置依赖 修改pom.xml,增加如下内容. <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> 配置Redis

Spring Boot 和 Redis 常用操作

1    第4-2课:Spring Boot 和 Redis 常用操作 Redis 是目前使用最广泛的缓存中间件,相比 Memcached,Redis 支持更多的数据结构和更丰富的数据操作,另外 Redis 有着丰富的集群方案和使用场景,这一课我们一起学习 Redis 的常用操作. 1.1    Redis 介绍 Redis 是一个速度非常快的非关系数据库(Non-Relational Database),它可以存储键(Key)与 5 种不同类型的值(Value)之间的映射(Mapping),可

Spring Boot 2.X(六):Spring Boot 集成 Redis

Redis 简介 什么是 Redis Redis 是目前使用的非常广泛的免费开源内存数据库,是一个高性能的 key-value 数据库. Redis 与其他 key-value 缓存(如 Memcached )相比有以下三个特点: 1.Redis 支持数据的持久化,它可以将内存中的数据保存在磁盘中,重启的时候可以再次加载进行使用. 2.Redis 不仅仅支持简单的 key-value 类型的数据,同时还提供 list,set,zset,hash 等数据结构的存储. 3.Redis 支持数据的备份

spring boot,gradle,redis

今天做spring boot/gradle/redis相关的项目,遇到若干问题,记录如下: spring boot  org.springframework.session.data.redis.RedisFlushMode 这个报错是与redis服务器连接的问题,查看一下在application.properties里写的redis服务器IP是不是错了,改正即可;或者redis服务器启用了protected-mode,运行Redis时使用命令: nohup redis-server --pro

SpringBoot(三) :Spring boot 中 Redis 的使用

前言: 这一篇讲的是Spring Boot中Redis的运用,之前没有在项目中用过Redis,所以没有太大的感觉,以后可能需要回头再来仔细看看. 原文出处: 纯洁的微笑 SpringBoot对常用的数据库支持外,对NoSQL 数据库也进行了封装自动化. redis介绍 Redis是目前业界使用最广泛的内存数据存储.相比memcached,Redis支持更丰富的数据结构,例如hashes, lists, sets等,同时支持数据持久化.除此之外,Redis还提供一些类数据库的特性,比如事务,HA,

Spring boot、Redis、ActiveMQ、Nginx、Mycat、Netty、Jvm大型分布式项目实战视频教程

15套java架构师.集群.高可用.高可扩展.高性能.高并发.性能优化.Spring boot.Redis.ActiveMQ.Nginx.Mycat.Netty.Jvm大型分布式项目实战视频教程 视频课程内容包含: 高级Java架构师包含:Spring boot.Spring  cloud.Dubbo.Redis.ActiveMQ.Nginx.Mycat.Spring.MongoDB.ZeroMQ.Git.Nosql.Jvm.Mecached.Netty.Nio.Mina.性能调优.高并发.to

Spring Boot学习笔记——Spring Boot与Redis的集成

一.添加Redis缓存 1.添加Redis起步依赖 在pom.xml中添加Spring Boot支持Redis的依赖配置,具体如下: <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-redis</artifactId> <version>1.4.7.RELEASE</version> </

spring boot 结合Redis 实现工具类

自己整理了 spring boot 结合 Redis 的工具类引入依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId></dependency>加入配置 # Redis数据库索引(默认为0)spring.redis.database=0# Redis服务器地址

Spring Boot整合Redis

一.Spring Boot对Redis的支持 Spring对Redis的支持是使用Spring Data Redis来实现的,一般使用Jedis或者lettuce(默认),Java客户端在 org.springframework.boot.autoconfigure.data.redis(Spring Boot 2.x) 中redis的自动配置 AutoConfigureDataRedis RedisAutoConfiguration提供了RedisTemplate与StringRedisTem