redis的发布订阅模式

概要

redis的每个server实例都维护着一个保存服务器状态的redisServer结构

struct redisServer

{

/* Pubsub */

// 字典,键为频道,值为链表

// 链表中保存了所有订阅某个频道的客户端

// 新客户端总是被添加到链表的表尾

dict *pubsub_channels;  /* Map channels to list of subscribed clients */

// 这个链表记录了客户端订阅的所有模式的名字

list *pubsub_patterns;  /* A list of pubsub_patterns */

};

pubsub_channels记录了所有客户端订阅的频道的信息。

redis的发布订阅模式

订阅者通过sub命令订阅频道,server使用pub命令把消息推送到符合条件的pubsub_channels中。

内部数据结构

pubsub_channels是一个字典结构,字典内部使用hash表存储和索引数据。

实现字典的可选数据结构

hash表:简单但是不稳定的基于数组的冲突解决法;简单且平均效率稳定的链式地址的冲突解决法;

字典树:使用树结构。

redis采用hash的链式地址法作为实现,好处是直观、简单、可期待平均时间复杂度较小且平稳。

typedef struct dict {

// 类型特定函数

dictType *type;

// 私有数据

void *privdata;

// 哈希表

dictht ht[2];

// rehash 索引

// 当 rehash 不在进行时,值为 -1

int rehashidx; /* rehashing not in progress if rehashidx == -1 */

// 目前正在运行的安全迭代器的数量

int iterators; /* number of iterators currently running */

} dict;

dict使用两个hash表进行索引,当进行rehash的时候使用ht[1]的hash表,其他时候都使用hd[0]的hash表。

使用的hash表的定义如下

typedef struct dictht {

// 哈希表数组

dictEntry **table;

// 哈希表大小

unsigned long size;

// 哈希表大小掩码,用于计算索引值

// 总是等于 size - 1

unsigned long sizemask;

// 该哈希表已有节点的数量

unsigned long used;

} dictht;

table是一个存储dictEntry*指针的数组,table中的每一项都是一个指向DictEntry结构的指针,同时每一项也都带有一个指向下一项的指针。可以看出这是一个使用链地址法解决冲突的hash结构。

dictEntry的定义,包含key、value、next。

/*

* 哈希表节点

*/

typedef struct dictEntry {

// 键

void *key;

// 值

union {

void *val;

uint64_t u64;

int64_t s64;

} v;

// 指向下个哈希表节点,形成链表

struct dictEntry *next;

} dictEntry;

客户端订阅频道

就是字典中插入新元素的过程

// 关联示意图

// {

//  频道名        订阅频道的客户端

//  ‘channel-a‘ : [c1, c2, c3],

//  ‘channel-b‘ : [c5, c2, c1],

//  ‘channel-c‘ : [c10, c2, c1]

// }

/* Add the client to the channel -> list of clients hash table */

// 从 pubsub_channels 字典中取出保存着所有订阅了 channel 的客户端的链表

// 如果 channel 不存在于字典,那么添加进去

de = dictFind(server.pubsub_channels,channel);

if (de == NULL) {

clients = listCreate();

dictAdd(server.pubsub_channels,channel,clients);

incrRefCount(channel);

} else {

clients = dictGetVal(de);

}

// before:

// ‘channel‘ : [c1, c2]

// after:

// ‘channel‘ : [c1, c2, c3]

// 将客户端添加到链表的末尾

listAddNodeTail(clients,c);

1.查询server是否包含指定频道

2.如果频道存在就获取频道指向的list地址;如果频道不存在,就创建频道和list

3.使用步骤2的list,将客户端添加到list的末尾。

完成这三步以后,server维护的发布订阅频道就新增了一个频道和关注的客户端,server发布时检测发布订阅字典,获取订阅客户端并依次发送。

server发布消息到channel

/* Send to clients listening for that channel */

// 取出包含所有订阅频道 channel 的客户端的链表

// 并将消息发送给它们

de = dictFind(server.pubsub_channels,channel);

if (de) {

list *list = dictGetVal(de);

listNode *ln;

listIter li;

// 遍历客户端链表,将 message 发送给它们

listRewind(list,&li);

while ((ln = listNext(&li)) != NULL) {

redisClient *c = ln->value;

// 回复客户端。

// 示例:

// 1) "message"

// 2) "xxx"

// 3) "hello"

addReply(c,shared.mbulkhdr[3]);

// "message" 字符串

addReply(c,shared.messagebulk);

// 消息的来源频道

addReplyBulk(c,channel);

// 消息内容

addReplyBulk(c,message);

// 接收客户端计数

receivers++;

}

}

