基于Redis的消息订阅/发布

在工业生产设计中,我们往往需要实现一个基于消息订阅的模式,用来对非定时的的消息进行监听订阅。

这种设计模式在 总线设计模式中得到体现。微软以前的WCF中实现了服务总线 ServiceBus的设计模式。然并卵。WCF已经好像是上个世纪的产物................

基于事件订阅的模式,比如 EventBus类的组件产品。但是往往设计比较复杂。

如果依赖于 Redis做事件消息推送。那就大大简化了这种设计模式,而且性能也比较客观。

Redis在 2.0之后的版本中 实现了 事件推送的  pub/sub命令

PSUBSCRIBE订阅一个或多个符合给定模式的频道
PUBLISH将信息message 发送到指定的频道channel
PUBSUB是一个查看订阅与发布系统状态的内省命令
PUBSUB CHANNELS pattern 列出当前的活跃频道
PUBSUB NUMSUB channel-1 channel-N 返回给定频道的订阅者数量
PUBSUB NUMPAT 返回订阅模式的数量
PUNSUBSCRIBE 指示客户端退订所有给定模式
SUBSCRIBE 订阅给定的一个或多个频道的信息
UNSUBSCRIBE 指示客户端退订给定的频道

P 开头的 (pattern)支持通配符模式。

简单例子(来自:http://www.yiibai.com/redis/redis_pub_sub.html)

以下举例说明如何发布用户的概念工作。在下面的例子给出一个客户端订阅一个通道名为redisChat

redis 127.0.0.1:6379> SUBSCRIBE redisChat

Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "redisChat"
3) (integer) 1

现在,两个客户端都发布在同一个通道名redisChat消息及以上的订阅客户端接收消息。

redis 127.0.0.1:6379> PUBLISH redisChat "Redis is a great caching technique"

(integer) 1

redis 127.0.0.1:6379> PUBLISH redisChat "Learn redis by tutorials point"

(integer) 1

1) "message"
2) "redisChat"
3) "Redis is a great caching technique"
1) "message"
2) "redisChat"
3) "Learn redis by tutorials point"

上面的代码简单的演示了,订阅信道,向指定的信道发布消息。然后消息推送到订阅者。

在 redis-cli客户端中 推送消息的时候,返回成功发送到订阅者的数目。

如:(integer) 1

原理:

RedisServer包含两个重要的结构:
1. channels:实际上就是一个key-value的Map结构,key为订阅地频道,value为Client的List
2. patterns:存放模式+client地址的列表

 

流程:从pubsub_channels中找出跟publish中channel相符的clients-list,然后再去pubsub_patterns中找出每一个相符的pattern和client。向这些客户端发送publish的消息。

订阅信道

消息推送

在C#中的实现

基于

ServiceStack.Redis

 1             Task.Factory.StartNew(() =>
 2             {
 3                 var client = new RedisClient("192.168.1.100", 6379, "你的密码");
 4                 try
 5                 {
 6                     var isOpen = client.Ping();
 7                     if (isOpen == false)
 8                     {
 9                         return;
10                     }
11                     var sub1 = client.CreateSubscription();
12
13                     //接受消息的委托
14                     sub1.OnMessage = (chanel, message) =>
15                     {
16                         Console.WriteLine("chanel is :{0}", chanel);
17                         Console.WriteLine("message is :{0}", message);
18                     };
19                     sub1.SubscribeToChannels(new string[] { "testchat" });//注意:订阅信道的时候 会开启阻塞模式,所以,需要将监听放到单独的线程里
20                 }
21                 catch (Exception)
22                 {
23
24                     throw;
25                 }
26
27
28
29
30             });

注意:在程序终止或者类的实例被销毁的时候,请将订阅者实例注销掉,否则,在redis中一直存在这个订阅者。

1 使用idispose 显示释放

2 使用析构函数 CLR回收的时候 释放

sub1.Dispose();

例如:

 1  public void Dispose()
 2         {
 3             Dispose(true);
 4             GC.SuppressFinalize(this);
 5         }
 6
 7         protected virtual void Dispose(bool disposing)
 8         {
 9             if (disposing)
10             {
11                 if (redisClient != null)
12                     redisClient.Dispose();
13                 if (subscription != null)
14                     subscription.Dispose();
15                 if (log != null)
16                     log.Dispose();
17             }
18         }
19
20         ~RedisEx()
21         {
22             Dispose(false);
23         } 

官方推荐这种写法

 var clientsManager = new PooledRedisClientManager(new string[] { "密码@192.168.1.200:6379" });
            var redisPubSub = new RedisPubSubServer(clientsManager, new string[] { "testchat" })
            {
                OnMessage = (channel, msg) => {
                    Console.WriteLine("方式2订阅演示.............");
                    Console.WriteLine("Received ‘{0}‘ from ‘{1}‘", msg, channel);
                }
            }.Start();

