如何实现从 Redis 中订阅消息转发到 WebSocket 客户端

PHP 的redis扩展是阻塞式 IO ,使用订阅/发布模式时,会导致整个进程进入阻塞。因此必须使用Swoole\Redis异步客户端来实现。

实例代码

 1 $server = new swoole_websocket_server("0.0.0.0", 9501);
 2
 3 $server->on(‘workerStart‘, function ($server, $workerId) {
 4     $client = new swoole_redis;
 5     $client->on(‘message‘, function (swoole_redis $client, $result) use ($server) {
 6         if ($result[0] == ‘message‘) {
 7             foreach($server->connections as $fd) {
 8                 $server->push($fd, $result[1]);
 9             }
10         }
11     });
12     $client->connect(‘127.0.0.1‘, 6379, function (swoole_redis $client, $result) {
13         $client->subscribe(‘msg_0‘);
14     });
15 });
16
17 $server->on(‘open‘, function ($server, $request) {
18
19 });
20
21 $server->on(‘message‘, function (swoole_websocket_server $server, $frame) {
22     $server->push($frame->fd, "hello");
23 });
24
25 $server->on(‘close‘, function ($serv, $fd) {
26
27 });
28
29 $server->start();

实现过程

  • 在进程启动(onWorkerStart)时创建了Swoole\Redis客户端,连接到Redis服务器
  • 连接成功后,订阅msg_0主题的消息
  • 当有新的message时,Swoole\Redis会触发onMessage事件回调
  • 在这个回调函数中使用$server->connections遍历服务器所有的连接,发送消息
时间: 2024-10-22 12:22:59

如何实现从 Redis 中订阅消息转发到 WebSocket 客户端的相关文章

关于OC中消息转发机制的理解以及在项目中的实际应用

<span style="font-family: Arial, Helvetica, sans-serif; background-color: rgb(255, 255, 255);">关于OC中的消息转发机制想必大家都很了解,现在来温习一下:</span> 一.什么是消息转发? @selector 是什么? 1一种类型 SEL 2代表你要发送的消息(方法), 跟字符串有点像, 也可以互转.: NSSelectorFromString()   /   NSS

ios底层开发消息机制(四)消息转发

消息转发 若想令类能理解某条消息,我们必须以程序码实现出对应的方法才行.但是,在编译期向类发送了其无法解读的消息并不会报错,因为在运行期可以继续向类中添加方法,所以编译器在编译时还无法确知类中到底会不会有某个方法实现.当对象接收到无法解读的消息后,就会启动“消息转发”(message forwarding)机制,程序员可经由此过程告诉对象应该如何处理未知消息. 你可能早就遇到过经由消息转发流程所处理的消息了,只是未加留意.如果在控制台中看到下面这种提示信息,那就说明你曾向某个对象发送过一条其无法

《Effective Objective-C 2.0》—(第11-14条)—运行时动态绑定、objc_msgSend、消息转发机制

第11条:理解objc_msgSend的作用 在对象上调用方法是OC中经常使用的功能.用OC术语来说这叫做:"传递消息"(pass a message).消息有"名称"(name)或者"选择子"(selector),可以接收参数,而且可能还有返回值. 由于OC是C的超集,所以最好理解C语言的函数调用方式.C语言使用"静态绑定",就是说在编译期就能决定运行时所应调用的函数.以下列代码为例: #import <stdio.h

runtime之消息转发

前言 在上一篇文章中我们初尝了runtime的黑魔法,可以在程序编译阶段就获取到成员变量的名字,特性以及动态的给对象增加属性等等,在接下来中我们进一步了解OC的消息发送机制.如果之前没接触过runtime的同学建议先看看:上一篇<runtime之玩转成员变量> OC的消息发送机制是早有耳闻,鉴于自己一直觉得是很底层的东西需要花大量的时候去学习研究它所以一直都是蠢蠢欲动.同样不做过多铺垫,直接切入吧.当我们使用OC对象调用一个方法的时候,比如这样:[lisi  sayHello];  程序运行的

redis中使用java脚本实现分布式锁

转载于:http://www.itxuexiwang.com/a/shujukujishu/redis/2016/0216/115.html?1455860390 edis被大量用在分布式的环境中,自然而然分布式环境下的锁如何解决,立马成为一个问题.例如我们当前的手游项目,服务器端是按业务模块划分服务器的,有应用服,战斗服等,但是这两个vm都有可能同时改变玩家的属性,这如果在同一个vm下面,就很容易加锁,但如果在分布式环境下就没那么容易了,当然利用redis现有的功能也有解决办法,比如redis

开源的.NET发布-订阅消息服务器Laharsub

Laharsub是一种开源的.NET发布-订阅消息服务器,用于实时的web应用程序,像聊天.在线写作.新闻或者股票交易更新等等. Laharsub是一种构建在三层架构之上的发布-订阅消息服务器: 前端--客户端,中间层--web服务,后端--带有发布-订阅功能和存储能力的系统. 客户端一般是浏览器,但是可以是所有已知能够做出HTTP请求的程序. 中间层是一种WCF的HTTP服务,它会从客户端接收消息,并向其发送消息,而后端会包含真正的与消息相关的逻辑. 客户端可以创建主题,并通过RESTful

redis中的发布订阅(Pub/Sub)

这里使用nodejs的redis模块说明,具体可见https://www.npmjs.com/package/redis,先来通过一个简单的例子了解下redis中的Pub/Sub具体怎么实现吧.. var express = require('express'); var router = express.Router(); var redis = require("redis"); /* GET home page. */ router.get('/', function(req,

Sprint Boot如何基于Redis发布订阅实现异步消息系统的同步调用?

前言 在很多互联网应用系统中,请求处理异步化是提升系统性能一种常用的手段,而基于消息系统的异步处理由于具备高可靠性.高吞吐量的特点,因而在并发请求量比较高的互联网系统中被广泛应用.与此同时,这种方案也带来了调用链路处理上的问题,因为大部分应用请求都会要求同步响应实时处理结果,而由于请求的处理过程已经通过消息异步解耦,所以整个调用链路就变成了异步链路,此时请求链路的发起者如何同步拿到响应结果,就需要进行额外的系统设计考虑. 为了更清晰地理解这个问题,小码哥以最近正在做的共享单车的IOT系统为例,给

RedisRepository封装—Redis发布订阅以及StackExchange.Redis中的使用

本文版权归博客园和作者本人吴双共同所有,转载请注明本Redis系列分享地址.http://www.cnblogs.com/tdws/tag/NoSql/ Redis Pub/Sub模式 基本介绍 Redis发布订阅—Pub/Sub模式或者说是观察者模式.我想大家即使没有使用过,也已经耳熟能详了. 先简单举例说明下应用场景,在场景中我们可以分析到其优势在哪. 比如你的线上应用应用,你想设置一个日志报警系统,当应用出现异常的时候,立马发送通知给你,可能是短信的形式,也可能是邮件的形式.当然如果只将报