结合订阅者订阅的过程,发布过程就是一个查找订阅者并轮询发送消息给订阅者的过程。

时间: 2024-10-08 14:06:43

redis的发布订阅模式的相关文章

使用redis的发布订阅模式实现消息队列

配置文件 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/sc

Redis研究(十六)—发布/订阅模式

在上一篇中我们写了Redis的任务队列. 除了实现任务队列外,Redis还提供了一组命令可以让开发者实现"发布/订阅"(publish/subscribe)模式."发布/订阅"模式同样可以实现进程间的消息传递,其原理是这样的: "发布/订阅"模式中包含两种角色,分别是发布者和订阅者.订阅者可以订阅一个或若干个频道(channel),而发布者可以向指定的频道发送消息,所有订阅此频道的订阅者都会收到此消息. 发布者发布消息的命令是PUBLISH,用法

redis 发布/订阅 模式

发布/订阅模式的命令如下: * 进入发布订阅模式的客户端,不能执行除发布订阅模式以上命令的其他命令,否则出错.

redis(3)发布订阅

一.发布/订阅模式 在软件工程里面,发布/订阅是一种消息模式,这种模式旨在将消息发送者和消息接收者解耦.发送者不需要关心将消息发送给谁,接收者也不需要知道消息的发送者是谁.发送者将消息发布以后就结束动作,接收者可以订阅自己感兴趣的消息. 除了发布/订阅模式还有一种和它很类似的,消息队列,是一种典型的面向消息中间件的系统.许多消息系统都会同时支持发布/订阅和消息队列模型,例如Java Message Service(JMS) 参见:维基百科 二.redis的发布/订阅 我们从一个简单的示例开始,首

python之上下文管理、redis的发布订阅

使用with打开文件的方式,是调用了上下文管理的功能 1 #打开文件的两种方法: 2 3 f = open('a.txt','r') 4 5 with open('a.txt','r') as f 6 7 实现使用with关闭socket 8 import contextlib 9 import socket 10 11 @contextlib.contextmanage 12 def Sock(ip,port): 13 socket = socket.socket() 14 socket.bi

发布-订阅模式

1.什么是发布订阅模式 发布订阅模式 又叫观察者模式,他是定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变,所有依赖他的对象都将得到通知. 在javascript开发中,我们一般用事件模型来替代传统的发布-订阅模式. 2.Dom事件 实际上,只要我们曾经在dom节点上绑定过事件函数,那么我们就曾经使用过发布-订阅模式. document.getElementById('test').addEventListener('click',function(){ alert(2)},fasle

C# 委托和事件 与 观察者模式(发布-订阅模式)讲解 by天命

使用面向对象的思想 用c#控制台代码模拟猫抓老鼠 我们先来分析一下猫抓老鼠的过程 1.猫叫了 2.所有老鼠听到叫声,知道是哪只猫来了 3.老鼠们逃跑,边逃边喊:"xx猫来了,快跑啊!我是老鼠xxx" 一  双向耦合的代码 首先需要一个猫类Cat 一个老鼠类Rat 和一个测试类Program 老鼠类的代码如下 //老鼠类 public class Rat { public string Name { get; set; } //老鼠的名字 public Cat MyCat { get;

JavaScript设计模式与开发实践---读书笔记(8) 发布-订阅模式

发布-订阅模式又叫观察者模式,它定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都将得到通知. 发布-订阅模式可以广泛应用于异步编程中,这是一种替代传递回调函数的方案. 可以取代对象之间硬编码的通知机制,一个对象不用再显式地调用另外一个对象的某个接口. 自定义事件 首先要指定好谁充当发布者: 然后给发布者添加一个缓存列表,用于存放回调函数以便通知订阅者: 最后发布消息时,发布者会遍历这个缓存列表,依次触发里面存放的订阅者回调函数. 另外,我们还可以往回调函数里填入

观察者模式和发布/订阅模式的区别

在事件总线(EventBus)的架构设计中,用到了发布/订阅模式,但发现和观察者模式挺接近,有时容易发生混淆,现试图分清一下他们的关系. 观察者模式的角色为观察者(observer)和主题(subject)对象,observer需要观察subject时,需先到subject里面进行注册(subject对象持有observer对象的集合句柄),然后,当subject对象的内部状态发生变化时,把这个变化通知所有的观察者. 发布.订阅模式的角色为发布者(publisher)和订阅者(subscribe