Redis源码解析:30发布和订阅

Redis的发布与订阅功能,由SUBSCRIBE,PSUBSCRIBE,UNSUBSCRIBE,PUNSUBSCRIBE,以及PUBLISH等命令实现。

通过执行SUBSCRIBE命令,客户端可以订阅一个或多个频道。当有客户端通过PUBLISH命令向某个频道发布消息时,频道的所有订阅者都会收到这条消息。

除了订阅具体的频道之外,客户端还可以通过执行PSUBSCRIBE命令订阅一个或多个频道模式。当有客户端通过PUBLISH命令向某个频道发布消息时,消息不仅会被发送给这个频道的所有订阅者,它还会发送给所有与这个频道相匹配的频道模式的订阅者。

UNSUBSCRIBE和PUNSUBSCRIBE命令,主要用于退订频道和退订频道模式。

一:订阅

1:数据结构

SUBSCRIBE命令主要使用字典结构。

在表示Redis服务器的redisServer结构体中,使用字典pubsub_channels记录某个频道都由哪些客户端订阅。

struct redisServer {
    ...
    /* Pubsub */
    dict *pubsub_channels;  /* Map channels to list of subscribed clients */
    ...
}

在字典pubsub_channels中,以具体的频道名为key,而value是一个列表,该列表中记录了订阅该频道的所有客户端。

当向某频道发布消息时,就是通过查询该字典,将消息发送给订阅该频道的所有客户端。

在表示客户端的结构体redisClient中,使用字典pubsub_channels记录该客户端都订阅了哪些频道:

typedef struct redisClient {
    ...
    dict *pubsub_channels;  /* channels a client is interested in (SUBSCRIBE) */
    ...
} redisClient;

在字典pubsub_channels中,以具体的频道名为key,而value为NULL。因此使用该字典能够快速判断客户端是否订阅了某频道。

PSUBSCRIBE命令主要使用列表结构。

在表示Redis服务器的redisServer结构体中,使用列表pubsub_patterns记录客户端及其订阅的频道模式。列表中的元素都是pubsubPattern结构:

struct redisServer {
    ...
    /* Pubsub */
    list *pubsub_patterns;  /* A list of pubsub_patterns */
    ...
};   

typedef struct pubsubPattern {
    redisClient *client;
    robj *pattern;
} pubsubPattern;

当向某频道发布消息时,就是通过查询该列表,将消息发送给订阅了与该频道相匹配的频道模式的所有客户端。

在表示客户端的结构体redisClient中,使用列表pubsub_patterns记录该客户端都订阅了哪些频道模式,列表中的元素就是频道模式名:

typedef struct redisClient {
    ...
    list *pubsub_patterns;  /* patterns a client is interested in (PSUBSCRIBE) */
    ...
} redisClient;

2:SUBSCRIBE命令

当客户端发来SUBSCRIBE命令之后,该命令的处理函数是subscribeCommand,代码如下:

void subscribeCommand(redisClient *c) {
    int j;

    for (j = 1; j < c->argc; j++)
        pubsubSubscribeChannel(c,c->argv[j]);
    c->flags |= REDIS_PUBSUB;
}

代码很简单,针对命令参数中的每一个channel,调用函数pubsubSubscribeChannel,将客户端c和该channel,记录到相应数据结构中。

最后,向客户端标志位中增加REDIS_PUBSUB标记,表示该客户端进入订阅模式;

函数pubsubSubscribeChannel的代码如下:

int pubsubSubscribeChannel(redisClient *c, robj *channel) {
    dictEntry *de;
    list *clients = NULL;
    int retval = 0;

    /* Add the channel to the client -> channels hash table */
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
        retval = 1;
        incrRefCount(channel);
        /* Add the client to the channel -> list of clients hash table */
        de = dictFind(server.pubsub_channels,channel);
        if (de == NULL) {
            clients = listCreate();
            dictAdd(server.pubsub_channels,channel,clients);
            incrRefCount(channel);
        } else {
            clients = dictGetVal(de);
        }
        listAddNodeTail(clients,c);
    }
    /* Notify the client */
    addReply(c,shared.mbulkhdr[3]);
    addReply(c,shared.subscribebulk);
    addReplyBulk(c,channel);
    addReplyLongLong(c,clientSubscriptionsCount(c));
    return retval;
}

首先向字典c->pubsub_channels中添加以该channel为key的键值对,键值对中的value为NULL。因此,该字典仅用于记录当前客户端订阅了哪些频道;

然后,在字典server.pubsub_channels中,以channel为key寻找字典项de,如果de为NULL,则创建列表clients,以channel为key,以列表clients为value,将该键值对添加到字典server.pubsub_channels中;如果de不为NULL,则从中取出当前已订阅该channel的客户端列表clients;然后,将客户端c添加到列表clients中;

最后,回复客户端c相应的订阅信息;

3:PSUBSCRIBE命令

当客户端发来PSUBSCRIBE命令之后,该命令的处理函数是psubscribeCommand,代码如下:

void psubscribeCommand(redisClient *c) {
    int j;

    for (j = 1; j < c->argc; j++)
        pubsubSubscribePattern(c,c->argv[j]);
    c->flags |= REDIS_PUBSUB;
}

代码很简单,针对命令参数中的每一个频道模式,调用函数pubsubSubscribePattern,将客户端c和该频道模式,记录到相应数据结构中。

最后,向客户端标志位中增加REDIS_PUBSUB标记,表示该客户端进入订阅模式;

函数pubsubSubscribePattern的代码如下:

int pubsubSubscribePattern(redisClient *c, robj *pattern) {
    int retval = 0;

    if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
        retval = 1;
        pubsubPattern *pat;
        listAddNodeTail(c->pubsub_patterns,pattern);
        incrRefCount(pattern);
        pat = zmalloc(sizeof(*pat));
        pat->pattern = getDecodedObject(pattern);
        pat->client = c;
        listAddNodeTail(server.pubsub_patterns,pat);
    }
    /* Notify the client */
    addReply(c,shared.mbulkhdr[3]);
    addReply(c,shared.psubscribebulk);
    addReplyBulk(c,pattern);
    addReplyLongLong(c,clientSubscriptionsCount(c));
    return retval;
}

根据pattern,从列表c->pubsub_patterns中寻找相应的节点,如果找不到,说明该客户端未订阅该模式,因此:首先将pattern追加到列表c->pubsub_patterns中;然后根据c和pattern,创建pubsubPattern结构的pat,并将pat追加到列表server.pubsub_patterns中;

最后,回复客户端相应信息;

4:订阅模式

当客户端标志位中设置了REDIS_PUBSUB标记后,表示该客户端进入订阅模式。

在处理客户端命令的processCommand函数中,有下面的逻辑:

    /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
    if (c->flags & REDIS_PUBSUB &&
        c->cmd->proc != pingCommand &&
        c->cmd->proc != subscribeCommand &&
        c->cmd->proc != unsubscribeCommand &&
        c->cmd->proc != psubscribeCommand &&
        c->cmd->proc != punsubscribeCommand) {
        addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context");
        return REDIS_OK;
    }

因此,处于订阅模式下的客户端,只能向Redis服务器发送PING、SUBSCRIBE、UNSUBSCRIBE、PSUBSCRIBE、PUNSUBSCRIBE命令。

二:发布

当客户端向Redis服务器发送”PUBLISH <channel>  <message>”命令后,Redis服务器会将消息<message>发送给所有订阅了<channel>的客户端,以及那些订阅了与<channel>相匹配的频道模式的客户端。

PUBLISH命令的处理函数是publishCommand,代码如下:

void publishCommand(redisClient *c) {
    int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
    if (server.cluster_enabled)
        clusterPropagatePublish(c->argv[1],c->argv[2]);
    else
        forceCommandPropagation(c,REDIS_PROPAGATE_REPL);
    addReplyLongLong(c,receivers);
}

函数中,首先调用pubsubPublishMessage函数,将message发布到相应的频道;

然后,如果当前Redis处于集群模式下,则调用clusterPropagatePublish函数,向所有集群节点广播该消息;否则,调用forceCommandPropagation函数,向客户端c的标志位中增加REDIS_FORCE_REPL标记,以便后续能将该PUBLISH命令传递给从节点;

最后,将接收消息的客户端个数回复给客户端c;

发布消息的功能,主要是通过pubsubPublishMessage函数实现的,该函数的代码如下:

