Redis的Keyspace notifications功能初探

本文出处:http://blog.csdn.net/chaijunkun/article/details/27361453,转载请注明。由于本人不定期会整理相关博文,会对相应内容作出完善。因此强烈建议在原始出处查看此文。

最近在做一套系统,其中要求若干个Worker服务器将心跳信息都上报给中央服务器。当一定时间中央服务器没有得到心跳信息时则认为该Worker失效了,发出告警。

满足这种需求的解决方法多种多样,我开始想到了memcache,上报一次心跳信息就刷新一次缓存,当缓存内心跳信息对象超时被删除,即认为对应的Worker失效。然而由于memcache的工作原理,删除都是被动的,我们无法及时判断数据是否过期,即便知道了数据过期,也没有一种机制来回调方法来执行自定义的处理动作。难道缓存架构就真的不行了吗?

答案是否定的。在Redis 2.8.0版本起,加入了“Keyspace notifications”(即“键空间通知”)的功能。如何理解该功能呢?我们来看下官方是怎么说的:

键空间通知,允许Redis客户端从“发布/订阅”通道中建立订阅关系,以便客户端能够在Redis中的数据因某种方式受到影响时收到相应事件。

可能接收到的事件举例如下:

影响一个给出的键的所有命令(会告诉你哪个键被执行了一个命令,这个命令是什么);

接收到了一个LPUSH操作的所有键(LPUSH命令:key v1 [v2 v3..]将指定的所有值从左到右进行压栈操作,形成一个栈,并将该栈命名为指定的key);

在数据库0中失效的所有键(不一定非得是数据库0,这里这样表述其实想表达可以知道影响的哪个数据库)。

看到这里我联想到,如果一条缓存数据失效了,通过订阅关系,客户端会收到消息,通过分析消息可以得知何种消息,分析消息内容可以知道是哪个key失效了。这样就可以间接实现开头所描述的功能。

接下来我们来看下实验的步骤:

1.准备redis服务器

作为开源软件,redis下载后得到的是源代码,使用tar -xzvf redis-2.8.9.tar.gz解压后对其进行编译,过程也很简单,make就可以了。编译完成之后可以使用自带的runtest进行测试,看是否编译成功。然后就是安装了,执行make PREFIX=/usr/local/redis-2.8.9 install,PREFIX参数指定的就是安装路径。现在安装的只有可执行文件,还没有配置文件。其实在源码目录中有一个模板redis.conf,我们对它进行改动就可以了。

其他配置我们不关心,但是官方文档中说Keyspace notifications功能默认是关闭的(默认地,Keyspace 时间通知功能是禁用的,因为它或多或少会使用一些CPU的资源),我们需要打开它。打开的方法也很简单,配置属性:notify-keyspace-events

默认配置是这样的:notify-keyspace-events ""

根据文档中的说明:

K     Keyspace events, published with [email protected]<db>__ prefix.
E     Keyevent events, published with [email protected]<db>__ prefix.
g     Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...
$     String commands
l     List commands
s     Set commands
h     Hash commands
z     Sorted set commands
x     Expired events (events generated every time a key expires)
e     Evicted events (events generated when a key is evicted for maxmemory)
A     Alias for g$lshzxe, so that the "AKE" string means all the events.

我们配置为:notify-keyspace-events Ex,含义为:发布key事件,使用过期事件(当每一个key失效时,都会生成该事件)。

2.准备客户端和连接配置

本文中使用的客户端是Jedis,版本为2.4.2,为了代码的通用性,我使用Spring来管理连接:

<bean id="pool" class="redis.clients.jedis.JedisPool">
    	<constructor-arg>
    		<bean id="config" class="redis.clients.jedis.JedisPoolConfig">
    			<property name="maxIdle" value="0" />
    			<property name="maxTotal" value="20" />
    			<property name="maxWaitMillis" value="1000" />
    			<property name="testOnBorrow" value="true" />
   	 	</bean>
    	</constructor-arg>
    	<constructor-arg>
    		<value>192.168.1.100</value>
    	</constructor-arg>
    	<constructor-arg>
		<value>6379</value>
	</constructor-arg>
</bean>

然后使用Spring Test和Junit来测试代码

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("/applicationContext*.xml")
public class RedisSubscribeDemo {

	private static final Log Log= LogFactory.getLog(RedisSubscribeDemo.class);

	@Resource
	private JedisPool pool;

