使用NODEJS+REDIS开发一个消息队列以及定时任务处理

作者:RobanLee

原创文章,转载请注明: 萝卜李 http://www.robanlee.com

源码在这里: https://github.com/robanlee123/RobCron

时间有限,就不详细注释,有问题或者意见欢迎@我,也欢迎大家批评指正.

本文所必须的一些资料如下:

1. NODEJS ==> 可以去NODEJS.ORG下载最新的源码.
2. Redis ==> Redis.io
3. KUE ==> Nodejs的一个开源队列系统
4. NODE-SCHEDULE ==> NODEJS 一个开源调度系统

废话不多说,先来介绍任务的流程:

1. NODEJS或者PHP或者其他语言 写入REDIS 一个计划任务, 比如每分钟做某件事,这里就用SAYHELLO来代替好了
2. 使用NODEJS读取这个任务,并将它转换为NODE的调度任务(node-schedule 来完成)
3. 调度器[node-schedule]根据设定的规则来分发任务.
4. KUE接受任务,并且加入列队,执行.
5. DONE

STEP1: 创建一个任务

/**
 * Add task
 * @author [email protected]
 */

//加载函数,集中加载一些LIB,这个源码请参照最后的附属文件
var loader = require(‘./loader‘);  

function addTask(opts){
        new loader(this); 
        
        //默认设置
        this.opts = {
                keyIDs:‘schedule:job:ids‘,
                keyLists:‘schedule:job:list‘,
                keyJob:‘schedule:job:‘
        }
        
        //合并配置,类似JQUERY: extend
        this.mergeParams(opts);  
};

//Merge options
addTask.prototype.mergeParams = function ( param ){
        if(undefined === this.opts ) {
                return false;
        }
        
        for(var x in param) { 
                if(param[x] != undefined && ‘‘ != param[x]) {
                        this.opts[x] = param[x];
                }
        }
};

//添加数据方法
addTask.prototype.pushData = function ( data ){ 
        if(undefined == data ) {
                console.log(‘--ERROR:data is null‘);
                return false;
        }
        this.getIncr.call(this,function(response,obj){
                var id = response;
                obj.redisClient.rpush(obj.opts.keyLists,id,function(err,response){
                        if(err) throw err;
                });
         
                data.id = id;
                var m = obj.redisClient.multi();
                for(var x in data) {
                        m.hset( obj.opts.keyJob+id,x,data[x] );
                }
                
                m.exec(function(err,response){
                        if(err) throw err; 
                        console.log(‘[info] Task: [‘+data.name+‘] has been set successful!‘);
                });   
               
        }); 
};

//获取REDIS目前的自增ID
addTask.prototype.getIncr = function (callBack){
        var obj = this; 
        this.redisClient.incr(this.opts.keyIDs,function(err,response){
                console.log("[info] Current id is : " + response);
                callBack(response, obj);
        });
};

加载这个lib 写入一个DEMO:

var data = { 
        ‘name‘:‘taskDemo‘,
        ‘created‘:Date.now(),
        ‘state‘:1,
        ‘type‘:‘untitled‘,
        ‘rule‘:‘*/1 * * * *‘  //这个任务规则可以为CRONTAB的规则,这个表示每分钟执行一次
};

var job = new addTask();
job.pushData(data);

执行这个脚本,如果一切正常,你会看到如下信息:

NODEJS 输出:

REDIS:

接下来就是获取数据,并且转换为调度任务了,

源码:

var loader = require(‘./loader‘);
var taskLog = require("./TaskLog");

function scheduleTask(){
        new loader(this); 
        this.opts = {
                keyIDs:‘schedule:job:ids‘,
                keyLists:‘schedule:job:list‘,
                keyJob:‘schedule:job:‘
        }
        
        this.task = {
                taskDemo:undefined
        };
        
        //监听取消任务操作
        this.listenCancel();
};

