基于NIO的消息路由的实现(四) 服务端通讯主线程(2)断包和粘包的处理

本来我打算单独开一章,专门说明粘包和断包,但是觉得这个事儿我在做的时候挺头疼的,但是对于别人或许不那么重要,于是就在这里写吧。

那么何谓粘包、何谓断包呢?

  • 粘包:我们知道客户端在写入报文给服务端的时候,首先要将需要写入的内容写入Buffer,以ByteBuffer为例,如果你Buffer定义的足够大,并且你发送的报文足够快,此时就会产生粘包现象,举例来说 你发送一个 报文“ M|A”,然后你有发送了一个“M|B”,如果产生粘包,服务端从缓冲区里面读出的就是“M|AM|B”,这样的字符串;也就是说,客户端的第一条报文和第二条报文粘在了一起。
  • 断包:断包往往是在粘包之后产生的,按照刚才的例子,假设你的缓冲区大小设置为4(当然没人会设置这么小的缓冲区,举例子,凑合看吧),如果你发送的报文足够快,就会产生发送给服务器的报文变为这样:第一个包“M|AM”,第二个包“|B”

在大多数的NIO例子中,均不包括此过程的处理,而且很多的例子中也不会浮现这个情况,甚至程序上线,如果系统压力不大,这样的情况都出现的很少(尤其是断包)。值得庆幸的是,这两种情况,我均重现了,我在客户端不做任何停顿的情况下,for循环发送10万条报文给服务端,当我的缓冲区服务端缓冲区设置为4096,客户端缓冲区设置为1024的时候,出现的频率还是蛮高的,可以加大缓冲区来减少断包的情况发生,但是不能避免,粘包则是必然发生的。

好、我回答在第7小点中提到的问题,为什么要在通讯协议的外层在加上四位?这四位就是用来标记我报文指令的长度的,一旦我知道了这个长度,我就可以根据长度对断包和粘包进行相关的处理。具体代码如下:

/**
     * 处理断包和粘包现象
     *
     * @param socketChannel
     * @param byteBuffer
     */
    private void handlePacket(SocketChannel socketChannel, ByteBuffer byteBuffer) {
        //标记读取缓冲区起始位置
        int location = 0;
        //如果缓冲区从0到limit的数量大于包体大小标记数字
        while (byteBuffer.remaining() > PACKET_HEAD_LENGTH) {
            //包体大小标记
            String strBsize;
            //如果endPacket的字节length大于0,则证明:断包的前一截为包含包头和包体的;
            if (endPacketStr.getBytes().length > 0) {

                String strPacket = endPacketStr.substring(PACKET_HEAD_LENGTH) + new String(byteBuffer.array(), 0, remainBodySize);
                byteBuffer.position(remainBodySize);
                location = remainBodySize;
//                                    if(logger.isDebugEnabled()) {
                logger.info("【断包处理】(包含包体)合并后的报文:" + strPacket + ",缓冲区的position:" + location);
//                                    }
                offerPacket(socketChannel, strPacket);
                //处理完毕,清理断包的前一截,以便于下次使用;
                endPacketStr = "";
                //清理后一截报文的字节数标记;
                remainBodySize = 0;
                continue;
                //如果endBufferStr的字节length大于0,则证明:断包的前一截仅包含包头或包头的一部分,不包含包体;
            } else if (endBufferStr.getBytes().length > 0) {

                strBsize = (new StringBuffer(endBufferStr).append(new
                        String(byteBuffer.array(), location, PACKET_HEAD_LENGTH - endBufferStr.getBytes().length))).toString();

                //移动缓冲区position
                byteBuffer.position(PACKET_HEAD_LENGTH - endBufferStr.getBytes().length);
                location = byteBuffer.position();
                //得到包体大小
                int byteBufferSize = Integer.parseInt(strBsize.trim());
                //进行报文合并,把保存的仅包含包头或包头一部分的前一截与后一截合并
                String strPacket = endBufferStr + (new String(byteBuffer.array(), PACKET_HEAD_LENGTH - endBufferStr.getBytes().length, byteBufferSize));
                byteBuffer.position(location + byteBufferSize);//将缓冲区的位置移动到下一个包体大小标记位置
                location = byteBuffer.position();
                logger.info("【断包处理】(不包含包体)合并后的报文:" + strPacket + ",缓冲区的position:" + location);
                offerPacket(socketChannel, strPacket);
                endBufferStr = "";
                continue;
                //进入正常处理(规范的报文处理,不考虑断包)
            } else {
                strBsize = new String(byteBuffer.array(), location, PACKET_HEAD_LENGTH);
                //移动缓冲区position
                byteBuffer.position(location + PACKET_HEAD_LENGTH);
            }
            if (logger.isDebugEnabled()) {
                logger.debug("收到客户端包体大小:" + strBsize + ",查看position变化:" + byteBuffer.position());
            }
            //得到包体大小
            int byteBufferSize = Integer.parseInt(strBsize.trim());
            //如果从缓冲区当前位置到limit大于包体大小,证明粘包了,进行包体处理。等于则为正常包体,不存在粘包现象。
            if (byteBuffer.remaining() >= byteBufferSize) {

                String strPacket = endBufferStr + (new String(byteBuffer.array(), PACKET_HEAD_LENGTH + location, byteBufferSize));
                byteBuffer.position(location + PACKET_HEAD_LENGTH + byteBufferSize);//将缓冲区的位置移动到下一个包体大小标记位置
                if (logger.isDebugEnabled()) {
                    logger.debug("收到客户端包体内容:" + strPacket + ",2查看position变化:" + byteBuffer.position());
                }
                //将字符串报文封装为类
                offerPacket(socketChannel, strPacket);
                location = byteBuffer.position();//设定读取缓冲区起始位置
                //如果缓冲区当前位置到limit小于包体,证明断包了,进行断包处理
            } else {

                endPacketStr = new String(byteBuffer.array(), location, byteBuffer.limit() - location);
                remainBodySize = Integer.parseInt(endPacketStr.substring(0, PACKET_HEAD_LENGTH).trim()) - endPacketStr.getBytes().length + PACKET_HEAD_LENGTH;
                //已经找到断包前半截,所以把整个buffer的position调整至最后,不再处理。等待新的key进入
                byteBuffer.position(byteBuffer.limit());
                logger.info("处理断包仅包含完整包头的尾部报文,缓冲区位置:" + location + ",缓冲区limit:" + byteBuffer.limit() + ",包含完全包头的剩余字符:" + endPacketStr + ",bodySize:" + remainBodySize);

            }
        }
        //处理仅包含包头前一截的报文;
        if (byteBuffer.remaining() > 0) {

            //缓冲区中剩余的仅包含包头前一截的报文
            endBufferStr = new String(byteBuffer.array(), location, byteBuffer.limit() - location);

            logger.info("处理断包仅包含包头前一截的尾部报文,缓冲区位置:" + location + ",缓冲区limit:" + byteBuffer.limit() + ",不包含完全包头的剩余字符:" + endBufferStr);
            //移动缓冲区指针到最后,代表已经保存了前一截报文,无需再进行处理;
            byteBuffer.position(byteBuffer.limit());
        }
        //我也不知道这是否有用,能不能释放内存资源
        byteBuffer.clear();
    }

这块儿很可能有不合理的地方,因为对于一个接近40岁的程序员来说,逻辑在头脑中已经比较混乱了。我知道要对如下几种情况进行处理:

1、粘包,粘包比较好处理,主要是根据包头的前四位,确定包体的大小,然后移动buffer的位置(position),把整个包读出来放入队列就行了;

2、断包:断包分为两种情况,第一种从包头开始就断了,这是你无法获得包体大小,需要把前面的一截保存起来,就必须等下一个报文来了之后,把他们连在一起,然后再做处理;第二种,已经读到完整的包头,仍然需要把前面一截保存起来,确定后面还有多少,然后再处理;我利用了三个类成员:

//断包处理,前一截包含完整包头;
private String endPacketStr = "";
//断包处理,前一截不包含完整包头;
private String endBufferStr = "";
//断包处理,前一截包含完整包头时,包体的大小标记;
private int remainBodySize = 0;

注意这些类的成员需要在使用后,清空,以便于下次使用,否则就乱套了。这块儿代码,我写完就没再看过,挺费神。如果有人能提供更好地办法,不胜感激。

时间: 2024-11-23 07:34:02

基于NIO的消息路由的实现(四) 服务端通讯主线程(2)断包和粘包的处理的相关文章

基于NIO的消息路由的实现(四) 服务端通讯主线程(1)

一.简单介绍: 服务端通讯主线程是消息路由服务的启动类,其主要作用如下: 1.初始化相关配置: 2.根据配置的ip和port创建tcp服务: 3.接收客户端连接,并给客户端分配令牌: 4.接收客户端的登录请求,并将客户端相关信息(令牌.客户端登录标识.最后访问时间.当前token所使用的通道,保存到缓冲区) 5.接收客户端的报文请求,并添加到通讯队列,等待处理: 6.接收来自各处的指令发送请求,并发送至相关通道: 二.详细介绍: 1.启动方法:首先加载配置信息:然后启动主线程.通讯报文消费线程(

基于NIO的消息路由的实现(三)服务端与客户端结构

一.服务器端结构: 如图所示: 指令类和报文类:对下行的指令和上行的报文进行了类的封装,分别实现IOrder和IPacket接口,继承Order,Packet基类: 服务主线程:接受客户端连接,将客户端发送的报文投递到通讯队列中,发送指令给客户端:保存连接对象(GVConnection) 通讯队列CQ:存储客户端发送过来的报文,此报文由通讯主线程放入: 通讯队列消费者:对通讯队列中的报文进行处理,该做什么做什么,如果是短消息,则放入消息队列MQ中单独处理: 消息队列消费者:对MQ中的短消息进行处

基于NIO的消息路由的实现(七)客户端的一些实现,维持链路,断线重连

一.客户端代码存在的必要性以及我认为需要解决的问题 就NIO通讯本身而言完全没必要分开,其实客户端代码和服务端代码可以放到一起.但是在业务上是分开的.我在做nio的时候思考了许多我自己认为应该解决的问题:主要的如下: 1.链路维护(心跳): 定期的向服务端发送维持链路报文,获得服务端的响应,以证明其仍然在存活状态:同时服务端会记录客户端每次维持链路的时间,用于服务端对通道的超时 判断: 2.断线重连: 一种情况是正常断线,目前我利用对channel的read返回来进行判断: 另一种是非正常短线,

基于NIO的消息路由的实现(六)报文队列的处理

一.报文队列的处理: 如果将多路复用器获取到的所有事件,阻塞式的同步处理,那恐怕会严重影响selector的性能,所以我把从客户端接收到的大部分消息,都放入了队列中,然后另外启动队列的消费线程对消息进行异步的处理:具体如下: 1.通讯报文队列消费者:在selector对read事件的处理过程中,我在最后都把客户端发送的报文放入了一个叫CQUEUE的队列中,具体定义如下,CQUEUE是所有客户端发送报文的队列,在CQUEUE队列中的消费者线程中,我又对M类报文进行了对垒处理,放入了另一个队列MQU

Handler消息传递机制(四)子线程接收主线程发送的消息

package com.example.looper; import android.app.Activity; import android.os.Bundle; import android.os.Handler; import android.os.Looper; import android.os.Message; import android.view.View; import android.view.View.OnClickListener; import android.widg

Handler详解系列(四)——利用Handler在主线程与子线程之间互发消息

MainActivity如下: package cc.c; import android.app.Activity; import android.os.Bundle; import android.os.Handler; import android.os.Looper; import android.os.Message; import android.widget.TextView; /** * Demo描述: * * 示例步骤如下: * 1 子线程给子线程本身发送消息 * 2 收到1的消

Handler具体解释系列(四)——利用Handler在主线程与子线程之间互发消息

MainActivity例如以下: package cc.c; import android.app.Activity; import android.os.Bundle; import android.os.Handler; import android.os.Looper; import android.os.Message; import android.widget.TextView; /** * Demo描写叙述: * * 演示样例过程例如以下: * 1 子线程给子线程本身发送消息 *

IOS 推送消息 php做推送服务端

IOS推送消息是许多IOS应用都具备的功能,最近也在研究这个功能,参考了很多资料终于搞定了,下面就把步骤拿出来分享下: iOS消息推送的工作机制可以简单的用下图来概括: Provider是指某个iPhone软件的Push服务器,APNS是Apple Push Notification Service的缩写,是苹果的服务器. 上图可以分为三个阶段: 第一阶段:应用程序把要发送的消息.目的iPhone的标识打包,发给APNS. 第二阶段:APNS在自身的已注册Push服务的iPhone列表中,查找有

Java 消息推送------GoEasy实现服务端推送和web端推送

项目中需要消息推送,又想较低开发成本,具体需求:角色用户在后台管理页面发布一个消息,所有用这个系统的用户无论在哪个页面都能及时收到他发布的消息,后来我在网上查询到了一个第三方的免费推送服务-GoEasy push, 它可以满足我的需求,下面是如何用GoEasy进行信息推送及接收: 第一种:Java服务器端推送,web端接收推送信息 步骤: 从GoEasy官网下载jar包,并放到项目中. https://cdn.goeasy.io/sdk/goeasy-0.1.jar Java代码来了,你没有看错