	@Test
	public void doTest() throws InterruptedException {
		for (int i = 0; i < 1; i++) {
			TestThread thread= new TestThread(pool);
			thread.start();
		}
		Thread.sleep(50000L);
		Log.info("Test finish...");
    }
}

由于要使用一定的延迟,我们把主要测试代码放到了TestThread中。当测试线程启动后,主线程停滞50秒,让我们有足够的时间来操作。

public class TestThread extends Thread {

	private Log log= LogFactory.getLog(TestThread.class);

	private JedisPool pool;

	public TestThread(JedisPool pool){
		log.info("loading test thread");
		this.pool= pool;
	}

	@Override
	public void run() {
		Jedis jedis= pool.getResource();
		jedis.psubscribe(new MySubscribe(), "*");
		try {
			Thread.sleep(10000L);
		} catch (InterruptedException e) {
			log.info("延时失败", e);
		}
		jedis.close();
		log.info("Test run finished");
	}
}

在测试线程中,我们将自定义的MySubscribe加入到了Jedis的模板订阅(即psubscribe,因为模板订阅的channel是支持星号‘*‘通配的,这样可以收集到多个通配通道的消息,而与之相反的还有一个subscribe,此订阅只能指定严格匹配的通道)中,同样为了测试过程能够将结果显示出来,在绑定了订阅后,对该线程进行了延时10秒。

public class MySubscribe extends JedisPubSub {

	private static final Log log= LogFactory.getLog(MySubscribe.class);

	// 初始化按表达式的方式订阅时候的处理
	public void onPSubscribe(String pattern, int subscribedChannels) {
    		log.info(pattern + "=" + subscribedChannels);
	}

	// 取得按表达式的方式订阅的消息后的处理
	public void onPMessage(String pattern, String channel, String message) {
    		log.info(pattern + "=" + channel + "=" + message);
	}

	...其他未用到的重写方法忽略
}

作为Jedis自定义订阅,必须继承redis.clients.jedis.JedisPubSub类,在psubscribe模式下,重点重写onPMessage方法,该方法为接收到模板订阅后处理事件的重要代码。pattern为在绑定订阅时使用的通配模板,channel为通配后符合条件的实际通道名称,message就不用多说了,就是事件消息内容。

3.实战

通过Redis自带的redis-cli命令,我们可以在服务端通过命令行的方式直接操作。我们运行上面的示例代码,然后迅速切换到redis-cli命令中,建立一个生命周期很短暂的数据:

127.0.0.1:6379> set chaijunkun 123 PX 100

PX参数指定生命周期单位为毫秒,100即声明周期,即100毫秒。key为chaijunkun的数据,其值为123。

当执行语句后,回显:

OK

这时我们看实例程序的输出:

*[email protected]__:expired=chaijunkun

从输出可以看出,之前指定的通配符为*,通配任何通道;之后是实际的通道名称:[email protected]__:expired,这里我们可以看到订阅收到了一个keyevent位于数据库0,事件类型为:expired,是一个过期事件;最后是chaijunkun,这个是过期数据的key。

在官方文档中,keyevent通道的格式永远是这样的:

[email protected]<db>__:prefix

对于数据过期事件,我们在绑定订阅时通配模板也可以精确地写成:

[email protected]*__:expired

通过示例代码,我们可以看到确实印证了之前的构想,实现了数据过期的事件触发(event)或者说回调(callback)。

4.其他应用

之前的代码中,对于事件的发布都是由Redis自己生成的。实际上在命令中主动发布自定义消息也是可以的,在publish命令的帮助中我们看到:

127.0.0.1:6379> help publish

  PUBLISH channel message
  summary: Post a message to a channel
  since: 2.0.0
  group: pubsub

通过参数,可以自定义通道名称和通道消息。而在Jedis中,发布API甚至做到了字节数据的级别:

jedis.publish(byte[] channel, byte[] message)

因此我们可以构想,自定一套通讯协议,channel为命令字,message为消息体,我们可以通过redis这种简单的发布/订阅机制实现消息的分发。

Redis的Keyspace notifications功能初探,布布扣,bubuko.com

时间: 2024-10-18 19:44:48

Redis的Keyspace notifications功能初探的相关文章

【redis】spring boot利用redis的Keyspace Notifications实现消息通知

前言 需求:当redis中的某个key失效的时候,把失效时的value写入数据库. github: https://github.com/vergilyn/RedisSamples 1.修改redis.conf 安装的redis服务默认是: notify-keyspace-events "",修改成 notify-keyspace-events Ex; 位置:redis安装目下的redis.windows-service.conf 或 redis.windows.conf.(具体看re

Redis Geo: Redis新增位置查询功能

转载于:http://www.itxuexiwang.com/a/shujukujishu/redis/2016/0216/144.html 移动互联网增进了人与人之间的联系,其中基于位置信息的服务(Location Based Service,LBS)起到很重要的促进作用.在移动互联网的大环境下,每个手机都变成了一个位置追踪设备,为人们提供了非常丰富的位置服务.无论是附近的人,还是摇一摇等耳熟能详的应用都需要LBS在后台的支撑.但是,目前位置信息的使用过程中存在诸多挑战如相邻计算不准确等.由于

转:Redis Geo: Redis新增位置查询功能

原文来自于:http://www.infoq.com/cn/news/2015/07/redis-geo 移动互联网增进了人与人之间的联系,其中基于位置信息的服务(Location Based Service,LBS)起到很重要的促进作用.在移动互联网的大环境下,每个手机都变成了一个位置追踪设备,为人们提供了非常丰富的位置服务.无论是附近的人,还是摇一摇等耳熟能详的应用都需要LBS在后台的支撑.但是,目前位置信息的使用过程中存在诸多挑战如相邻计算不准确等.由于经常面对海量数据请求,通常位置服务的

通过Keepalived实现Redis Failover自动故障切换功能

通过Keepalived实现Redis Failover自动故障切换功能[实践分享] 参考资料: http://patrick-tang.blogspot.com/2012/06/redis-keepalived-failover-system.html http://deidara.blog.51cto.com/400447/302402 目前,Redis还没有一个类似于MySQL Proxy或Oracle RAC的官方HA方案.Redis作者有一个名为Redis Sentinel的计划(ht

Python操作redis的订阅发布功能

安装redis-server yum -y install gcc gcc-c++    #安装编译工具 cd /opt wget -c http://download.redis.io/releases/redis-3.0.5.tar.gz   #下载包 tar xf redis-3.0.5.tar.gz     #解压 cd redis-3.0.5 make MALLOC=libc  #编译 make PREFIX=/usr/local/redis install  #安装 echo 'PA

redis实现音乐排行榜功能

sorted_set类型 新的存储需求:数据排序有利于数据的有效展示,需要提供一种可以根据自身特征进行排序的方式 需要的存储结构:新的存储模型,可以保存可排序的数据 sorted_set类型:在set的存储结构基础上添加可排序字段 基本操作 添加数据 zadd key score1 member1 [score2 member2] 获取全部数据 zrange key start stop [WITHSCORES] zrevrange key start stop [WITHSCORES] 删除数

Redis实践操作之—— 直播视频定时控制【TCP长连接框架(WorkerMan)+键空间通知的机制 ( Keyspace Notifications)+短信接口(API)】

一.思路梳理 同步直播视频到Redis 用户观看直播模板,点击直播按钮,检查是否有权限. 直播定时免费观看(免费观看10分钟),用户点击播放按钮开始,异步检查获取直播活动设置的免费观看时间(后台维护人员设置,Redis的hash存储信息),是否是直播. 是直播视频:判断该客户是否已经观看过了免费的20分钟时间, 没有看过,则获取该直播视频的免费时间根据活动ID,同时设置该直播视频的过期时间(只针对该用户自己哦),返回个模板,说:这个人可以观看的. 直播视频已经看过了,则不可以继续观看哦!嘻嘻..

redis技巧--自动完成功能实现

自动完成功能一般都伴随搜索框出现,就是用户在输入时帮助其自动补全. 比如对成语进行补全,现有如下成语:一心一意,一心二用,一帆风顺. 两种实现方式: 实现方式一: 为每个成语的每个前缀都使用一个集合类型键来存储该前缀对应的成语名,并且为了实现排序,我们使用有序集合,并score都为0,这样就按元素值的字典序排序.如果想要实现按照词的热度排序,需要再创建一个有序集合,存放词和score,最后把查询结果和这个集合做交集即可,这样可以避免更新一个词的热度需要更新多个集合的情况,因为一个词会出现在多个集

redis+php实现微博功能(一)

(一).微博功能概况 微博用户账号注册 微博用户登录 微博发布 添加微博好友(粉丝) 微博推送 微博冷数据写入mysql数据库 (二).redis数据结构设计 这节分享微博用户注册与登录:我们完全采用redis作为数据库来实现注册于登录先来看一下redis数据结构的设计: 注册用户表:user set global:userid set user:userid:1:username zhangshan set user:userid:1:password 1212121212 set user: