使用Nodejs实现聊天服务器

两年前在项目中使用nodejs+socket.io+redis实现的聊天和推送服务器,
基本上几百行代码就实现了整个功能,在项目中单服务器单进程可以跑到支持
5000人左右同时在线。

主要思路

  1. 用户上线后,根据用户的userid和socket,保存到一个全局的map中
  2. 发送消息时,根据对方的userid找到对应的socket,通过socket.emit发送消息给对方
  3. 如果对方不在线,在redis中保留消息,在对方上线后推送给用户
  4. 用户下线后,从全局的map中删除对应的用户socket
  5. 由于需要保持长连接,客户端需要定时发心跳给服务端,所以定义了一个心跳消息,可以5分钟发一次
  • 用户上线

    io.sockets.on(‘connection‘, function (socket) {    var address = socket.handshake.address;    console.log(Date() + " New connection from " + address.address + ":" + address.port);    socket.on(‘login‘, function (userinfo) {        userid = userinfo.myAuraId        var address = socket.handshake.address;        var deviceid = userinfo.deviceId        console.log(Date() + " Login from " + address.address + ":" + address.port + " " + userid + " " + deviceid);        old_socket = sockets[userid]        if (old_socket && old_socket.deviceid && deviceid && old_socket.deviceid != deviceid) {            old_socket.relogin = 1            old_socket.emit(‘logout‘)            console.log("logout " + old_socket.userid + " " + old_socket.deviceid)        }
    
            if (old_socket && old_socket != socket) {            old_socket.disconnect()        }
    
            socket.relogin = 0        socket.userid = userid        socket.deviceid = deviceid		//发送离线消息        send_store_msg(socket, userid)
    
            sockets[userid] = socket
    
            //通知业务服务器,用户已登录        pub.publish("login_message_channel", JSON.stringify(userinfo))
    
        })
  • 发送消息
    socket.on(‘chat‘, function (msg, ack) {    //process_msg(msg)    pub.publish("chat_filter_channel", JSON.stringify(msg))    socket.userid = msg.from    sockets[socket.userid] = socket    if (ack) {        ack(1)    }
    
    })
  • 接收和回应心跳
    socket.on(‘hb‘, function (msg, ack) {    if (ack) {        ack(1)    }})
  • 用户下线,删除对应的socket
    socket.on("disconnect", function () {    var address = socket.handshake.address;    console.log(Date() + " Disconnect from " + address.address + ":" + address.port);    if (!socket.relogin) {        delete sockets[socket.userid]    }})

推送服务器

实现了聊天服务器后,对推送来说就很简单了

  1. 在redis里开个channel,业务服务器往这个channel里publish数据
  2. nodejs subscribe这个channel监听数据,找到对应用户发送消息即可。
  3. 用户不在线,可能也需要对这个离线推送消息做保留,具体看业务定义
var notification = redis.createClient()notification.subscribe("notification")

// check redis notifcation channelnotification.on("message", function (pattern, msg) {    var msgobj = JSON.parse(msg)    var keys = msgobj.toWho    var needStore = msgobj.needStore    for (index in keys) {        var key = keys[index]        if (!needStore) {            if (sockets[key]) {                sockets[key].emit("notification", msg)            }        }        else {            var list = []            store.hget("nodejs_notification", key, function (e, v) {                if (v) {                    list = JSON.parse(v);                }                list.push(msg)                var msglist = JSON.stringify(list)                store.hset("nodejs_notification", key, msglist, function (e, r) {                })            })

            if (sockets[key]) {                send_notification(sockets[key], msg)            }

        }    }})

function send_notification(socket, notif) {    socket.emit("notification", notif, function ack() {        store.hdel("nodejs_notification", socket.userid)    })}

客户端Lib库

服务端是使用socket.io实现,基本上socket.io的Lib都能兼容

以下是推荐的两个客户端Lib:

其他语言版本,可以在github搜索socket.io,找到对应的Lib库

Server端全部代码

//var io = require(‘socket.io‘).listen(80)var app = require(‘http‘).createServer(handler)    , io = require(‘socket.io‘).listen(app)

