jedis的publish/subscribe[转]含有redis源码解析

首先使用redis客户端来进行publish与subscribe的功能是否能够正常运行。

打开redis服务器

[[email protected] ~]# redis-server /opt/redis-2.4.10/redis.conf
[7719] 16 Apr 11:37:22 # Warning: 32 bit instance detected but no memory limit set. Setting 3.5 GB maxmemory limit with ‘noeviction‘ policy now.
[7719] 16 Apr 11:37:22 * Server started, Redis version 2.4.10
[7719] 16 Apr 11:37:22 # WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add ‘vm.overcommit_memory = 1‘ to /etc/sysctl.conf and then reboot or run the command ‘sysctl vm.overcommit_memory=1‘ for this to take effect. 

打开一个客户端订阅一个news.sports的channel。

[[email protected] ~]# redis-cli
redis 127.0.0.1:6379> subscribe news.sports
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "news.sports"
3) (integer) 1 

可以看到已经开始了监听,向news.sports channel发布一条消息

[[email protected] ~]# redis-cli
redis 127.0.0.1:6379> publish news.sports "kaka is back"
(integer) 1

订阅的客户端顺利收到消息

redis 127.0.0.1:6379> subscribe news.sports
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "news.sports"
3) (integer) 1
1) "message"
2) "news.sports"
3) "kaka is back"

接下来使用jedis来进行发布/订阅的验证

发布消息是通过jedis.publish(String channel, String message)来发布的,其实就是往redis服务器发布一条publish命令。

public void publish(final byte[] channel, final byte[] message) {
       sendCommand(PUBLISH, channel, message);
   } 

订阅消息是通过jedis.subscribe(JedisPub pub,String channel)来进行的,channel好理解,那么JedisPub是什么呢。

看源码吧。

Jedis订阅方法的源码为

public void subscribe(JedisPubSub jedisPubSub, String... channels) {
      checkIsInMulti();
      connect();
      client.setTimeoutInfinite();
      jedisPubSub.proceed(client, channels);
      client.rollbackTimeout();
  } 

可以看到,主要是通过jedisPubSub.proceed(client, channels);来进行订阅的。看proceed方法。

public void proceed(Client client, String... channels) {
       this.client = client;
       client.subscribe(channels);
       client.flush();
       process(client);
   } 

追踪client.subscribe(channels)可以看到,

public void subscribe(final byte[]... channels) {
       sendCommand(SUBSCRIBE, channels);
   } 

其只是向服务器发送了一个subcribe的命令而已。

那么要了解jedisPubSub的作用,只能看process方法了。简单看process其实是一个do...while循环

private void process(Client client) {
       do { 

       } while (isSubscribed());
   } 

我们可以猜测正是靠着这个循环来不断的读取服务器那边传到来的订阅的消息。

看主体

