从零开始开发IM(即时通讯)服务端(二)

好消息:IM1.0.0版本已经上线啦,支持特性

  • 私聊发送文本/文件
  • 已发送/已送达/已读回执
  • 支持使用ldap登录
  • 支持接入外部的登录认证系统
  • 提供客户端jar包,方便客户端开发

github链接: https://github.com/yuanrw/IM

本篇将带大家从零开始搭建一个轻量级的IM服务端,IM的整体设计思路和架构在我的上篇博客中已经讲过了,没看过的同学请点击从零开始开发IM(即时通讯)服务端

这篇将给大家带来更多的细节实现。我将从三个方面来阐述如何构建一个完整可靠的IM系统。

  1. 可靠性
  2. 安全性
  3. 存储设计

可靠性

什么是可靠性?对于一个IM系统来说,可靠的定义至少是不丢消息消息不重复不乱序,满足这三点,才能说有一个好的聊天体验。

不丢消息

我们先从不丢消息开始讲起。

首先复习一下上一篇设计的服务端架构

我们先从一个简单例子开始思考:当Alice给Bob发送一条消息时,可能要经过这样一条链路:

  1. client-->connecter
  2. connector-->transfer
  3. transfer-->connector
  4. connector-->client

在这整个链路中的每个环节都有可能出问题,虽然tcp协议是可靠的,但是它只能保证链路层的可靠,无法保证应用层的可靠。

例如在第一步中,connector收到了从client发出的消息,但是转发给transfer失败,那么这条消息Bob就无法收到,而Alice也不会意识到消息发送失败了。

如果Bob状态是离线,那么消息链路就是:

  1. client-->connector
  2. connector-->transfer
  3. transfer-->mq

如果在第三步中,transfer收到了来自connector的消息,但是离线消息入库失败,
那么这个消息也是传递失败了。
为了保证应用层的可靠,我们必须要有一个ack机制,使发送方能够确认对方收到了这条消息。

具体的实现,我们模仿tcp协议做一个应用层的ack机制。

tcp的报文是以字节(byte)为单位的,而我们以message单位。

发送方每次发送一个消息,就要等待对方的ack回应,在ack确认消息中应该带有收到的id以便发送方识别。

其次,发送方需要维护一个等待ack的队列。 每次发送一个消息之后,就将消息和一个计时器入队。

另外存在一个线程一直轮询队列,如果有超时未收到ack的,就取出消息重发。

超时未收到ack的消息有两种处理方式:

  1. 和tcp一样不断发送直到收到ack为止。
  2. 设定一个最大重试次数,超过这个次数还没收到ack,就使用失败机制处理,节约资源。例如如果是connector长时间未收到client的ack,那么可以主动断开和客户端的连接,剩下未发送的消息就作为离线消息入库,客户端断连后尝试重连服务器即可。

不重复、不乱序

有的时候因为网络原因可能导致ack收到较慢,发送方就会重复发送,那么接收方必须有一个去重机制。
去重的方式是给每个消息增加一个唯一id。这个唯一id并不一定是全局的,只需要在一个会话中唯一即可。例如某两个人的会话,或者某一个群。如果网络断连了,重新连接后,就是新的会话了,id会重新从0开始。

接收方需要在当前会话中维护收到的最后一个消息的id,叫做lastId
每次收到一个新消息, 就将id与lastId作比较看是否连续,如果不连续,就放入一个暂存队列 queue中稍后处理。

例如:

  • 当前会话的lastId=1,接着服务器收到了消息msg(id=2),可以判断收到的消息是连续的,就处理消息,将lastId修改为2。
  • 但是如果服务器收到消息msg(id=3),就说明消息乱序到达了,那么就将这个消息入队,等待lastId变为2后,(即服务器收到消息msg(id=2)并处理完了),再取出这个消息处理。