app.listen(80);

function handler(req, res) {    if (req.url == "/monitor") {        res.writeHead(200);        res.end("OK");    }    else {        res.writeHead(404);        res.end();    }}

//io.disable(‘heartbeats‘)//io.set(‘heartbeats‘, false);io.set(‘transports‘, [‘websocket‘, ‘xhr-polling‘]);io.set(‘heartbeat timeout‘, 5 * 60)io.set(‘heartbeat interval‘, 4 * 60)io.set(‘close timeout‘, 1 * 30);io.set("log level", 1)io.set("browser client", false)io.set("browser client cache", false)io.set("browser client cache", false)var redis = require("redis")var pub = redis.createClient()var store = redis.createClient()var snschat = redis.createClient()var notification = redis.createClient()var PUSH_TO_IOS_DELAY_TIME = 120000snschat.subscribe("snschat");notification.subscribe("notification")var sockets = {}

pub.on("error", function (err) {    console.log("Error " + err);});

store.on("error", function (err) {    console.log("Error " + err);});

snschat.on("error", function (err) {    console.log("Error " + err);});

function send_msg_delay(socket) {    store.hget("chat_history", socket.userid, function (e, v) {        if (v) {            list = JSON.parse(v);            if (list.length > 0) {                var msg = JSON.stringify(list)                socket.isSendingChatMessage = false                send_msg(socket, msg)            }        }    })}

function send_msg(socket, msg) {    //delay for 5 sec    if (socket.isSendingChatMessage) {        setTimeout(function () {            send_msg_delay(socket)        }, 5000)        return    }    socket.isSendingChatMessage = true

    //start send    var callSendToIOS = sendToIOSDealy(socket.userid, PUSH_TO_IOS_DELAY_TIME)    socket.emit("chat", msg, function ack(size) {        clearTimeout(callSendToIOS)        store.hget("chat_history", socket.userid, function (e, v) {            if (v) {                list = JSON.parse(v);                //console.log("size="+size)                if (list.length == size) {                    store.hdel("chat_history", socket.userid, function (e, r) {                    })                }                else if (size < list.length) {                    list = list.splice(size)                    var msglist = JSON.stringify(list)                    store.hset("chat_history", socket.userid, msglist, function (e, r) {                    })                }            }            socket.isSendingChatMessage = false

        })    })}

function sendToIOSDealy(toWho, time) {    return setTimeout(function () {        sendToIOS(toWho)    }, time)}

function sendToIOS(toWho) {    var obj = {"toWho": toWho}    var msg = JSON.stringify(obj)    console.log("delay send to ios channel:" + msg)    pub.publish("chat_message_channel", msg)}

function send_notification(socket, notif) {    socket.emit("notification", notif, function ack() {        store.hdel("nodejs_notification", socket.userid)    })}

function send_store_msg(socket, userid) {

    if (socket.isSendStoreMsg) {        return;    }

    socket.isSendingChatMessage = false

    store.hget("chat_history", userid, function (e, msg) {        if (msg) {            send_msg(socket, msg)            store.hdel("chat_history", socket.userid, function (e, r) {            })        }    })

    store.hget("nodejs_notification", userid, function (e, msg) {        if (msg) {            var msglist = JSON.parse(msg)            for (var i = 0; i < msglist.length; i++) {                send_notification(socket, msglist[i])            }            //socket.emit("notification", msg)            //store.hdel("nodejs_notification", userid)        }    })    socket.isSendStoreMsg = true}

function saveToChatHistory(msg) {    var list = []    store.hget("chat_history", msg.to, function (e, v) {        if (v) {            list = JSON.parse(v);        }        list.push(msg)        var msglist = JSON.stringify(list)        store.hset("chat_history", msg.to, msglist, function (e, r) {        })    })}

function pushToChatHistoryChannel(msg) {    var msgStr = JSON.stringify(msg)    pub.publish("chat_message_history_channel", msgStr)}

function process_msg(msg) {    var list = []    store.hget("chat_history", msg.to, function (e, v) {        if (v) {            list = JSON.parse(v);        }        list.push(msg)        var msglist = JSON.stringify(list)        store.hset("chat_history", msg.to, msglist, function (e, r) {        })        if (sockets[msg.to]) {            send_msg(sockets[msg.to], msglist)        }        else {            sendToIOS(msg.to)        }        pushToChatHistoryChannel(msg)    })}

// check redis notifcation channelnotification.on("message", function (pattern, msg) {    var msgobj = JSON.parse(msg)    var keys = msgobj.toWho    var needStore = msgobj.needStore    for (index in keys) {        var key = keys[index]        if (!needStore) {            if (sockets[key]) {                sockets[key].emit("notification", msg)            }        }        else {            var list = []            store.hget("nodejs_notification", key, function (e, v) {                if (v) {                    list = JSON.parse(v);                }                list.push(msg)                var msglist = JSON.stringify(list)                store.hset("nodejs_notification", key, msglist, function (e, r) {                })            })

            if (sockets[key]) {                send_notification(sockets[key], msg)            }

        }    }})

// check redis snschat channelsnschat.on("message", function (pattern, data) {    msg = JSON.parse(data)    process_msg(msg)})

io.sockets.on(‘connection‘, function (socket) {    var address = socket.handshake.address;    console.log(Date() + " New connection from " + address.address + ":" + address.port);    socket.on(‘login‘, function (userinfo) {        userid = userinfo.myAuraIdvar address = socket.handshake.address;var deviceid = userinfo.deviceIdconsole.log(Date() + " Login from " + address.address + ":" + address.port + " " + userid + " " + deviceid);        old_socket = sockets[userid]if (old_socket && old_socket.deviceid && deviceid && old_socket.deviceid != deviceid) {            old_socket.relogin = 1            old_socket.emit(‘logout‘)console.log("logout " + old_socket.userid + " " + old_socket.deviceid)        }

if (old_socket && old_socket != socket) {            old_socket.disconnect()        }

        socket.relogin = 0        socket.userid = userid        socket.deviceid = deviceid

        send_store_msg(socket, userid)

        sockets[userid] = socket        pub.publish("login_message_channel", JSON.stringify(userinfo))

    })

    socket.on(‘geo‘, function (geo, ack) {if (geo.myAuraId) {var now = new Date()            pub.publish("geo", JSON.stringify({                geo: geo, time: now.getTime()            }))            socket.userid = geo.myAuraId            sockets[socket.userid] = socket

if (ack) {                ack(1)                send_store_msg(socket, userid)

            }        }    })

    socket.on(‘chat‘, function (msg, ack) {//process_msg(msg)        pub.publish("chat_filter_channel", JSON.stringify(msg))        socket.userid = msg.from        sockets[socket.userid] = socketif (ack) {            ack(1)        }

    })

    socket.on(‘hb‘, function (msg, ack) {if (ack) {            ack(1)        }    })

    socket.on("disconnect", function () {var address = socket.handshake.address;console.log(Date() + " Disconnect from " + address.address + ":" + address.port);if (!socket.relogin) {delete sockets[socket.userid]        }    })
时间: 2024-12-11 07:26:22

使用Nodejs实现聊天服务器的相关文章

nodejs向远程服务器发送post请求----融云Web SDK/客户端获取token

最近要用到一个叫融云的及时通讯的SDK,在获取token这个步骤的时候有点卡顿,以防以后碰到类似的问题,再此记录一下. 客户端通过融云 SDK 每次连接服务器时,都需要向服务器提供 Token,以便验证身份,流程如下: 流程如下: 1.客户端获取用户id,并向服务器请求token(注意这里的服务器不是融云的服务器,而是客户端的服务端) 2.客户端的服务端接收到token请求后,向融云的服务器请求token 3.融云服务器接受到token请求,返回token给客户端的服务端. 4.客户端的服务端接

Nodejs创建TCP服务器

Nodejs创建TCP服务器 1.Nodejs提供了net模块给我们,所以我们创建TCP服务器很简单: 1 require('net').createServer(function(socket) { 2 // new connection 3 socket.on('data', function(data) { 4 // got data 5 }); 6 socket.on('end', function(data) { 7 // connection closed 8 }); 9 socke

使用GoWorld游戏服务器引擎轻松实现分布式聊天服务器

GoWorld游戏服务器引擎简介 GoWorld是一款开源的分布式可扩展的游戏服务器引擎,使用Go语言(Golang)编写.它采用类似BigWorld的结构,使用了简化的场景-对象框架.以一个典型的MMORPG为例,每个服务器上会有多个场景,每个场景里可以包含多个对象,这些对象包括玩家.NPC.怪物等.GoWorld服务器可以将场景分配到在不同的进程甚至不同的机器上,从而使得游戏服务器的负载是可扩展的. 开源分布式游戏服务器引擎:https://github.com/xiaonanln/gowo

通过python 构建一个简单的聊天服务器

构建一个 Python 聊天服务器 一个简单的聊天服务器 现在您已经了解了 Python 中基本的网络 API:接下来可以在一个简单的应用程序中应用这些知识了.在本节中,将构建一个简单的聊天服务器.使用 Telnet,客户机可以连接到 Python 聊天服务器上,并在全球范围内相互进行通信.提交到聊天服务器的消息可以由其他人进行查看(以及一些管理信息,例如客户机加入或离开聊天服务器).这个模型如图 1 所示. 图 1. 聊天服务器使用 select 方法来支持任意多个客户机 聊天服务器的一个重要

Nodejs创建HTTPS服务器

Nodejs创建HTTPS服务器 从零开始nodejs系列文章,将介绍如何利Javascript做为服务端脚本,通过Nodejs框架web开发.Nodejs框架是基于V8的引擎,是目前速度最快的Javascript引擎.chrome浏览器就基于V8,同时打开20-30个网页都很流畅.Nodejs标准的web开发框架Express,可以帮助我们迅速建立web站点,比起PHP的开发效率更高,而且学习曲线更低.非常适合小型网站,个性化网站,我们自己的Geek网站!! 关于作者 张丹(Conan), 程

使用ServerSocket建立聊天服务器(二)

-------------siwuxie095 工程名:TestMyServerSocket 包名:com.siwuxie095.socket 类名:MyServerSocket.java(主类).ServerListener.java.ChatSocket.java.ChatManager.java 工程结构目录如下: MyServerSocket.java(主类): package com.siwuxie095.socket; /** * 聊天服务器,不仅能向客户端发送数据,也能从客户端读取

定制的Server-Sent Events 聊天服务器

1 //匿名聊天服务器 2 //将新的消息POST到/chat地址,或者以GET形式从通一个URL获取文本或事件流 3 //创建一个GET请求到"/"来返回一个简单的HTML文件,这个文件包括客户端聊天UI 4 5 var http = require('http'); 6 7 //聊天客户端使用的HTML文件 8 var clientUI = require('fs').readFileSync("chatClient.html"); 9 var emulatio

7.持久聊天服务器

服务器准备部分: https://technet.microsoft.com/zh-cn/library/dn951388 https://technet.microsoft.com/zh-cn/library/gg398495 1.服务器基础配置和软件同标准版前端 2.安装 消息队列 3.标准版前端可以并置持久聊天服务器,企业版不可以并置 4.持久聊天服务器的数据存储和合规数据存储,需要SQL数据库支持,测试可以使用后端数据库(RTC实例), 持久聊天服务器角色安装部分: 1.使用拓扑生成器添

27.app后端搭建聊天服务器的经历

现在,聊天功能已经成了社交app的标配了.但是,众多web开发出生的程序员对聊天相关的服务的不了解,带来了很多开发上的困扰.在这篇文章中,根据下面3个方面,谈谈聊天服务. 1.      聊天服务的技术选型 2.      开发社交app中,实现聊天服务踩过的坑 3.      那些著名app的聊天服务 1. 聊天服务的技术选型 需要开发聊天服务,首先要选择用到的协议,现在,常用的聊天协议有: (1)      xmpp,一个基于xml的消息协议,被广泛应用于Gtalk,Facebook,但缺点