pomelo的push机制(channel)和源码解读

原文来自:http://blog.csdn.net/xufeng0991/article/details/45499665

尊重原创,转载请注明出处,谢谢!

pomelo是网易开源的服务器架构,通讯类型分为四种: request, response, notify, push。

前面两种都可是使用pomelo.request实现,notify由pomelo.notify实现, 这里只看push是如何实现的。

一 ChannelService

ChannelService是由pomelo默认加载组件channel创建的。 可以使用如下方法获得:

app.get('channelService');

ChannelService中提供了以下几个常用的方法:

1 createChannel(name);

name: channel名

创建一个指定名称的channel。

ChannelService.prototype.createChannel = function(name) {
    if (this.channels[name]) {
        return this.channels[name];
    }

    var c = new Channel(name, this);
    this.channels[name] = c;
    return c;
};

2 getChannel(name, create);

name: channel名

create: 如果不存在, 是否创建

获取一个指定用户名的channel, 如果create为true, 不存在会创建一个。

ChannelService.prototype.getChannel = function(name, create) {
    var channel = this.channels[name];
    if (!channel && !!create) {
        channel = this.channels[name] = new Channel(name, this);
    }
    return channel;
};

3 destroyChannel(name);

name: channel名

使用name, 销毁一个指定的channel。

ChannelService.prototype.destroyChannel = function(name) {
    delete this.channels[name];
};

4 pushMessageByUids(route, msg, uids, opts, cb);

route: 前端消息监听方法的路由

msg: 发送给前端的消息内容

uids: 消息接收者 {

uid: userId,

sid: frontendServerId

},

uid为session.bind(uid);

指定的

opts: 可选的自定义参数

cb: callback方法

给指定uid推送消息。

ChannelService.prototype.pushMessageByUids = function(route, msg, uids, opts, cb) {
    if (typeof route !== 'string') {
        cb = opts;
        opts = uids;
        uids = msg;
        msg = route;
        route = msg.route;
    }

    if (!cb && typeof opts === 'function') {
        cb = opts;
        opts = {};
    }

    if (!uids || uids.length === 0) {
        utils.invokeCallback(cb, new Error('uids should not be empty'));
        return;
    }
    var groups = {},
        record;
    for (var i = 0, l = uids.length; i < l; i++) {
        record = uids[i];
        add(record.uid, record.sid, groups);
    }

    sendMessageByGroup(this, route, msg, groups, opts, cb);
};

5 broadcast(stype, route, msg, opts, cb)

stype: 前端服务器类型

route: 前端消息监听方法的路由

msg: 发送给前端的消息内容

opts: 可选的自定义参数

cb: callback方法

给指定服务器上所有的链接者推送消息。

ChannelService.prototype.broadcast = function(stype, route, msg, opts, cb) {
    var app = this.app;
    var namespace = 'sys';
    var service = 'channelRemote';
    var method = 'broadcast';
    var servers = app.getServersByType(stype);

    if (!servers || servers.length === 0) {
        // server list is empty
        utils.invokeCallback(cb);
        return;
    }

    var count = servers.length;
    var successFlag = false;

    var latch = countDownLatch.createCountDownLatch(count, function() {
        if (!successFlag) {
            utils.invokeCallback(cb, new Error('broadcast fails'));
            return;
        }
        utils.invokeCallback(cb, null);
    });

    var genCB = function() {
        return function(err) {
            if (err) {
                logger.error('[broadcast] fail to push message, err:' + err.stack);
                latch.done();
                return;
            }
            successFlag = true;
            latch.done();
        };
    };

    opts = {
        type: 'broadcast',
        userOptions: opts || {}
    };

    // for compatiblity
    opts.isBroadcast = true;
    if (opts.userOptions) {
        opts.binded = opts.userOptions.binded;
        opts.filterParam = opts.userOptions.filterParam;
    }

    for (var i = 0, l = count; i < l; i++) {
        app.rpcInvoke(servers[i].id, {
            namespace: namespace,
            service: service,
            method: method,
            args: [route, msg, opts]
        }, genCB());
    }
};

二 Channel

上面是ChannelService对象提供的一些操作channel的方法, 下面是Channel对象的方法。

1 add(uid, sid)

uid: 前端连接的uid

sid: 前端连接到的服务器id

将uid添加到channel中。

Channel.prototype.add = function(uid, sid) {
    if (this.state > ST_INITED) {
        return false;
    } else {
        var res = add(uid, sid, this.groups);
        if (res) {
            this.records[uid] = {
                sid: sid,
                uid: uid
            };
        }
        return res;
    }
};

2 leave(uid, sid)

uid: 用户的uid

sid: 前端连接到的服务器id

将uid从channel中移除。

Channel.prototype.leave = function(uid, sid) {
    if (!uid || !sid) {
        return false;
    }
    delete this.records[uid];
    var res = deleteFrom(uid, sid, this.groups[sid]);
    if (this.groups[sid] && this.groups[sid].length === 0) {
        delete this.groups[sid];
    }
    return res;
};

3 getMembers()

获取当前channel中所有的用户。

Channel.prototype.getMembers = function() {
    var res = [],
        groups = this.groups;
    var group, i, l;
    for (var sid in groups) {
        group = groups[sid];
        for (i = 0, l = group.length; i < l; i++) {
            res.push(group[i]);
        }
    }
    return res;
};

4 getMember(uid)

uid: 用户的uid

获取指定用户的uid。

Channel.prototype.getMember = function(uid) {
    return this.records[uid];
};

5 destroy()

销毁channel。

Channel.prototype.destroy = function() {
    this.state = ST_DESTROYED;
    this.__channelService__.destroyChannel(this.name);
};

6 pushMessage(route, msg, opts, cb)

route: 前端消息监听方法的路由

msg: 发送给前端的消息内容

opts: 可选的自定义参数

cb: callback方法

给当前channel中所有的用户推送消息。

Channel.prototype.pushMessage = function(route, msg, opts, cb) {
    if (this.state !== ST_INITED) {
        utils.invokeCallback(new Error('channel is not running now'));
        return;
    }

    if (typeof route !== 'string') {
        cb = opts;
        opts = msg;
        msg = route;
        route = msg.route;
    }

    if (!cb && typeof opts === 'function') {
        cb = opts;
        opts = {};
    }

    sendMessageByGroup(this.__channelService__, route, msg, this.groups, opts, cb);
};

三 小结

从上面提供的这些方法可以看出, 可以有两种方式实现推送:

1 匿名Channel

就是不需要创建Channel, 直接使用channelService.pushMessageByUids和channelService.broadcast推送;

示例:

var uidArray = new Array();
uidObject.uid = "session uid";
uidObject.sid = "connector-server-1";
uidArray.push(uidObject);

channelService.pushMessageByUids('onMsg', {
    msg: msg
}, uidArray, function(err) {
    if (err) {
        console.log(err);
        return;
    }
});

channelService.broadcast('connector', 'onMsg', msg, {
    binded: true
}, function(err) {
    if (err) {
        console.log(err);
    }
});

2 显式Channel

需要使用channel.createChannel或channel.getChannel先获得一个Channel, 然后使用Chanel.pushMessage推送。

示例:

//创建Channel
var channelName = 'allPushChannel';
var channel = this.channelService.getChannel

//把用户添加到channel 里面
if (!!channel) {
    channel.add(uid, sid);
}

//根据Channel名字推送消息
var channelName = 'allPushChannel';
var pushChannel = this.channelService.getChannel(channelName, false);
pushChannel.pushMessage('onMsg', {
    msg: msg
}, function(err) {
    if (err) {
        console.log(err);
    } else {
        console.log('push ok');
    }
});

参考

官方API: http: //pomelo.netease.com/api.html

cnodejs: https: //cnodejs.org/topic/51b531bef78196a85c4f0c89

时间: 2024-11-09 02:13:19

pomelo的push机制(channel)和源码解读的相关文章

《Netty5.0架构剖析和源码解读》【PDF】下载

<Netty5.0架构剖析和源码解读>[PDF]下载链接: https://u253469.pipipan.com/fs/253469-230062545 内容简介 Netty 是个异步的事件驱动网络应用框架,具有高性能.高扩展性等特性.Netty提供了统一的底层协议接口,使得开发 者从底层的网络协议 (比如 TCP/IP. UDP) 中解脱出来. 就使用来说, 开发者只要参考Netty 提供的若干例子和它的指南文档,就可以放手开发基于 Netty 的服务端程序了. 在Java 社区,最知名的

seaJs原理分析和源码解读(上)

seaJs原理解读和源码分析(上)

Mybatis(四):MyBatis核心组件介绍原理解析和源码解读

Mybatis核心成员 Configuration        MyBatis所有的配置信息都保存在Configuration对象之中,配置文件中的大部分配置都会存储到该类中 SqlSession            作为MyBatis工作的主要顶层API,表示和数据库交互时的会话,完成必要数据库增删改查功能 Executor               MyBatis执行器,是MyBatis 调度的核心,负责SQL语句的生成和查询缓存的维护 StatementHandler 封装了JDBC

Selenium系列(十) - 针对Select下拉框的操作和源码解读

如果你还想从头学起Selenium,可以看看这个系列的文章哦! https://www.cnblogs.com/poloyy/category/1680176.html 其次,如果你不懂前端基础知识,需要自己去补充哦,博主暂时没有总结(虽然我也会,所以我学selenium就不用复习前端了哈哈哈...) 首先,将下面html代码保存到一个文件中 后续的代码小案例都是访问此html的<!DOCTYPE html> <html lang="en"> <head&

蚂蚁金服分布式链路跟踪组件 SOFATracer 数据上报机制和源码分析 | 剖析

2019新春支付宝红包技术大揭秘在线峰会将于03-07日开始,点击这里报名届时即可参与大牛互动. SOFAScalable Open Financial Architecture 是蚂蚁金服自主研发的金融级分布式中间件,包含了构建金融级云原生架构所需的各个组件,是在金融场景里锤炼出来的最佳实践.SOFATracer 是一个用于分布式系统调用跟踪的组件,通过统一的 TraceId 将调用链路中的各种网络调用情况以日志的方式记录下来,以达到透视化网络调用的目的,这些链路数据可用于故障的快速发现,服务

AFNetworking 3.0 源码解读 总结

终于写完了 AFNetworking 的源码解读.这一过程耗时数天.当我回过头又重头到尾的读了一篇,又有所收获.不禁让我想起了当初上学时的种种情景.我们应该对知识进行反复的记忆和理解.下边是我总结的 AFNetworking 中能够学到的知识点. 1.枚举(enum) 使用原则:当满足一个有限的并具有统一主题的集合的时候,我们就考虑使用枚举.这在很多框架中都验证了这个原则.最重要的是能够增加程序的可读性. 示例代码: /** * 网络类型 (需要封装为一个自己的枚举) */ typedef NS

15分钟带你了解前端工程师必知的javascript设计模式(附详细思维导图和源码)

15分钟带你了解前端工程师必知的javascript设计模式(附详细思维导图和源码) 前言 设计模式是一个程序员进阶高级的必备技巧,也是评判一个工程师工作经验和能力的试金石.设计模式是程序员多年工作经验的凝练和总结,能更大限度的优化代码以及对已有代码的合理重构.作为一名合格的前端工程师,学习设计模式是对自己工作经验的另一种方式的总结和反思,也是开发高质量,高可维护性,可扩展性代码的重要手段. 我们所熟知的金典的几大框架,比如jquery, react, vue内部也大量应用了设计模式, 比如观察

Gson 源码解读

开源库地址:https://github.com/google/gson 解读版本:2.7 Gson是一个可以用来将Java对象转换为JSON字符串的Java库.当然,它也可以把JSON字符串转换为等价的Java对象.网上已经有了不少可将Java对象转换成JSON的开源项目.但是,大多数都要求你在Java类中加入注解,如果你无法修改源码的话就比较坑爹了,此外大多数开源库并没有对泛型提供完全的支持.于是,Gson在这两个重要的设计目标下诞生了.Gson可以作用于任意的Java对象(包括接触不到源码

seajs 源码解读

之前面试时老问一个问题seajs 是怎么加载js 文件的 在网上找一些资料,觉得这个写的不错就转载了,记录一下,也学习一下 seajs 简单介绍 seajs是前端应用模块化开发的一种很好的解决方案.对于多人协作开发的.复杂庞大的前端项目尤其有用.简单的介绍不多说,大家可以到seajs的官网seajs.org参看介绍.本文主要简单地解读一下seajs的源码和模块化原理.如果有描述不实的地方,希望大家指正和交流.注:本文的解析是基于seajs的2.2.1版本. 目录结构 解压seajs之后的src目