因此,判断消息是否重复只需要判断msgId>lastId && !queue.contains(msgId)即可。如果收到重复的消息,可以判断是ack未送达,就再发送一次ack。

接收方收到消息后完整的处理流程如下:

伪代码如下:

class ProcessMsgNode{
    /**
     * 接收到的消息
     */
    private Message message;
    /**
     * 处理消息的方法
     */
    private Consumer<Message> consumer;
}

public CompletableFuture<Void> offer(Long id,Message     message,Consumer<Message> consumer) {
    if (isRepeat(id)) {
    //消息重复
        sendAck(id);
        return null;
    }
    if (!isConsist(id)) {
    //消息不连续
        notConsistMsgMap.put(id, new ProcessMsgNode(message, consumer));
        return null;
    }
    //处理消息
    return process(id, message, consumer);
}

private CompletableFuture<Void> process(Long id, Message message, Consumer<Message> consumer) {
    return CompletableFuture
        .runAsync(() -> consumer.accept(message))
        .thenAccept(v -> sendAck(id))
        .thenAccept(v -> lastId.set(id))
        .thenComposeAsync(v -> {
            Long nextId = nextId(id);
            if (notConsistMsgMap.containsKey(nextId)) {
                //队列中有下个消息
                ProcessMsgNode node = notConsistMsgMap.get(nextId);
                return process(nextId, node.getMessage(), consumer);
            } else {
                //队列中没有下个消息
                CompletableFuture<Void> future = new CompletableFuture<>();
                future.complete(null);
                return future;
            }
        })
        .exceptionally(e -> {
            logger.error("[process received msg] has error", e);
            return null;
        });
}

安全性

无论是聊天记录还是离线消息,肯定都会在服务端存储备份,那么消息的安全性,保护客户的隐私也至关重要。
因此所有的消息都必须要加密处理。
在存储模块里,维护用户信息和关系链有两张基础表,分别是im_user用户表和im_relation关系链表。

  • im_user表用于存放用户常规信息,例如用户名密码等,结构比较简单。
  • im_relation表用于记录好友关系,结构如下:
CREATE TABLE `im_relation` (
  `id` bigint(20) COMMENT '关系id',
  `user_id1` varchar(100) COMMENT '用户1id',
  `user_id2` varchar(100) COMMENT '用户2id',
  `encrypt_key` char(33) COMMENT 'aes密钥',
  `gmt_create` timestamp DEFAULT CURRENT_TIMESTAMP,
  `gmt_update` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `USERID1_USERID2` (`user_id1`,`user_id2`)
);
  • user_id1user_id2是互为好友的用户id,为了避免重复,存储时按照user_id1<user_id2的顺序存,并且加上联合索引。
  • encrypt_key是随机生成的密钥。当客户端登录时,就会从数据库中获取该用户的所有的relation,存在内存中,以便后续加密解密。
  • 当客户端给某个好友发送消息时,取出内存中该关系的密钥,加密后发送。同样,当收到一条消息时,取出相应的密钥解密。

客户端完整登录流程如下:

  1. client调用rest接口登录。
  2. client调用rest接口获取该用户所有relation
  3. client向connector发送greet消息,通知上线。
  4. connector拉取离线消息推送给client。
  5. connector更新用户session。

那为什么connector要先推送离线消息再更新session呢?我们思考一下如果顺序倒过来会发生什么:

  1. 用户Alice登录服务器
  2. connector更新session
  3. 推送离线消息
  4. 此时Bob发送了一条消息给Alice

如果离线消息还在推送的过程中,Bob发送了新消息给Alice,服务器获取到Alice的session,就会立刻推送。这时新消息就有可能夹在一堆离线消息当中推过去了,那这时,Alice收到的消息就乱序了。

而我们必须保证离线消息的顺序在新消息之前。

那么如果先推送离线消息,之后才更新session。在离线消息推送的过程中,Alice的状态就是“未上线”,这时Bob新发送的消息只会入库im_offlineim_offline表中的数据被读完之后才会“上线”开始接受新消息。这也就避免了乱序。