int pubsubPublishMessage(robj *channel, robj *message) {
    int receivers = 0;
    dictEntry *de;
    listNode *ln;
    listIter li;

    /* Send to clients listening for that channel */
    de = dictFind(server.pubsub_channels,channel);
    if (de) {
        list *list = dictGetVal(de);
        listNode *ln;
        listIter li;

        listRewind(list,&li);
        while ((ln = listNext(&li)) != NULL) {
            redisClient *c = ln->value;

            addReply(c,shared.mbulkhdr[3]);
            addReply(c,shared.messagebulk);
            addReplyBulk(c,channel);
            addReplyBulk(c,message);
            receivers++;
        }
    }
    /* Send to clients listening to matching channels */
    if (listLength(server.pubsub_patterns)) {
        listRewind(server.pubsub_patterns,&li);
        channel = getDecodedObject(channel);
        while ((ln = listNext(&li)) != NULL) {
            pubsubPattern *pat = ln->value;

            if (stringmatchlen((char*)pat->pattern->ptr,
                                sdslen(pat->pattern->ptr),
                                (char*)channel->ptr,
                                sdslen(channel->ptr),0)) {
                addReply(pat->client,shared.mbulkhdr[4]);
                addReply(pat->client,shared.pmessagebulk);
                addReplyBulk(pat->client,pat->pattern);
                addReplyBulk(pat->client,channel);
                addReplyBulk(pat->client,message);
                receivers++;
            }
        }
        decrRefCount(channel);
    }
    return receivers;
}

代码很简单,首先根据channel,在字典server.pubsub_channels中查找订阅了该频道的客户端列表list;针对列表中的每个客户端,向其发送message消息;

然后,轮训列表server.pubsub_patterns,针对列表中的每一个pat,如果channel与pat->pattern相匹配,则向pat->client发送message消息;

三:退订

1:UNSUBSCRIBE命令

客户端发送UNSUBSCRIBE命令,用于退订之前通过SUBSCRIBE命令订阅的频道。UNSUBSCRIBE命令的处理函数是unsubscribeCommand,代码如下:

void unsubscribeCommand(redisClient *c) {
    if (c->argc == 1) {
        pubsubUnsubscribeAllChannels(c,1);
    } else {
        int j;

        for (j = 1; j < c->argc; j++)
            pubsubUnsubscribeChannel(c,c->argv[j],1);
    }
    if (clientSubscriptionsCount(c) == 0) c->flags &= ~REDIS_PUBSUB;
}

如果该命令没有任何参数,则调用pubsubUnsubscribeAllChannels函数,设置该客户端c退订所有频道;

否则,针对命令参数中的每一个channel,调用pubsubUnsubscribeChannel,使客户端退订相应频道;

最后,如果当前该客户端c订阅的频道和频道模式数为0,则从客户端标志位中删除REDIS_PUBSUB标记,表示客户端退出订阅模式;

pubsubUnsubscribeAllChannels函数用于客户端退订所有频道,该函数的代码如下:

int pubsubUnsubscribeAllChannels(redisClient *c, int notify) {
    dictIterator *di = dictGetSafeIterator(c->pubsub_channels);
    dictEntry *de;
    int count = 0;

    while((de = dictNext(di)) != NULL) {
        robj *channel = dictGetKey(de);

        count += pubsubUnsubscribeChannel(c,channel,notify);
    }
    /* We were subscribed to nothing? Still reply to the client. */
    if (notify && count == 0) {
        addReply(c,shared.mbulkhdr[3]);
        addReply(c,shared.unsubscribebulk);
        addReply(c,shared.nullbulk);
        addReplyLongLong(c,dictSize(c->pubsub_channels)+
                       listLength(c->pubsub_patterns));
    }
    dictReleaseIterator(di);
    return count;
}

轮训字典c->pubsub_channels,针对其中的每一个channel,调用函数pubsubUnsubscribeChannel,使客户端退订该频道,并将退订的频道数记录到count中;

最后,如果notify非0,并且count为0,则回复客户端相应信息;

pubsubUnsubscribeChannel函数用于客户端退订单个频道,该函数的代码如下:

int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) {
    dictEntry *de;
    list *clients;
    listNode *ln;
    int retval = 0;

    /* Remove the channel from the client -> channels hash table */
    incrRefCount(channel); /* channel may be just a pointer to the same object
                            we have in the hash tables. Protect it... */
    if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
        retval = 1;
        /* Remove the client from the channel -> clients list hash table */
        de = dictFind(server.pubsub_channels,channel);
        redisAssertWithInfo(c,NULL,de != NULL);
        clients = dictGetVal(de);
        ln = listSearchKey(clients,c);
        redisAssertWithInfo(c,NULL,ln != NULL);
        listDelNode(clients,ln);
        if (listLength(clients) == 0) {
            /* Free the list and associated hash entry at all if this was
             * the latest client, so that it will be possible to abuse
             * Redis PUBSUB creating millions of channels. */
            dictDelete(server.pubsub_channels,channel);
        }
    }
    /* Notify the client */
    if (notify) {
        addReply(c,shared.mbulkhdr[3]);
        addReply(c,shared.unsubscribebulk);
        addReplyBulk(c,channel);
        addReplyLongLong(c,dictSize(c->pubsub_channels)+
                       listLength(c->pubsub_patterns));

    }
    decrRefCount(channel); /* it is finally safe to release it */
    return retval;
}