List<Object> reply = client.getObjectMultiBulkReply();
           final Object firstObj = reply.get(0);
           if (!(firstObj instanceof byte[])) {
               throw new JedisException("Unknown message type: " + firstObj);
           }
           final byte[] resp = (byte[]) firstObj;
           if (Arrays.equals(SUBSCRIBE.raw, resp)) {
               subscribedChannels = ((Long) reply.get(2)).intValue();
               final byte[] bchannel = (byte[]) reply.get(1);
               final String strchannel = (bchannel == null) ? null
                       : SafeEncoder.encode(bchannel);
               onSubscribe(strchannel, subscribedChannels);
           } else if (Arrays.equals(UNSUBSCRIBE.raw, resp)) {
               subscribedChannels = ((Long) reply.get(2)).intValue();
               final byte[] bchannel = (byte[]) reply.get(1);
               final String strchannel = (bchannel == null) ? null
                       : SafeEncoder.encode(bchannel);
               onUnsubscribe(strchannel, subscribedChannels);
           } else if (Arrays.equals(MESSAGE.raw, resp)) {
               final byte[] bchannel = (byte[]) reply.get(1);
               final byte[] bmesg = (byte[]) reply.get(2);
               final String strchannel = (bchannel == null) ? null
                       : SafeEncoder.encode(bchannel);
               final String strmesg = (bmesg == null) ? null : SafeEncoder
                       .encode(bmesg);
               onMessage(strchannel, strmesg);
           } else if (Arrays.equals(PMESSAGE.raw, resp)) {
               final byte[] bpattern = (byte[]) reply.get(1);
               final byte[] bchannel = (byte[]) reply.get(2);
               final byte[] bmesg = (byte[]) reply.get(3);
               final String strpattern = (bpattern == null) ? null
                       : SafeEncoder.encode(bpattern);
               final String strchannel = (bchannel == null) ? null
                       : SafeEncoder.encode(bchannel);
               final String strmesg = (bmesg == null) ? null : SafeEncoder
                       .encode(bmesg);
               onPMessage(strpattern, strchannel, strmesg);
           } else if (Arrays.equals(PSUBSCRIBE.raw, resp)) {
               subscribedChannels = ((Long) reply.get(2)).intValue();
               final byte[] bpattern = (byte[]) reply.get(1);
               final String strpattern = (bpattern == null) ? null
                       : SafeEncoder.encode(bpattern);
               onPSubscribe(strpattern, subscribedChannels);
           } else if (Arrays.equals(PUNSUBSCRIBE.raw, resp)) {
               subscribedChannels = ((Long) reply.get(2)).intValue();
               final byte[] bpattern = (byte[]) reply.get(1);
               final String strpattern = (bpattern == null) ? null
                       : SafeEncoder.encode(bpattern);
               onPUnsubscribe(strpattern, subscribedChannels);
           } else {
               throw new JedisException("Unknown message type: " + firstObj);
           } 

可以看到,通过client.getObjectMultiBulkReply()来得到返回来的消息。判断消息的类型来进行不同的操作。比如Arrays.equals(SUBSCRIBE.raw, resp)判断返回来的消息是订阅,subscribedChannels = ((Long) reply.get(2)).intValue();是取得消息,也是do...while判断循环的条件,也就是说这一次如果读到消息了,则进行下一次循环。那么onSubscribe(String channel, int subscribedChannels)究竟做了什么事,看开头

public abstract void onMessage(String channel, String message); 

   public abstract void onPMessage(String pattern, String channel,
           String message); 

   public abstract void onSubscribe(String channel, int subscribedChannels); 

   public abstract void onUnsubscribe(String channel, int subscribedChannels); 

   public abstract void onPUnsubscribe(String pattern, int subscribedChannels); 

   public abstract void onPSubscribe(String pattern, int subscribedChannels);

可以看到这是xetorthio留给我们的方法。onSubscrible是订阅时应该做些什么,onMessage就是有消息传来是做些什么,以此类推。

接下来可以写一个方法来发布和订阅消息了。

package redis.client.jredis.tests; 

import java.util.Timer;
import java.util.TimerTask; 

import org.junit.Test; 

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisPubSub; 

public class JedisTest extends JedisTestBase {
    JedisPool pool = null;
    /**
     * 测试发布验证
     */
    @Test
    public void testPS(){
        /**
        Jedis jedis = new Jedis("192.168.5.146",6379);
        jedis.set("name", "xiaoruoen");
        jedis.publish("news.blog.title", "Hello,World");
        //*/
        final String host = "192.168.5.146";
        JedisPoolConfig config = new JedisPoolConfig();
        pool = new JedisPool(new JedisPoolConfig(),host);
        subscribe(new NewsListener(), "news.sports");
        Timer timer = new Timer();
        timer.schedule(new TimerTask() { 

            @Override
            public void run() {
                // TODO Auto-generated method stub
                publish("news.sports", "{\"_id\":335566,\"author\":\"xiaoruoen\",\"title\":\"kaka is back\"}");
            }
        }, 1000, 3000); 

    } 

    public Jedis getResource(int dbnum){
        Jedis jedis = pool.getResource();
        jedis.select(dbnum);
        return jedis;
    } 

