如何使用NODEJS+REDIS开发一个消息队列

作者: RobanLee

原创文章,转载请注明: 萝卜李 http://www.robanlee.com

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们>。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。其中较为成熟的MQ产品有IBM WEBSPHERE MQ。

以上介绍仍旧来自百度百科.

消息队列产品对比

目前比较流行的MQ有2种,ActiveMQ 以及 RabbitMQ , RabbitMQ性能号称能够达到每秒10000,而REDIS官方的压力测试值在7-8万之间,而且是去掉了网络IO操作,真实情况我估计在每秒2-3万的并发操作,但这个数目对于一般的应用应该足够了.

Redis如何支持消息队列?

在新版本的redis v2.6以上以及以上版本开始支持 subscribe 以及 publish 操作,  subscribe订阅一个频道,publish可以像频道广播消息. 这个机制最老的应用应该是算是聊天室了.

Sub/Pub 模式固然很好用,但是同样有一个问题,就是如果有多个人订阅了同一频道,而这个频道的数据只能被一个接收方处理,不能够重复处理,这时该怎么办?

解决方法有2种,

1. publish  将数据写入到一个list or sorted list 队列,写完成后开始给终端广播消息,告诉大家,有新的数据等待处理,这个时候,谁能pop到数据,就是谁处理,这个操作是原子性的,也就是说不会被重复处理.

2. 使用阻塞模式, redis提供了blpop brpop这种操作,也就是一直阻塞一个队列,直到有数据来. 这种模式保证了数据的原子性,而且使应用程序可以支持分布式多台机器部署.

Sub/Pub模式 (sub.js):

var redis = require("redis");
var client = redis.createClient(6379, ‘127.0.0.1‘, {connect_timeout: 1});

//订阅一个频道
var sub = function(c) {
    var c = c || ‘roban:test:channel‘;
    client.subscribe(c,function(e){
        console.log(‘starting subscribe channel:‘+c);
    });
};

//订阅一个频道
sub();

//处理错误,如果出现错误,或者服务器断开了链接,等待恢复时,继续订阅这个频道
client.on(‘error‘, function(error) {
    console.log(error);
    sub();
});

//订阅处理函数
client.on(‘message‘,function(err,response){
    console.log(response);
});

打开redis命令行,输入以下命令:

publish roban:test:channel hello

发布这条信息后,sub端会输出以下信息:

Robans-Pro:node robanlee$ node demo.js
starting subscribe channel:roban:test:channel
hello
时间: 2024-11-09 17:25:49

如何使用NODEJS+REDIS开发一个消息队列的相关文章

使用NODEJS+REDIS开发一个消息队列以及定时任务处理

作者:RobanLee 原创文章,转载请注明: 萝卜李 http://www.robanlee.com 源码在这里: https://github.com/robanlee123/RobCron 时间有限,就不详细注释,有问题或者意见欢迎@我,也欢迎大家批评指正. 本文所必须的一些资料如下: 1. NODEJS ==> 可以去NODEJS.ORG下载最新的源码.2. Redis ==> Redis.io3. KUE ==> Nodejs的一个开源队列系统4. NODE-SCHEDULE

Redis实现简单消息队列

任务异步化 打开浏览器,输入地址,按下回车,打开了页面.于是一个HTTP请求(request)就由客户端发送到服务器,服务器处理请求,返回响应(response)内容. 我们每天都在浏览网页,发送大大小小的请求给服务器.有时候,服务器接到了请求,会发现他也需要给另外的服务器发送请求,或者服务器也需要做另外一些事情,于是最初们发送的请求就被阻塞了,也就是要等待服务器完成其他的事情. 更多的时候,服务器做的额外事情,并不需要客户端等待,这时候就可以把这些额外的事情异步去做.从事异步任务的工具有很多.

skynet源码学习 - 从全局队列中弹出/压入一个消息队列过程

学习云风的skynet源码,简单记录下. void skynet_globalmq_push(struct message_queue * queue) { struct global_queue *q= Q; uint32_t tail = GP(__sync_fetch_and_add(&q->tail,1)); // only one thread can set the slot (change q->queue[tail] from NULL to queue) if (!_

基于Redis实现分布式消息队列(汇总目录)

基于Redis实现分布式消息队列(1)– 缘起 http://blog.csdn.net/stationxp/article/details/45595733 基于Redis实现分布式消息队列(2)– 分布式消息队列功能设计 http://blog.csdn.net/stationxp/article/details/45596619 基于Redis实现分布式消息队列(3)– Redis功能分析 http://blog.csdn.net/stationxp/article/details/457

skynet源代码学习 - 从全局队列中弹出/压入一个消息队列过程

学习云风的skynet源代码,简单记录下. void skynet_globalmq_push(struct message_queue * queue) { struct global_queue *q= Q; uint32_t tail = GP(__sync_fetch_and_add(&q->tail,1)); // only one thread can set the slot (change q->queue[tail] from NULL to queue) if (!

使用Redis实现异步消息队列

工作中涉及到redis异步消费,查阅资料,记录下~ 使用Redis实现异步消息队列 https://blog.csdn.net/b540969928/article/details/78406791 异步消息队列 https://blog.csdn.net/qq_33589510/article/details/77126852 原文地址:https://www.cnblogs.com/meixiaoqiu/p/10238227.html

如果让你写一个消息队列,该如何进行架构设计啊?说一下你的思路

1.面试官心里分析 其实聊到这个问题,一般面试官要考察两块: (1)你有没有对某一个消息队列做过较为深入的原理的了解,或者从整体了解把握住一个mq的架构原理 (2)看看你的设计能力,给你一个常见的系统,就是消息队列系统,看看你能不能从全局把握一下整体架构设计,给出一些关键点出来 说实话,我一般面类似问题的时候,大部分人基本都会蒙,因为平时从来没有思考过类似的问题,大多数人就是平时埋头用,从来不去思考背后的一些东西.类似的问题,我经常问的还有,如果让你来设计一个spring框架你会怎么做?如果让你

基于Redis实现分布式消息队列(1)

1.为什么需要消息队列? 当系统中出现"生产"和"消费"的速度或稳定性等因素不一致的时候,就需要消息队列,作为抽象层,弥合双方的差异. 举个例子:业务系统触发短信发送申请,但短信发送模块速度跟不上,需要将来不及处理的消息暂存一下,缓冲压力. 再举个例子:调远程系统下订单成本较高,且因为网络等因素,不稳定,攒一批一起发送. 再举个栗子,交互模块5:00到24:00和电商系统联通,和内部ERP断开.1:00到4:00和ERP联通,和电商系统断开. 再举个例子,服务员点菜

基于redis的延迟消息队列设计

需求背景 用户下订单成功之后隔20分钟给用户发送上门服务通知短信 订单完成一个小时之后通知用户对上门服务进行评价 业务执行失败之后隔10分钟重试一次 类似的场景比较多 简单的处理方式就是使用定时任务 假如数据比较多的时候 有的数据可能延迟比较严重,而且越来越多的定时业务导致任务调度很繁琐不好管理. 队列设计 目前可以考虑使用rabbitmq来满足需求 但是不打算使用,因为目前太多的业务使用了另外的MQ中间件. 开发前需要考虑的问题? 及时性 消费端能按时收到 同一时间消息的消费权重 可靠性 消息