Redis集群环境下的键值空间监听事件实现方案

一直想记录工作中遇到的问题和解决的方法,奈何没有找到一方乐土,最近经常反思,是否需要记录平时的点滴,后台还是决定下定决心记录一些,以便以后用到的时候找不着,实现这样的一个功能主要也是业务所需要的。

需求:要求统计所有会员在线人数,并根据会员在线状态同步改变人数。

之前用户登录使用session去控制,那么可以通过session进行在线用户人数统计,后来实现无状态不在依赖session作为用户在线的标准,使用Redis替换了Session,那么用户直接退出也好做,但是会存在用户直接关闭页面的情况,那么这个时候用户的缓存凭证没有主动触发去主动删掉,所以思来想去查了一些资料通过缓存的Key监听事件来处理,但是网上的大都是单机版的,对于集群环境下的就少之又少,由于集群是有多个节点,并且key采用的是分片的方式存储在不同片区,然而使用Spring的RedisTemplate的又不支持集群环境下的监听事件,由于每次与Redis服务系保持一个有效连接就可以,那么就有可能某个key所在的片区并没有被监听到事件,因此需要在源码上做一些调整,认为让它遍历所有集群节点用来监听集群中的key。所以通过翻阅资料实现下面的功能,还算圆满的完成了需求任务,当然如果看官看到某些似曾相识的地方请谅解,我也是从大家的经验中寻找方法有些地方与大家的相似也属正常。

第一步:修改Redis的配置文件,这一步可让《运维》同事协助操作,在配置文件中添加如下内容:

Redis的配置文件:

############################# EVENT NOTIFICATION ##############################

# Redis can notify Pub/Sub clients about events happening in the key space.
# This feature is documented at http://redis.io/topics/notifications
#
# For instance if keyspace events notification is enabled, and a client
# performs a DEL operation on key "foo" stored in the Database 0, two
# messages will be published via Pub/Sub:
#
# PUBLISH [email protected]__:foo del
# PUBLISH [email protected]__:del foo
#
# It is possible to select the events that Redis will notify among a set
# of classes. Every class is identified by a single character:
#
# K Keyspace events, published with [email protected]<db>__ prefix.
# E Keyevent events, published with [email protected]<db>__ prefix.
# g Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...
# $ String commands
# l List commands
# s Set commands
# h Hash commands
# z Sorted set commands
# x Expired events (events generated every time a key expires)
# e Evicted events (events generated when a key is evicted for maxmemory)
# A Alias for g$lshzxe, so that the "AKE" string means all the events.
#
# The "notify-keyspace-events" takes as argument a string that is composed
# of zero or multiple characters. The empty string means that notifications
# are disabled.
#
# Example: to enable list and generic events, from the point of view of the
# event name, use:
#
# notify-keyspace-events Elg
#
# Example 2: to get the stream of the expired keys subscribing to channel
# name [email protected]__:expired use:
#
notify-keyspace-events Ex
#
# By default all notifications are disabled because most users don‘t need
# this feature and the feature has some overhead. Note that if you don‘t
# specify at least one of K or E, no events will be delivered.

此时我们需要开启缓存的键空间通知事件的配置:notify-keyspace-events Ex

第二步配置Redis信息,我采用的是yml格式文件,同时配置了三套模式分别为单机模式、哨兵模式、集群模式,各位看官在配置文件中可自行开启或关闭。

#缓存配置
  redis:
    database: 0
    #host: 127.0.0.1
    #port: 6379
    #sentinel:
      #master: mymaster
      #nodes: 192.168.0.223:27001
    #timeout: 6000ms
    password: Aa123456
    cluster:
      max-redirects: 3   #获取失败 最大重定向次数
      nodes:
        - 192.168.104.7:6379
        - 192.168.104.7:6380
        - 192.168.104.8:6379
        - 192.168.104.8:6380
        - 192.168.104.9:6379
        - 192.168.104.9:6380
    lettuce:
      pool:
        max-active: 1000  #连接池最大连接数(使用负值表示没有限制)
        max-idle: 10      #连接池中的最大空闲连接
        min-idle: 5       #连接池中的最小空闲连接
        max-wait: 3000      #连接池最大阻塞等待时间(使用负值表示没有限制)

第三步重写缓存的默认配置函数了,并绑定监听的主题,从程序中我们可以看到"[email protected]__:expired" 意思就是订阅Redis的第一个数据库的键值失效事件,这里需要多说一下,Redis有16个数据库,系统默认使用第一个苦也就是0,如果你在配置的时候不想使用系统默认数据库,你可以通过配置文件指定库,那么你这里就需要根据你指定的库做键值事件。

import org.springframework.beans.factory.BeanFactory;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnection;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

import redis.clients.jedis.Jedis;

@Configuration
@ConditionalOnClass({ JedisConnection.class, RedisOperations.class, Jedis.class, MessageListener.class })
@AutoConfigureAfter({ JacksonAutoConfiguration.class,RedisAutoConfiguration.class })
public class RedisAutoConfiguration {

    @Configuration
    @ConditionalOnExpression("!‘${spring.redis.host:}‘.isEmpty()")
    public static class RedisStandAloneAutoConfiguration {
        @Bean
        public RedisMessageListenerContainer customizeRedisListenerContainer(
                RedisConnectionFactory redisConnectionFactory,MessageListener messageListener) {
            RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
            redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
            redisMessageListenerContainer.addMessageListener(messageListener,new PatternTopic("[email protected]__:expired"));
            return redisMessageListenerContainer;
        }
    }

    @Configuration
    @ConditionalOnExpression("‘${spring.redis.host:}‘.isEmpty()")
    public static class RedisClusterAutoConfiguration {
        @Bean
        public RedisMessageListenerFactory redisMessageListenerFactory(BeanFactory beanFactory,
                RedisConnectionFactory redisConnectionFactory) {
            RedisMessageListenerFactory beans = new RedisMessageListenerFactory();
            beans.setBeanFactory(beanFactory);
            beans.setRedisConnectionFactory(redisConnectionFactory);
            return beans;
        }
    }
}

第四步实现《org.springframework.context.ApplicationListener》的onApplicationEvent方法,主要的目的就是监听集群中的所有节点,并且给《org.springframework.data.redis.listener.RedisMessageListenerContainer》创建一个键空间的主题事件。

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisClusterConnection;
import org.springframework.data.redis.connection.RedisClusterNode;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

import redis.clients.jedis.JedisShardInfo;

public class RedisMessageListenerFactory implements BeanFactoryAware, ApplicationListener<ContextRefreshedEvent> {

    @Value("${spring.redis.password}")
    private String password;

    private DefaultListableBeanFactory beanFactory;

    private RedisConnectionFactory redisConnectionFactory;

    @Autowired
    private MessageListener messageListener;

    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = (DefaultListableBeanFactory) beanFactory;
    }

    public void setRedisConnectionFactory(RedisConnectionFactory redisConnectionFactory) {
        this.redisConnectionFactory = redisConnectionFactory;
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        RedisClusterConnection redisClusterConnection = redisConnectionFactory.getClusterConnection();
        if (redisClusterConnection != null) {
            Iterable<RedisClusterNode> nodes = redisClusterConnection.clusterGetNodes();
            for (RedisClusterNode node : nodes) {
                if (node.isMaster()) {
                    String containerBeanName = "messageContainer" + node.hashCode();
                    if (beanFactory.containsBean(containerBeanName)) {
                        return;
                    }
                    JedisShardInfo jedisShardInfo = new JedisShardInfo(node.getHost(), node.getPort());
                    jedisShardInfo.setPassword(password);
                    JedisConnectionFactory factory = new JedisConnectionFactory(jedisShardInfo);
                    BeanDefinitionBuilder containerBeanDefinitionBuilder = BeanDefinitionBuilder
                            .genericBeanDefinition(RedisMessageListenerContainer.class);
                    containerBeanDefinitionBuilder.addPropertyValue("connectionFactory", factory);
                    containerBeanDefinitionBuilder.setScope(BeanDefinition.SCOPE_SINGLETON);
                    containerBeanDefinitionBuilder.setLazyInit(false);
                    beanFactory.registerBeanDefinition(containerBeanName,
                            containerBeanDefinitionBuilder.getRawBeanDefinition());

                    RedisMessageListenerContainer container = beanFactory.getBean(containerBeanName,
                            RedisMessageListenerContainer.class);
                    String listenerBeanName = "messageListener" + node.hashCode();
                    if (beanFactory.containsBean(listenerBeanName)) {
                        return;
                    }
                    container.addMessageListener(messageListener, new PatternTopic("[email protected]__:expired"));
                    container.start();
                }
            }
        }
    }

}

第五步实现监听事件触发后的业务代码

import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;

import com.cn.tianxia.api.common.v2.CacheKeyConstants;
import com.cn.tianxia.api.project.v2.OnlineUserEntity;
import com.cn.tianxia.api.service.v2.OnlineUserService;
import com.cn.tianxia.api.utils.SpringContextUtils;

/**
 * @ClassName KeyExpiredEventMessageListener
 * @Description redis失效事件
 * @author Hardy
 * @Date 2019年5月20日 下午2:53:33
 * @version 1.0.0
 */
@Component
public class KeyExpiredEventMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message, byte[] pattern) {
        String expired = message.toString();
        String onlineKey = CacheKeyConstants.ONLINE_USER_KEY_UID;
        if (expired.contains(onlineKey)) {
            String uid = expired.replace(CacheKeyConstants.ONLINE_USER_KEY_UID, "");
            if (StringUtils.isNoneEmpty(uid)) {
                OnlineUserService onlineUserService = (OnlineUserService) SpringContextUtils
                        .getBeanByClass(OnlineUserService.class);
                OnlineUserEntity onlineUser = onlineUserService.getByUid(uid);
                if (onlineUser != null) {
                    onlineUser.setLogoutTime(System.currentTimeMillis());
                    onlineUser.setOffStatus((byte) 0);
                    onlineUser.setIsOff((byte) 1);
                    onlineUser.setUid(Long.parseLong(uid));
                    onlineUserService.insertOrUpdateOnlineUser(onlineUser);
                }
            }
        }
    }

}

整个过程实现这五步就完成了Redis的键值空间事件了,其实Redis本身提供订阅与发布的功能,追其根本就是通过订阅Redis服务器的发布的一个主题进行消费。

原文地址:https://www.cnblogs.com/xfearless/p/11393438.html

时间: 2024-08-02 11:04:22

Redis集群环境下的键值空间监听事件实现方案的相关文章

【Redis】redis集群与非集群环境下的jedis客户端通用开发

非集群环境下 package com.chiwei.redis; import java.util.ArrayList; import java.util.List; import org.junit.Test; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; import redis.clients.jedis.

Redis集群~windows下搭建Sentinel环境及它对主从模式的实际意义

Redis集群~windows下搭建Sentinel环境及它对主从模式的实际意义 学习了:https://www.cnblogs.com/lori/p/5825691.html 哨兵机制: 原文地址:https://www.cnblogs.com/stono/p/9003930.html

CAS服务器集群和客户端集群环境下的单点登录和单点注销解决方案

CAS的集群环境,包括CAS的客户应用是集群环境,以及CAS服务本身是集群环境这两种情况.在集群环境下使用CAS,要解决两个问题,一是单点退出(注销)时,CAS如何将退出请求正确转发到用户session所在的具体客户应用服务器,而不是转发到其他集群服务器上,二是解决CAS服务端集群环境下各种Ticket信息的共享. CAS集群部署 由于CAS Server是一个Web应用,因此可部署在Tomcat等容器中.直接部署CAS集群并使用负载均衡配置后,由于每次访问的CAS Server不固定,会发生通

分布式集群环境下,如何实现session共享三(环境搭建)

这是分布式集群环境下,如何实现session共享系列的第三篇.在上一篇:分布式集群环境下,如何实现session共享二(项目开发)中,准备好了一个通过原生态的servlet操作session的案例.本篇需要搭建相关的环境,包括:tomcat.nginx.redis. 1.通过两个tomcat搭建集群:tomcat_1.tomcat_2 2.通过nginx实现负载均衡 3.通过redis存储session 1.安装tomcat 1.1.tomcat_1 上传tomcat_1到服务器192.168.

集群环境下如何防止定时任务重复执行?

起因 最近做项目是遇到这样一个问题:系统需要在每天的16:00向一些符合特定条件的用户发送一份邮件,发送成功后修改掉数据库中对应数据的标志位.本来是没有问题的,但后来系统被部署到了集群环境下,导致每天会向这些用户发送多次同样的数据,遭到了客户的抱怨. 解决 下面来介绍一下处理这种问题的解决办法: 1.在数据库中建立tm_job_group表 Name Type Comments group_id number 组id interval number 时间间隔区分定时任务的间隔即多长时间内不可重复

Redis 集群环境添加节点失败问题

最近在给公司网管系统Redis集群环境添加节点时候遇到一个问题,提示新增的Node不为空: [[email protected] src]# ./redis-trib.rb add-node --slave --master-id4f6424e47a2275d2b7696bfbf8588e8c4c3a5b95 172.168.63.202:7001172.168.63.202:7000 ...... [OK] All nodes agree about slotsconfiguration. >

Redis集群环境搭建(实验)

环境信息: 集群中至少有奇数个主节点,所以至少三个主节点, 每个节点至少一个备份节点,所以共6个节点(master和slave各3个) 节点信息: (我这里准备了3台主机,每台主机运行一个master和一个slave) 节点1:192.168.2.100:6379     master 节点2:192.168.2.100:6380     slave 节点3:192.168.2.200:6379     master 节点4:192.168.2.200:6380     slave 节点5:19

分布式集群环境下,如何实现session共享二(项目开发)

在上一篇分布式集群环境下,如何实现session共享一(应用场景)中,介绍了在分布式集群下,需要实现session共享的应用场景.并且最后留下了一个问题:在集群环境下,如何实现session的共享呢?.要解决这个问题,放在一篇中内容量有点大,还是一步一步来吧.本篇先搭建一个基础的web应用,完全基于原生态的servlet实现.思路是这样的: 1.准备一个页面index.jsp,页面中可以提交key/value对的请求参数数据数据 2.编写一个servlet,接收页面提交的请求,获取请求参数,并且

SpringBoot系列教程之Redis集群环境配置

之前介绍的几篇redis的博文都是基于单机的redis基础上进行演示说明的,然而在实际的生产环境中,使用redis集群的可能性应该是大于单机版的redis的,那么集群的redis如何操作呢?它的配置和单机的有什么区别,又有什么需要注意的呢? 本篇将主要介绍SpringBoot项目整合redis集群,并针对这个过程中出现的问题进行说明,并给出相应的解决方案 I. 环境相关 首先需要安装redis集群环境,可以参考博文:redis-集群搭建手册 然后初始化springboot项目,对应的pom结构如