To use RedisPubSubServer, initialize it with the channels you want to subscribe to and assign handlers for each of the events you want to handle.

At a minimum you‘ll want to handle OnMessage:

Calling Start() after it‘s initialized will get it to start listening and processing any messages published to the subscribed channels.

官方文档:https://github.com/ServiceStack/ServiceStack.Redis

附加文档

redis PUB/SUB(发布/订阅)

PSUBSCRIBE(订阅一个或多个符合给定模式的频道)

PSUBSCRIBE pattern [pattern …] 
订阅一个或多个符合给定模式的频道。 
每个模式以* 作为匹配符,比如it* 匹配所有以it 开头的频道( it.news 、it.blog 、it.tweets 等等), 
news.* 匹配所有以news. 开头的频道( news.it 、news.global.today 等等),诸如此类。 
可用版本: >= 2.0.0 
时间复杂度: O(N),N 是订阅的模式的数量。 
返回值: 接收到的信息(请参见下面的代码说明)。

redis> psubscribe news.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe" # 返回值的类型:显示订阅成功
2) "news.*" # 订阅的模式
3) (integer) 1 # 目前已订阅的模式的数量

1) "pmessage" # 返回值的类型:信息
2) "news.*" # 信息匹配的模式
3) "news.it" # 信息本身的目标频道
4) "Google buy Motorola" # 信息的内容

PUBLISH(将信息message 发送到指定的频道channel )

PUBLISH channel message 
将信息message 发送到指定的频道channel 。 
可用版本: >= 2.0.0 
时间复杂度: O(N+M),其中N 是频道channel 的订阅者数量,而M 则是使用模式订阅(subscribed 
patterns) 的客户端的数量。 
返回值: 接收到信息message 的订阅者数量。

# 对没有订阅者的频道发送信息
redis> publish bad_channel "can any body hear me?"
(integer) 0
# 向有一个订阅者的频道发送信息
redis> publish msg "good morning"
(integer) 1
# 向有多个订阅者的频道发送信息
redis> publish chat_room "hello~ everyone"
(integer) 3

PUBSUB(是一个查看订阅与发布系统状态的内省命令)

PUBSUB [argument [argument …]] 
PUBSUB 是一个查看订阅与发布系统状态的内省命令,它由数个不同格式的子命令组成,以下将分别对这 
些子命令进行介绍。 
可用版本: >= 2.8.0

PUBSUB CHANNELS [pattern] (列出当前的活跃频道)

列出当前的活跃频道。 
活跃频道指的是那些至少有一个订阅者的频道,订阅模式的客户端不计算在内。 
pattern 参数是可选的: 
• 如果不给出pattern 参数,那么列出订阅与发布系统中的所有活跃频道。 
• 如果给出pattern 参数,那么只列出和给定模式pattern 相匹配的那些活跃频道。 
复杂度: O(N) ,N 为活跃频道的数量(对于长度较短的频道和模式来说,将进行模式匹配的复杂度视为常 
数)。 
返回值: 一个由活跃频道组成的列表。

# client-1 订阅news.it 和news.sport 两个频道
client-1> SUBSCRIBE news.it news.sport
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "news.it"
3) (integer) 1
1) "subscribe"
2) "news.sport"
3) (integer) 2
# client-2 订阅news.it 和news.internet 两个频道
client-2> SUBSCRIBE news.it news.internet
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "news.it"
3) (integer) 1
1) "subscribe"
2) "news.internet"
3) (integer) 2
# 首先, client-3 打印所有活跃频道
# 注意,即使一个频道有多个订阅者,它也只输出一次,比如news.it
client-3> PUBSUB CHANNELS
1) "news.sport"
2) "news.internet"
3) "news.it"
# 接下来, client-3 打印那些与模式news.i* 相匹配的活跃频道
# 因为news.sport 不匹配news.i* ,所以它没有被打印
redis> PUBSUB CHANNELS news.i*
1) "news.internet"
2) "news.it"

PUBSUB NUMSUB [channel-1 … channel-N] (返回给定频道的订阅者数量)

返回给定频道的订阅者数量,订阅模式的客户端不计算在内。 
复杂度: O(N) ,N 为给定频道的数量。 
返回值: 一个多条批量回复( Multi-bulk reply),回复中包含给定的频道,以及频道的订阅者数量。格式为:频道 channel-1 ,channel-1 的订阅者数量,频道 channel-2 ,channel-2 的订阅者数量,诸如此类。 
回复中频道的排列顺序和执行命令时给定频道的排列顺序一致。不给定任何频道而直接调用这个命令也是可以的,在这种情况下,命令只返回一个空列表。

