一直想记录工作中遇到的问题和解决的方法,奈何没有找到一方乐土,最近经常反思,是否需要记录平时的点滴,后台还是决定下定决心记录一些,以便以后用到的时候找不着,实现这样的一个功能主要也是业务所需要的。
需求:要求统计所有会员在线人数,并根据会员在线状态同步改变人数。
之前用户登录使用session去控制,那么可以通过session进行在线用户人数统计,后来实现无状态不在依赖session作为用户在线的标准,使用Redis替换了Session,那么用户直接退出也好做,但是会存在用户直接关闭页面的情况,那么这个时候用户的缓存凭证没有主动触发去主动删掉,所以思来想去查了一些资料通过缓存的Key监听事件来处理,但是网上的大都是单机版的,对于集群环境下的就少之又少,由于集群是有多个节点,并且key采用的是分片的方式存储在不同片区,然而使用Spring的RedisTemplate的又不支持集群环境下的监听事件,由于每次与Redis服务系保持一个有效连接就可以,那么就有可能某个key所在的片区并没有被监听到事件,因此需要在源码上做一些调整,认为让它遍历所有集群节点用来监听集群中的key。所以通过翻阅资料实现下面的功能,还算圆满的完成了需求任务,当然如果看官看到某些似曾相识的地方请谅解,我也是从大家的经验中寻找方法有些地方与大家的相似也属正常。
第一步:修改Redis的配置文件,这一步可让《运维》同事协助操作,在配置文件中添加如下内容:
Redis的配置文件:
############################# EVENT NOTIFICATION ############################## # Redis can notify Pub/Sub clients about events happening in the key space. # This feature is documented at http://redis.io/topics/notifications # # For instance if keyspace events notification is enabled, and a client # performs a DEL operation on key "foo" stored in the Database 0, two # messages will be published via Pub/Sub: # # PUBLISH [email protected]__:foo del # PUBLISH [email protected]__:del foo # # It is possible to select the events that Redis will notify among a set # of classes. Every class is identified by a single character: # # K Keyspace events, published with [email protected]<db>__ prefix. # E Keyevent events, published with [email protected]<db>__ prefix. # g Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ... # $ String commands # l List commands # s Set commands # h Hash commands # z Sorted set commands # x Expired events (events generated every time a key expires) # e Evicted events (events generated when a key is evicted for maxmemory) # A Alias for g$lshzxe, so that the "AKE" string means all the events. # # The "notify-keyspace-events" takes as argument a string that is composed # of zero or multiple characters. The empty string means that notifications # are disabled. # # Example: to enable list and generic events, from the point of view of the # event name, use: # # notify-keyspace-events Elg # # Example 2: to get the stream of the expired keys subscribing to channel # name [email protected]__:expired use: # notify-keyspace-events Ex # # By default all notifications are disabled because most users don‘t need # this feature and the feature has some overhead. Note that if you don‘t # specify at least one of K or E, no events will be delivered.
此时我们需要开启缓存的键空间通知事件的配置:notify-keyspace-events Ex
第二步配置Redis信息,我采用的是yml格式文件,同时配置了三套模式分别为单机模式、哨兵模式、集群模式,各位看官在配置文件中可自行开启或关闭。
#缓存配置 redis: database: 0 #host: 127.0.0.1 #port: 6379 #sentinel: #master: mymaster #nodes: 192.168.0.223:27001 #timeout: 6000ms password: Aa123456 cluster: max-redirects: 3 #获取失败 最大重定向次数 nodes: - 192.168.104.7:6379 - 192.168.104.7:6380 - 192.168.104.8:6379 - 192.168.104.8:6380 - 192.168.104.9:6379 - 192.168.104.9:6380 lettuce: pool: max-active: 1000 #连接池最大连接数(使用负值表示没有限制) max-idle: 10 #连接池中的最大空闲连接 min-idle: 5 #连接池中的最小空闲连接 max-wait: 3000 #连接池最大阻塞等待时间(使用负值表示没有限制)
第三步重写缓存的默认配置函数了,并绑定监听的主题,从程序中我们可以看到"[email protected]__:expired" 意思就是订阅Redis的第一个数据库的键值失效事件,这里需要多说一下,Redis有16个数据库,系统默认使用第一个苦也就是0,如果你在配置的时候不想使用系统默认数据库,你可以通过配置文件指定库,那么你这里就需要根据你指定的库做键值事件。
import org.springframework.beans.factory.BeanFactory; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.jedis.JedisConnection; import org.springframework.data.redis.core.RedisOperations; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import redis.clients.jedis.Jedis; @Configuration @ConditionalOnClass({ JedisConnection.class, RedisOperations.class, Jedis.class, MessageListener.class }) @AutoConfigureAfter({ JacksonAutoConfiguration.class,RedisAutoConfiguration.class }) public class RedisAutoConfiguration { @Configuration @ConditionalOnExpression("!‘${spring.redis.host:}‘.isEmpty()") public static class RedisStandAloneAutoConfiguration { @Bean public RedisMessageListenerContainer customizeRedisListenerContainer( RedisConnectionFactory redisConnectionFactory,MessageListener messageListener) { RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer(); redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory); redisMessageListenerContainer.addMessageListener(messageListener,new PatternTopic("[email protected]__:expired")); return redisMessageListenerContainer; } } @Configuration @ConditionalOnExpression("‘${spring.redis.host:}‘.isEmpty()") public static class RedisClusterAutoConfiguration { @Bean public RedisMessageListenerFactory redisMessageListenerFactory(BeanFactory beanFactory, RedisConnectionFactory redisConnectionFactory) { RedisMessageListenerFactory beans = new RedisMessageListenerFactory(); beans.setBeanFactory(beanFactory); beans.setRedisConnectionFactory(redisConnectionFactory); return beans; } } }
第四步实现《org.springframework.context.ApplicationListener》的onApplicationEvent方法,主要的目的就是监听集群中的所有节点,并且给《org.springframework.data.redis.listener.RedisMessageListenerContainer》创建一个键空间的主题事件。
import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.support.DefaultListableBeanFactory; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.connection.RedisClusterConnection; import org.springframework.data.redis.connection.RedisClusterNode; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import redis.clients.jedis.JedisShardInfo; public class RedisMessageListenerFactory implements BeanFactoryAware, ApplicationListener<ContextRefreshedEvent> { @Value("${spring.redis.password}") private String password; private DefaultListableBeanFactory beanFactory; private RedisConnectionFactory redisConnectionFactory; @Autowired private MessageListener messageListener; public void setBeanFactory(BeanFactory beanFactory) throws BeansException { this.beanFactory = (DefaultListableBeanFactory) beanFactory; } public void setRedisConnectionFactory(RedisConnectionFactory redisConnectionFactory) { this.redisConnectionFactory = redisConnectionFactory; } @Override public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) { RedisClusterConnection redisClusterConnection = redisConnectionFactory.getClusterConnection(); if (redisClusterConnection != null) { Iterable<RedisClusterNode> nodes = redisClusterConnection.clusterGetNodes(); for (RedisClusterNode node : nodes) { if (node.isMaster()) { String containerBeanName = "messageContainer" + node.hashCode(); if (beanFactory.containsBean(containerBeanName)) { return; } JedisShardInfo jedisShardInfo = new JedisShardInfo(node.getHost(), node.getPort()); jedisShardInfo.setPassword(password); JedisConnectionFactory factory = new JedisConnectionFactory(jedisShardInfo); BeanDefinitionBuilder containerBeanDefinitionBuilder = BeanDefinitionBuilder .genericBeanDefinition(RedisMessageListenerContainer.class); containerBeanDefinitionBuilder.addPropertyValue("connectionFactory", factory); containerBeanDefinitionBuilder.setScope(BeanDefinition.SCOPE_SINGLETON); containerBeanDefinitionBuilder.setLazyInit(false); beanFactory.registerBeanDefinition(containerBeanName, containerBeanDefinitionBuilder.getRawBeanDefinition()); RedisMessageListenerContainer container = beanFactory.getBean(containerBeanName, RedisMessageListenerContainer.class); String listenerBeanName = "messageListener" + node.hashCode(); if (beanFactory.containsBean(listenerBeanName)) { return; } container.addMessageListener(messageListener, new PatternTopic("[email protected]__:expired")); container.start(); } } } } }
第五步实现监听事件触发后的业务代码
import org.apache.commons.lang3.StringUtils; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.stereotype.Component; import com.cn.tianxia.api.common.v2.CacheKeyConstants; import com.cn.tianxia.api.project.v2.OnlineUserEntity; import com.cn.tianxia.api.service.v2.OnlineUserService; import com.cn.tianxia.api.utils.SpringContextUtils; /** * @ClassName KeyExpiredEventMessageListener * @Description redis失效事件 * @author Hardy * @Date 2019年5月20日 下午2:53:33 * @version 1.0.0 */ @Component public class KeyExpiredEventMessageListener implements MessageListener { @Override public void onMessage(Message message, byte[] pattern) { String expired = message.toString(); String onlineKey = CacheKeyConstants.ONLINE_USER_KEY_UID; if (expired.contains(onlineKey)) { String uid = expired.replace(CacheKeyConstants.ONLINE_USER_KEY_UID, ""); if (StringUtils.isNoneEmpty(uid)) { OnlineUserService onlineUserService = (OnlineUserService) SpringContextUtils .getBeanByClass(OnlineUserService.class); OnlineUserEntity onlineUser = onlineUserService.getByUid(uid); if (onlineUser != null) { onlineUser.setLogoutTime(System.currentTimeMillis()); onlineUser.setOffStatus((byte) 0); onlineUser.setIsOff((byte) 1); onlineUser.setUid(Long.parseLong(uid)); onlineUserService.insertOrUpdateOnlineUser(onlineUser); } } } } }
整个过程实现这五步就完成了Redis的键值空间事件了,其实Redis本身提供订阅与发布的功能,追其根本就是通过订阅Redis服务器的发布的一个主题进行消费。
原文地址:https://www.cnblogs.com/xfearless/p/11393438.html