存储设计

存储离线消息

当用户不在线时,离线消息必然要存储在服务端,等待用户上线再推送。理解了上一个小节后,离线消息的存储就非常容易了。增加一张离线消息表im_offline,表结构如下:

CREATE TABLE `im_offline` (
  `id` int(11) COMMENT '主键',
  `msg_id` bigint(20) COMMENT '消息id',
  `msg_type` int(2) COMMENT '消息类型',
  `content` varbinary(5000) COMMENT '消息内容',
  `to_user_id` varchar(100) COMMENT '收件人id',
  `has_read` tinyint(1) COMMENT '是否阅读',
  `gmt_create` timestamp COMMENT '创建时间',
  PRIMARY KEY (`id`)
);

msg_type用于区分消息类型(chat,ack),content加密后的消息内容以byte数组的形式存储。
用户上线时按照条件to_user_id=用户id拉取记录即可。

防止离线消息重复推送

我们思考一下多端登录的情况,Alice有两台设备同时登陆,在这种并发的情况下,我们就需要某种机制来保证离线消息只被读取一次。

这里利用CAS机制来实现:

  1. 首先取出所有has_read=false的字段。
  2. 检查每条消息的has_read值是否为false,如果是,则改为true。这是原子操作。
update im_offline set has_read = true where id = ${msg_id} and has_read = false
  1. 修改成功则推送,失败则不推送。

相信到这里,同学们已经可以自己动手搭建一个完整可用的IM服务端了。更多问题欢迎评论区留言~~

IM1.0.0版本已上线,github链接:
https://github.com/yuanrw/IM
觉得对你有帮助请点个star吧~!

原文地址:https://www.cnblogs.com/yuanrw/p/11518268.html

时间: 2024-10-07 13:38:09

从零开始开发IM(即时通讯)服务端(二)的相关文章

使用tomcat方式实现websocket即时通讯服务端讲解

第一种方案:使用Tomcat的方式实现 tomcat版本要求:tomcat7.0+.需要支持Javaee7 导入javeee-api的jar(如果已经引入可以忽略): pom.xml中加入: <dependency> <groupId>javax</groupId> <artifactId>javaee-api</artifactId> <version>7.0</version> <scope>provide

Golang 在电商即时通讯服务建设中的实践

马蜂窝技术原创文章,更多干货请搜索公众号:mfwtech ?即时通讯(IM)功能对于电商平台来说非常重要,特别是旅游电商. 从商品复杂性来看,一个旅游商品可能会包括用户在未来一段时间的衣.食.住.行等方方面面:从消费金额来看,往往单次消费额度较大:对目的地的陌生.在行程中可能的问题,这些因素使用户在购买前.中.后都存在和商家沟通的强烈需求.可以说,一个好用的 IM 可以在一定程度上对企业电商业务的 GMV 起到促进作用. 本文我们将结合马蜂窝旅游电商 IM 服务的发展历程,重点介绍基于 Go 的

.net平台 基于 XMPP协议的即时消息服务端简单实现

.net平台 基于 XMPP协议的即时消息服务端简单实现 昨天抽空学习了一下XMPP,在网上找了好久,中文的资料太少了所以做这个简单的例子,今天才完成.公司也正在准备开发基于XMPP协议的即时通讯工具所以也算是打一个基础吧!如果你还没有了解过XMPP请先阅读附录中链接的文章,本实例是基agsXMPP上开发的,agsXMPP是C#写的支持开源XMPP协议软件,我们可以在agsXMPP上快速构建自已的即时通讯平台,我的这个例子只是修改了服务器端,因为agsXMPP本身自带的服务器端没有实现聊天功能.

Android即时通讯服务,类似QQ的聊天工具,源码分享