# client-1 订阅 news.it 和 news.sport 两个频道
client-1> SUBSCRIBE news.it news.sport
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "news.it"
3) (integer) 1
1) "subscribe"
2) "news.sport"
3) (integer) 2
# client-2 订阅 news.it 和 news.internet 两个频道
client-2> SUBSCRIBE news.it news.internet
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "news.it"
3) (integer) 1
1) "subscribe"
2) "news.internet"
3) (integer) 2
# client-3 打印各个频道的订阅者数量
client-3> PUBSUB NUMSUB news.it news.internet news.sport news.music
1) "news.it" # 频道
2) "2" # 订阅该频道的客户端数量
3) "news.internet"
4) "1"
5) "news.sport"
6) "1"
7) "news.music" # 没有任何订阅者
8) "0"

PUBSUB NUMPAT (返回订阅模式的数量)

注意,这个命令返回的不是订阅模式的客户端的数量,而是客户端订阅的所有模式的数量总和。 
复杂度: O(1) 。 
返回值: 一个整数回复( Integer reply)。

# client-1 订阅 news.* 和 discount.* 两个模式
client-1> PSUBSCRIBE news.* discount.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "news.*"
3) (integer) 1
1) "psubscribe"
2) "discount.*"
3) (integer) 2
# client-2 订阅 tweet.* 一个模式
client-2> PSUBSCRIBE tweet.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "tweet.*"
3) (integer) 1
# client-3 返回当前订阅模式的数量为 3
client-3> PUBSUB NUMPAT
(integer) 3
# 注意,当有多个客户端订阅相同的模式时,相同的订阅也被计算在 PUBSUB NUMPAT 之内
# 比如说,再新建一个客户端 client-4 ,让它也订阅 news.* 频道
client-4> PSUBSCRIBE news.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "news.*"
3) (integer) 1
# 这时再计算被订阅模式的数量,就会得到数量为 4
client-3> PUBSUB NUMPAT
(integer) 4
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

PUNSUBSCRIBE (指示客户端退订所有给定模式)

PUNSUBSCRIBE [pattern [pattern …]] 
指示客户端退订所有给定模式。 
如果没有模式被指定,也即是,一个无参数的 PUNSUBSCRIBE 调用被执行,那么客户端使用PSUBSCRIBE 
命令订阅的所有模式都会被退订。在这种情况下,命令会返回一个信息,告知客户端所有被退订的模式。 
可用版本: >= 2.0.0 
时间复杂度: O(N+M) ,其中 N 是客户端已订阅的模式的数量,M 则是系统中所有客户端订阅的模式的数量。 
返回值: 这个命令在不同的客户端中有不同的表现。

SUBSCRIBE (订阅给定的一个或多个频道的信息)

订阅给定的一个或多个频道的信息。 
可用版本: >= 2.0.0 
时间复杂度: O(N),其中 N 是订阅的频道的数量。 
返回值: 接收到的信息 (请参见下面的代码说明)。

# 订阅 msg 和 chat_room 两个频道
# 1 - 6 行是执行 subscribe 之后的反馈信息
# 第 7 - 9 行才是接收到的第一条信息
# 第 10 - 12 行是第二条
redis> subscribe msg chat_room
Reading messages... (press Ctrl-C to quit)
1) "subscribe" # 返回值的类型:显示订阅成功
2) "msg" # 订阅的频道名字
3) (integer) 1 # 目前已订阅的频道数量
1) "subscribe"
2) "chat_room"
3) (integer) 2
1) "message" # 返回值的类型:信息
2) "msg" # 来源 (从那个频道发送过来)
3) "hello moto" # 信息内容
1) "message"
2) "chat_room"
3) "testing...haha"

UNSUBSCRIBE (指示客户端退订给定的频道)

UNSUBSCRIBE [channel [channel …]] 
指示客户端退订给定的频道。 
如果没有频道被指定,也即是,一个无参数的 UNSUBSCRIBE 调用被执行,那么客户端使用SUBSCRIBE 命令订阅的所有频道都会被退订。在这种情况下,命令会返回一个信息,告知客户端所有被退订的频道。 
可用版本: >= 2.0.0 
时间复杂度: O(N) ,N 是客户端已订阅的频道的数量。 
返回值: 这个命令在不同的客户端中有不同的表现。

参考文档:

http://blog.csdn.net/u011506468/article/details/47337839

http://my.oschina.net/itblog/blog/601284

http://www.oschina.net/code/snippet_584165_52231

http://redis.io/topics/pubsub

时间: 2024-10-14 16:55:18