scheduleTask.prototype.setScheduleTask = function (data,obj){ 
        this.task[data.name] =  this.libs[‘node-schedule‘].scheduleJob(data.rule,function(){
                obj.setQueue(data);
                console.log(‘[info] Task :‘ + data.name + ‘ has been set in queue!‘);
        });
         
};

scheduleTask.prototype.setQueue = function (datas){
        
        var jobs = this.libs.kue.createQueue(); 
        jobs.create(datas.name,{
                ‘name:‘:datas.name,
                ‘state‘:1
        }).save();
         
        console.log("[info] Task ["+datas.name+"] has been queued!");
        
        this.setLog(datas);
};

scheduleTask.prototype.setLog = function (responseData){
        var logData = {
                jobid:responseData.id,
                name:responseData.name,
                result:1
        };
                                        
        new taskLog(logData);
        console.log("[info] Task has been loged");
};

scheduleTask.prototype.getJob = function (){
        this.getJobIndex.call(this,function(response,obj){
                for(var x in response ) {
                        obj.redisClient.hgetall(obj.opts.keyJob+response[x],function(err,data){
                                console.log("[info] Task:["+data.name+"] has been loaded!");
                                obj.setScheduleTask(data, obj);
                        });
                }
        });
};

scheduleTask.prototype.getJobIndex = function(callBack){
        //Read tasks from <list schedule:job:list>
        var o = this;
        this.redisClient.lrange(this.opts.keyLists,0,-1,function(err,response){
                if(err) throw err;
                callBack(response, o);
        });
};

scheduleTask.prototype.listenCancel = function (){
        var job = this.libs.kue.createQueue();
        var that = this;

        job.process(‘cancelJob‘,function(data,done){  
                that.task[data.data.data].cancel();
                console.log(‘[info] Task: ‘+data.data.data + ‘ has been canceled‘) ;
                done();
        });
}

执行代码:

var x = new scheduleTask();
x.getJob();

等待一分钟后,NODEJS控制台会输出(这个任务在没有取消的情况下,每分钟都会执行):

第二分钟:

REDIS 现在的数据:

这个数据中增加了KUE的一些任务, q:job:[]:inactive 这个就标识任务还未被执行,执行后的任务状态有

complete active failed delay 四种

至此,就只剩下执行任务的步骤了

var loader = require(‘./loader‘);

function execTask(){
        new loader(this); 
        
        var job = this.libs.kue.createQueue();
        job.process(‘taskDemo‘,function(data,done){
                console.log(‘[info] Task:‘+data.type+‘#‘+data.id+‘ has been executed successful!‘);
                
                
                //DONE之前可以做你想要做的事情
                
                done(); //千万别忘记调用此方法
        });
        
        
}

//添加一个取消定时任务的KUE任务
execTask.prototype.addCancelJob = function (){
        var job =this.libs.kue.createQueue();
        job.create(‘cancelJob‘, {data:‘taskDemo‘}).save();
        console.log(‘[info] Task: cancelJob has been send!‘);
}

执行这个脚本:

var et = new execTask();

//取消定时任务
et.addCancelJob();

执行后会有2个结果

1. 程序会执行当前列队里的任务.

2. 定时任务会被取消,下一分钟后任务不会再由SCHEDULE分配

任务执行结果:

取消任务的回应:

注意最后一行…

时间: 2024-10-10 06:17:02

使用NODEJS+REDIS开发一个消息队列以及定时任务处理的相关文章

如何使用NODEJS+REDIS开发一个消息队列

作者: RobanLee 原创文章,转载请注明: 萝卜李 http://www.robanlee.com MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们>.消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术.排队指的是应用程序通过 队列来通信.队列的使用除去了接收和发送应用程序同时执行的要求.其

Redis实现简单消息队列

任务异步化 打开浏览器,输入地址,按下回车,打开了页面.于是一个HTTP请求(request)就由客户端发送到服务器,服务器处理请求,返回响应(response)内容. 我们每天都在浏览网页,发送大大小小的请求给服务器.有时候,服务器接到了请求,会发现他也需要给另外的服务器发送请求,或者服务器也需要做另外一些事情,于是最初们发送的请求就被阻塞了,也就是要等待服务器完成其他的事情. 更多的时候,服务器做的额外事情,并不需要客户端等待,这时候就可以把这些额外的事情异步去做.从事异步任务的工具有很多.

skynet源码学习 - 从全局队列中弹出/压入一个消息队列过程

学习云风的skynet源码,简单记录下. void skynet_globalmq_push(struct message_queue * queue) { struct global_queue *q= Q; uint32_t tail = GP(__sync_fetch_and_add(&q->tail,1)); // only one thread can set the slot (change q->queue[tail] from NULL to queue) if (!_

基于Redis实现分布式消息队列(汇总目录)

基于Redis实现分布式消息队列(1)– 缘起 http://blog.csdn.net/stationxp/article/details/45595733 基于Redis实现分布式消息队列(2)– 分布式消息队列功能设计 http://blog.csdn.net/stationxp/article/details/45596619 基于Redis实现分布式消息队列(3)– Redis功能分析 http://blog.csdn.net/stationxp/article/details/457

skynet源代码学习 - 从全局队列中弹出/压入一个消息队列过程

学习云风的skynet源代码,简单记录下. void skynet_globalmq_push(struct message_queue * queue) { struct global_queue *q= Q; uint32_t tail = GP(__sync_fetch_and_add(&q->tail,1)); // only one thread can set the slot (change q->queue[tail] from NULL to queue) if (!

使用Redis实现异步消息队列

工作中涉及到redis异步消费,查阅资料,记录下~ 使用Redis实现异步消息队列 https://blog.csdn.net/b540969928/article/details/78406791 异步消息队列 https://blog.csdn.net/qq_33589510/article/details/77126852 原文地址:https://www.cnblogs.com/meixiaoqiu/p/10238227.html

如果让你写一个消息队列,该如何进行架构设计啊?说一下你的思路

1.面试官心里分析 其实聊到这个问题,一般面试官要考察两块: (1)你有没有对某一个消息队列做过较为深入的原理的了解,或者从整体了解把握住一个mq的架构原理 (2)看看你的设计能力,给你一个常见的系统,就是消息队列系统,看看你能不能从全局把握一下整体架构设计,给出一些关键点出来 说实话,我一般面类似问题的时候,大部分人基本都会蒙,因为平时从来没有思考过类似的问题,大多数人就是平时埋头用,从来不去思考背后的一些东西.类似的问题,我经常问的还有,如果让你来设计一个spring框架你会怎么做?如果让你

基于Redis实现分布式消息队列(1)

1.为什么需要消息队列? 当系统中出现"生产"和"消费"的速度或稳定性等因素不一致的时候,就需要消息队列,作为抽象层,弥合双方的差异. 举个例子:业务系统触发短信发送申请,但短信发送模块速度跟不上,需要将来不及处理的消息暂存一下,缓冲压力. 再举个例子:调远程系统下订单成本较高,且因为网络等因素,不稳定,攒一批一起发送. 再举个栗子,交互模块5:00到24:00和电商系统联通,和内部ERP断开.1:00到4:00和ERP联通,和电商系统断开. 再举个例子,服务员点菜

基于redis的延迟消息队列设计

需求背景 用户下订单成功之后隔20分钟给用户发送上门服务通知短信 订单完成一个小时之后通知用户对上门服务进行评价 业务执行失败之后隔10分钟重试一次 类似的场景比较多 简单的处理方式就是使用定时任务 假如数据比较多的时候 有的数据可能延迟比较严重,而且越来越多的定时业务导致任务调度很繁琐不好管理. 队列设计 目前可以考虑使用rabbitmq来满足需求 但是不打算使用,因为目前太多的业务使用了另外的MQ中间件. 开发前需要考虑的问题? 及时性 消费端能按时收到 同一时间消息的消费权重 可靠性 消息