Android即时通讯服务,类似QQ的聊天工具 基于bmob开发完成 下面有源码分享,敬请下载,多多支持回复 1.1      手机必须要有网络 1.2      手机必须要有SIM卡 1.3      手机必须要挂载有SDCAR 源码在此 链接: http://pan.baidu.com/s/1jGCIFsy 密码: 9eqc

纯正商业级应用-Node.js Koa2开发微信小程序服务端

第1章 前言.导学与node.js如何理解Node.js?前端到底要不要学习Node.js?本课程能让你学到什么? 第2章 Koa2的那点事儿与异步编程模型Koa非常的精简,基本上,没有经过二次开发的Koa根本“不能”用.本章我们讲解Koa的重要特性,理解什么是洋葱模型?以及在KOA中如何进行异步编程?很多同学都了解以上知识点,但听完本章,你会有一些不一样的理解,比如:为什么要有洋葱模型?没有会怎样?Koa中间件一定是异步的吗? ... 第3章 路由系统的改造Koa-router需要进行一些改造

开源jabber(XMPP)架设内部即时通讯服务的解决方案

Jabber 是著名的即时通讯服务服务器,它是一个自由开源软件,能让用户自己架即时通讯服务器,可以在Internet上应用,也可以在局域网中应用.    XMPP(可扩展消息处理现场协议)是基于可扩展标记语言(XML)的协议,它用于即时消息(IM)以及在线现场探测.它在促进服务器之间的准即时操作.这个协议可能最终允许因特网用户向因特网上的其他任何人发送即时消息,即使其操作系统和浏览器不同.XMPP的技术来自于Jabber,其实它是 Jabber的核心协定,所以XMPP有时被误称为Jabber协议

TCP/IP异步通讯服务端实现方法

近期做了个TCP/IP异步通讯服务端实现方法,也是在网上胡乱搜索,然找了个自认为比较好的,然后封装一下,供后面自个使用,也供大家参考,如有不好的地方,欢迎指正,谢谢! 下面说一下这个方法里面的几个知识点: 1.托管 这个东西真心好用,虽然不知道具体怎么弄的,托管可以实现一个对象中的方法交由其他对象实现,而且可以同时触发多个方法,组件的触发函数就是由托管实现的,具体实现如下: 先声明一个托管的方法类型 public delegate void RecieveMsg(string IP_addr,

C#开发BIMFACE系列10 服务端API之获取文件下载链接

通过BIMFACE控制台或者调用服务接口上传文件成功后,默认场景下需要下载该源文件,下载文件一般需要知道文件的下载链接即可.BIMACE平台提供了“获取文件下载链接”的服务接口.下面详细介绍其使用方法. 请求地址:GET https://file.bimface.com/download/url 说明:应用通过该接口获取文件的下载地址,然后下载文件.下载地址有效时间是5分钟. 参数: 请求 path(示例):https://file.bimface.com/download/url?fileId

C#开发BIMFACE系列11 服务端API之源文件删除

通过BIMFACE控制台或者调用服务接口上传文件成功后,如果不再需要该文件,则可以通过BIMFACE平台提供的“源文件删除”服务接口删除具体的文件.下面详细介绍其使用方法. 请求地址:DELETE https://file.bimface.com/file 说明:根据文件ID删除文件 参数: 请求 path(示例):https://file.bimface.com/file?fileId=1418750515413120 请求 header(示例):"Authorization: Bearer

C#开发BIMFACE系列14 服务端API之批量获取转换状态详情

系列目录     [已更新最新开发文章,点击查看详细] 上一篇<C#开发BIMFACE系列13 服务端API之获取转换状态>中介绍了根据文件ID查询单个文件的转换状态. 本文介绍批量获取转换状态详情. 请求地址:POST https://api.bimface.com/translateDetails 说明:应用发起转换以后,可以根据筛选条件,通过该接口批量查询转换状态详情 参数: 请求 path(示例):https://api.bimface.com/translateDetails 请求