    /**
     *
     * @param channel
     * @param message ex:"{\"_id\":335566,\"author\":\"xiaoruoen\",\"title\":\"kaka is back\"}"
     */
    public void publish(String channel,String message){
        Jedis jedis = getResource(12);
        jedis.publish(channel, message);
        pool.returnResource(jedis);
    } 

    public void subscribe(JedisPubSub listener,String channel){
        Jedis jedis = getResource(12);
        jedis.subscribe(listener, channel);
        pool.returnResource(jedis);
    } 

} 
package redis.client.jredis.tests; 

import redis.clients.jedis.JedisPubSub; 

public class NewsListener extends JedisPubSub { 

    @Override
    public void onMessage(String channel, String message) {
        System.out.println("get message from"+channel+"   "+message);
    } 

    @Override
    public void onPMessage(String pattern, String channel, String message) {
        System.out.println("get message from"+channel+"   "+message);
    } 

    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        System.out.println("subscribe the channel:"+channel);
    } 

    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        System.out.println("get message from"+channel);
    } 

    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        System.out.println("get message from"+subscribedChannels);
    } 

    @Override
    public void onPSubscribe(String pattern, int subscribedChannels) {
        System.out.println("get message from"+subscribedChannels);
    } 

} 

发现只打印了一条数据subscribe the channel:news.sports

没按我们所期望的那样那所有发布的消息都打印出来。

到官网查看

https://github.com/xetorthio/jedis/wiki/AdvancedUsage

看到Note that subscribe is a blocking operation operation because it will poll Redis for responses on the thread that calls subscribe.可以看到subcribe是一个线程中的块操作。我猜测是在发布与接收的过程中,如果在同一线程里面进行操作,一边阻塞着流,另一边无法进行操作。于是将publish改写为另一线程启动。修改如下:

public static void main(String[] args){
        final String host = "192.168.5.146";
        JedisPoolConfig config = new JedisPoolConfig();
        pool = new JedisPool(new JedisPoolConfig(),host);
        Thread thread = new Thread(new Test().new PublishThread());
        thread.start();
        subscribe(new NewsListener(), "news.sports"); 

} 
class PublishThread implements Runnable{
        @Override
        public void run() {
            Timer timer = new Timer();
            timer.schedule(new TimerTask() { 

                @Override
                public void run() {
                    // TODO Auto-generated method stub
                    publish("news.sports", "{\"_id\":335566,\"author\":\"xiaoruoen\",\"title\":\"kaka is back\"}");
                }
            }, 1000, 3000);
        }  

    } 

最终发布订阅成功。

Pool:[email protected]
subscribe the channel:news.sports
Pool:[email protected]
get message fromnews.sports   {"_id":335566,"author":"xiaoruoen","title":"kaka is back"}
Pool:[email protected]
get message fromnews.sports   {"_id":335566,"author":"xiaoruoen","title":"kaka is back"}
Pool:[email protected]
get message fromnews.sports   {"_id":335566,"author":"xiaoruoen","title":"kaka is back"}
Pool:[email protected]
get message fromnews.sports   {"_id":335566,"author":"xiaoruoen","title":"kaka is back"}
Pool:[email protected] 

本文出自 “若是人间” 博客,请务必保留此出处http://xiaoruoen.blog.51cto.com/4828946/835710

时间: 2024-11-09 23:58:18

jedis的publish/subscribe[转]含有redis源码解析的相关文章

redis源码解析之事件驱动

Redis 内部有个小型的事件驱动,它主要处理两项任务: 文件事件:使用I/O多路复用技术处理多个客户端请求,并返回执行结果. 时间事件:维护服务器的资源管理,状态检查. 主要的数据结构包括文件事件结构体,时间事件结构体,触发事件结构体,事件循环结构体 /* File event structure */ typedef struct aeFileEvent { int mask; /* one of AE_(READABLE|WRITABLE) */ aeFileProc *rfileProc

Redis源码解析——双向链表

相对于之前介绍的字典和SDS字符串库,Redis的双向链表库则是非常标准的.教科书般简单的库.但是作为Redis源码的一部分,我决定还是要讲一讲的.(转载请指明出于breaksoftware的csdn博客) 基本结构 首先我们看链表元素的结构.因为是双向链表,所以其基本元素应该有一个指向前一个节点的指针和一个指向后一个节点的指针,还有一个记录节点值的空间 typedef struct listNode { struct listNode *prev; struct listNode *next;

redis源码解析之内存管理

zmalloc.h的内容如下: 1 void *zmalloc(size_t size); 2 void *zcalloc(size_t size); 3 void *zrealloc(void *ptr, size_t size); 4 void zfree(void *ptr); 5 char *zstrdup(const char *s); 6 size_t zmalloc_used_memory(void); 7 void zmalloc_enable_thread_safeness(v

Redis源码解析之ziplist

Ziplist是用字符串来实现的双向链表,对于容量较小的键值对,为其创建一个结构复杂的哈希表太浪费内存,所以redis 创建了ziplist来存放这些键值对,这可以减少存放节点指针的空间,因此它被用来作为哈希表初始化时的底层实现.下图即ziplist 的内部结构. Zlbytes是整个ziplist 所占用的空间,必要时需要重新分配. Zltail便于快速的访问到表尾节点,不需要遍历整个ziplist. Zllen表示包含的节点数. Entries表示用户增加上去的节点. Zlend是一个255

redis源码解析之dict数据结构

dict 是redis中最重要的数据结构,存放结构体redisDb中. typedef struct dict { dictType *type; void *privdata; dictht ht[2]; int rehashidx; /* rehashing not in progress if rehashidx == -1 */ int iterators; /* number of iterators currently running */ } dict; 其中type是特定结构的处

Redis源码解析:15Resis主从复制之从节点流程

Redis的主从复制功能,可以实现Redis实例的高可用,避免单个Redis 服务器的单点故障,并且可以实现负载均衡. 一:主从复制过程 Redis的复制功能分为同步(sync)和命令传播(commandpropagate)两个操作: 同步操作用于将从节点的数据库状态更新至主节点当前所处的数据库状态: 命令传播操作则用于在主节点的数据库状态被修改,导致主从节点的数据库状态不一致时,让主从节点的数据库重新回到一致状态: 1:同步 当客户端向从节点发送SLAYEOF命令,或者从节点的配置文件中配置了

Redis源码解析(十五)--- aof-append only file解析

继续学习redis源码下的Data数据相关文件的代码分析,今天我看的是一个叫aof的文件,这个字母是append ONLY file的简称,意味只进行追加文件操作.这里的文件追加记录时为了记录数据操作的改变记录,用以异常情况的数据恢复的.类似于之前我说的redo,undo日志的作用.我们都知道,redis作为一个内存数据库,数据的每次操作改变是先放在内存中,等到内存数据满了,在刷新到磁盘文件中,达到持久化的目的.所以aof的操作模式,也是采用了这样的方式.这里引入了一个block块的概念,其实就

Redis源码解析——字符串map

本文介绍的是Redis中Zipmap的原理和实现.(转载请指明出于breaksoftware的csdn博客) 基础结构 Zipmap是为了实现保存Pair(String,String)数据的结构,该结构包含一个头信息.一系列字符串对(之后把一个"字符串对"称为一个"元素"(ELE))和一个尾标记.用图形表示该结构就是: Redis源码中并没有使用结构体来表达该结构.因为这个结构在内存中是连续的,而除了HEAD和红色背景的尾标记END(恒定是0xFF)是固定的8位,其

Redis源码解析:13Redis中的事件驱动机制

Redis中,处理网络IO时,采用的是事件驱动机制.但它没有使用libevent或者libev这样的库,而是自己实现了一个非常简单明了的事件驱动库ae_event,主要代码仅仅400行左右. 没有选择libevent或libev的原因大概在于,这些库为了迎合通用性造成代码庞大,而且其中的很多功能,比如监控子进程,复杂的定时器等,这些都不是Redis所需要的. Redis中的事件驱动库只关注网络IO,以及定时器.该事件库处理下面两类事件: a:文件事件(file  event):用于处理Redis