首先从字典c->pubsub_channels中,删除以channel为key的字典项;

然后在字典server.pubsub_channels中,寻找以channel为key的字典项de,并从de中得到订阅该channel的客户端列表clients,然后在列表clients中找到客户端c对应的元素ln,将ln从clients中删除;如果clients长度为0,则从字典server.pubsub_channels中删除该channel;

最后,如果参数notify非0,则回复客户端相应信息;

2:PUNSUBSCRIBE命令

客户端发送PUNSUBSCRIBE命令,用于退订之前通过PSUBSCRIBE命令订阅的频道模式。PUNSUBSCRIBE命令的处理函数是punsubscribeCommand,代码如下:

void punsubscribeCommand(redisClient *c) {
    if (c->argc == 1) {
        pubsubUnsubscribeAllPatterns(c,1);
    } else {
        int j;

        for (j = 1; j < c->argc; j++)
            pubsubUnsubscribePattern(c,c->argv[j],1);
    }
    if (clientSubscriptionsCount(c) == 0) c->flags &= ~REDIS_PUBSUB;
}

如果该命令没有任何参数,则调用pubsubUnsubscribeAllPatterns函数,设置该客户端c退订所有频道模式;

否则,针对命令参数中的每一个pattern,调用pubsubUnsubscribePattern,使客户端退订相应频道模式;

最后,如果该客户端c订阅的频道和频道模式数为0,则从客户端标志位中删除REDIS_PUBSUB标记,表示客户端退出订阅模式;

pubsubUnsubscribeAllPatterns函数用于客户端退订所有频道模式,该函数的代码如下:

int pubsubUnsubscribeAllPatterns(redisClient *c, int notify) {
    listNode *ln;
    listIter li;
    int count = 0;

    listRewind(c->pubsub_patterns,&li);
    while ((ln = listNext(&li)) != NULL) {
        robj *pattern = ln->value;

        count += pubsubUnsubscribePattern(c,pattern,notify);
    }
    if (notify && count == 0) {
        /* We were subscribed to nothing? Still reply to the client. */
        addReply(c,shared.mbulkhdr[3]);
        addReply(c,shared.punsubscribebulk);
        addReply(c,shared.nullbulk);
        addReplyLongLong(c,dictSize(c->pubsub_channels)+
                       listLength(c->pubsub_patterns));
    }
    return count;
}

轮训列表c->pubsub_patterns,针对其中的每一个pattern,调用函数pubsubUnsubscribePattern使客户端退订该频道模式,并将退订的频道模式数记录到count中;

最后,如果notify非0,并且count为0,则回复客户端相应信息;

pubsubUnsubscribePattern函数用于客户端退订单个频道模式,该函数的代码如下:

int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) {
    listNode *ln;
    pubsubPattern pat;
    int retval = 0;

    incrRefCount(pattern); /* Protect the object. May be the same we remove */
    if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) {
        retval = 1;
        listDelNode(c->pubsub_patterns,ln);
        pat.client = c;
        pat.pattern = pattern;
        ln = listSearchKey(server.pubsub_patterns,&pat);
        listDelNode(server.pubsub_patterns,ln);
    }
    /* Notify the client */
    if (notify) {
        addReply(c,shared.mbulkhdr[3]);
        addReply(c,shared.punsubscribebulk);
        addReplyBulk(c,pattern);
        addReplyLongLong(c,dictSize(c->pubsub_channels)+
                       listLength(c->pubsub_patterns));
    }
    decrRefCount(pattern);
    return retval;
}

首先,根据pattern,在列表c->pubsub_patterns中寻找相应的节点ln;然后将ln从列表c->pubsub_patterns中删除;

然后,根据c和pattern,在列表server.pubsub_patterns中寻找相应的节点ln,然后将ln从列表server.pubsub_patterns中删除;

最后,如果参数notify非0,回复客户端相应信息;

时间: 2024-10-30 02:20:22

Redis源码解析:30发布和订阅的相关文章

redis源码解析之事件驱动

Redis 内部有个小型的事件驱动,它主要处理两项任务: 文件事件:使用I/O多路复用技术处理多个客户端请求,并返回执行结果. 时间事件:维护服务器的资源管理,状态检查. 主要的数据结构包括文件事件结构体,时间事件结构体,触发事件结构体,事件循环结构体 /* File event structure */ typedef struct aeFileEvent { int mask; /* one of AE_(READABLE|WRITABLE) */ aeFileProc *rfileProc

Redis源码解析——双向链表

相对于之前介绍的字典和SDS字符串库,Redis的双向链表库则是非常标准的.教科书般简单的库.但是作为Redis源码的一部分,我决定还是要讲一讲的.(转载请指明出于breaksoftware的csdn博客) 基本结构 首先我们看链表元素的结构.因为是双向链表,所以其基本元素应该有一个指向前一个节点的指针和一个指向后一个节点的指针,还有一个记录节点值的空间 typedef struct listNode { struct listNode *prev; struct listNode *next;

redis源码解析之内存管理

zmalloc.h的内容如下: 1 void *zmalloc(size_t size); 2 void *zcalloc(size_t size); 3 void *zrealloc(void *ptr, size_t size); 4 void zfree(void *ptr); 5 char *zstrdup(const char *s); 6 size_t zmalloc_used_memory(void); 7 void zmalloc_enable_thread_safeness(v

Redis源码解析之ziplist

Ziplist是用字符串来实现的双向链表,对于容量较小的键值对,为其创建一个结构复杂的哈希表太浪费内存,所以redis 创建了ziplist来存放这些键值对,这可以减少存放节点指针的空间,因此它被用来作为哈希表初始化时的底层实现.下图即ziplist 的内部结构. Zlbytes是整个ziplist 所占用的空间,必要时需要重新分配. Zltail便于快速的访问到表尾节点,不需要遍历整个ziplist. Zllen表示包含的节点数. Entries表示用户增加上去的节点. Zlend是一个255

redis源码解析之dict数据结构

dict 是redis中最重要的数据结构,存放结构体redisDb中. typedef struct dict { dictType *type; void *privdata; dictht ht[2]; int rehashidx; /* rehashing not in progress if rehashidx == -1 */ int iterators; /* number of iterators currently running */ } dict; 其中type是特定结构的处

Redis源码解析:15Resis主从复制之从节点流程

Redis的主从复制功能,可以实现Redis实例的高可用,避免单个Redis 服务器的单点故障,并且可以实现负载均衡. 一:主从复制过程 Redis的复制功能分为同步(sync)和命令传播(commandpropagate)两个操作: 同步操作用于将从节点的数据库状态更新至主节点当前所处的数据库状态: 命令传播操作则用于在主节点的数据库状态被修改,导致主从节点的数据库状态不一致时,让主从节点的数据库重新回到一致状态: 1:同步 当客户端向从节点发送SLAYEOF命令,或者从节点的配置文件中配置了

Redis源码解析(十五)--- aof-append only file解析

继续学习redis源码下的Data数据相关文件的代码分析,今天我看的是一个叫aof的文件,这个字母是append ONLY file的简称,意味只进行追加文件操作.这里的文件追加记录时为了记录数据操作的改变记录,用以异常情况的数据恢复的.类似于之前我说的redo,undo日志的作用.我们都知道,redis作为一个内存数据库,数据的每次操作改变是先放在内存中,等到内存数据满了,在刷新到磁盘文件中,达到持久化的目的.所以aof的操作模式,也是采用了这样的方式.这里引入了一个block块的概念,其实就

Redis源码解析——字符串map

本文介绍的是Redis中Zipmap的原理和实现.(转载请指明出于breaksoftware的csdn博客) 基础结构 Zipmap是为了实现保存Pair(String,String)数据的结构,该结构包含一个头信息.一系列字符串对(之后把一个"字符串对"称为一个"元素"(ELE))和一个尾标记.用图形表示该结构就是: Redis源码中并没有使用结构体来表达该结构.因为这个结构在内存中是连续的,而除了HEAD和红色背景的尾标记END(恒定是0xFF)是固定的8位,其

Redis源码解析:13Redis中的事件驱动机制

Redis中,处理网络IO时,采用的是事件驱动机制.但它没有使用libevent或者libev这样的库,而是自己实现了一个非常简单明了的事件驱动库ae_event,主要代码仅仅400行左右. 没有选择libevent或libev的原因大概在于,这些库为了迎合通用性造成代码庞大,而且其中的很多功能,比如监控子进程,复杂的定时器等,这些都不是Redis所需要的. Redis中的事件驱动库只关注网络IO,以及定时器.该事件库处理下面两类事件: a:文件事件(file  event):用于处理Redis