基于Redis的消息订阅/发布的相关文章

redis的消息订阅发布介绍

1.redis的消息订阅发布: 进程间的一种消息通信模式:发送者(pub)发送信息,订阅者(sub)接收信息. 注: 图1为 三个客户端 client2.client5.client1 通过 subscribe 命令订阅 频道 channel1 ,图二为 当有新消息通过 publish 命令发送给频道 channel1时,这个消息就会被发送给订阅它的三个客户端. 2.消息订阅发布的相关命令: PSUBSCRIBE pattern [pattern...]: 订阅一个或者多个符合给定模式的频道 P

Redis的消息订阅及发布及事务机制

Redis的消息订阅及发布及事务机制 订阅发布 SUBSCRIBE PUBLISH 订阅消息队列及发布消息. # 首先要打开redis-cli shell窗口 一个用于消息发布 一个用于消息订阅 # SUBSCRIBE 订阅一个频道,如果频道不存在 就新增一个 # 返回参数 表示 第一个是命令 第二个是频道名称 第三个表示当前订阅该频道的数量 127.0.0.1:6379> SUBSCRIBE mychannel Reading messages... (press Ctrl-C to quit

Spring基于事件驱动模型的订阅发布模式代码实例详解

代码下载地址:http://www.zuidaima.com/share/1791499571923968.htm 原文:Spring基于事件驱动模型的订阅发布模式代码实例详解 事件驱动模型简介 事件驱动模型也就是我们常说的观察者,或者发布-订阅模型:理解它的几个关键点: 首先是一种对象间的一对多的关系:最简单的如交通信号灯,信号灯是目标(一方),行人注视着信号灯(多方): 当目标发送改变(发布),观察者(订阅者)就可以接收到改变: 观察者如何处理(如行人如何走,是快走/慢走/不走,目标不会管的

关于laravel5 消息订阅/发布的理解初

laravel5.4感觉官网文档说滴不够详细...安装predis官网很详细,这里略过.... 生成命令 直接使用 Artisan 命令 make:command,该命令会在 app/Console/Commands 目录下创建一个新的命令类.如果该目录不存在,不用担心,它将会在你首次运行 Artisan 命令 make:command 时被创建.生成的命令将会包含默认的属性设置以及所有命令都共有的方法, 这里我生成一个RedisSubscribe.php类,执行下面命令: php artisa

【并发】9、借助redis 实现生产消费,消息订阅发布模式队列

这个就是一个消息可以被多次消费的范例了 其实这个实现的方式可以参考我之前的设计模式,观察者模式 https://www.cnblogs.com/cutter-point/p/5249780.html 不过有一点需要注意一下啊,这个消息发布的时候,好像是不支持字节数据的,里面好像会对字节进行转换,这样的结果就是导致我最后无法吧相应的字节转换成我之前序列化的对象 不知道是不是ObjectInputStream和ObjectOutputStream实现不是很好的原因,还是什么,反正反序列化的时候,有些

Java实现Redis的消息订阅和发布

1.  首先需要一个消息监听器类 package com.sogou.baike.testimport.testSubscribe; import redis.clients.jedis.JedisPubSub; /** * Created by denglinjie on 2016/6/29. */ public class RedisMsgPubSubListener extends JedisPubSub { @Override public void unsubscribe() { su

websocket+nodejs+redis实现消息订阅和发布系统

其实我很懒,不想打字,代码已上传到码云,请点此处. 有疑问请一下扫描二维码,加我微信: 原文地址:https://www.cnblogs.com/don-yang/p/8931102.html

C# 数据推送 实时数据推送 轻量级消息订阅发布 多级消息推送 分布式推送

前言 本文将使用一个NuGet公开的组件技术来实现数据订阅推送功能,由服务器进行推送数据,客户端订阅指定的数据后,即可以接收服务器推送过来的数据,包含了自动重连功能,使用非常方便 nuget地址:https://www.nuget.org/packages/HslCommunication/            github地址:https://github.com/dathlin/HslCommunication                                 如果喜欢可以s

消息订阅发布系统Apache Kafka分布式集群环境搭建和简单测试

一.什么是kafka? kafka是LinkedIn开发并开源的一个分布式MQ系统,现在是Apache的一个孵化项目.在它的主页描述kafka为一个高吞吐量的分布式(能将消息分散到不同的节点上)MQ.Kafka仅仅由7000行Scala编写,据了解,Kafka每秒可以生产约25万消息(50 MB),每秒处理55万消息(110 MB) 二.kafka的官方网站在哪里? http://kafka.apache.org/ 三.在哪里下载?需要哪些组件的支持? kafka2.9.2在下面的地址可以下载: