MINA粘包断包专题研究

一、前述:

近期做项目用到了MINA,其中遇到了一个断包与粘包的问题,困扰了我一天一夜,经过一天一夜的思索与查看其他大牛分享的资料,现将我在解决这一问题过程中的一些心得与解决问题的方法记录下来,供广大IT兄弟姐妹们参考,如有不对或欠妥之处,请指证。请不要吝惜分享您的技术,作为中国IT软件工程师,一定要想到多一个人掌握IT技术,不会给你增加一个竞争对手,如果认为会给你增加竞争对手,这种想法是非常狭隘的,自私自利的。只有分享,大家共同的技术提高了,才能激发出更多的思维解决更加棘手的技术难点,希望大家永远Open,为咱们中国软件技术的振兴尽一分心力。

二、概念

1、什么是粘包,粘包是如何产生的?

指TCP协议中,发送方发送的若干包数据到接收方接收时粘成一包,从接收缓冲区看,后一包数据的头紧接着前一包数据的尾。

造成的可能原因:

(1)发送端需要等缓冲区满才发送出去,造成粘包。

(2)接收方不及时接收缓冲区的包,造成多个包接收。

2、什么是断包,断包是如何产生的?

也就是数据不全,比如包太大,就把包分解成多个小包,多次发送,导致每次接收数据都不全。

三、在protobuf中客户端如何接收服务器端响应的数据?

客户端在接收数据时需要考虑以下几种情况:

1. 一个ip包中只包含一个完整消息

2.
一个ip包中包含一个完整消息和另一个消息的一部分

3.
一个ip包中包含一个消息的一部分

4.
一个ip包中包含两个完整的数据消息或更多

四、编写解包类,解析服务器端响应的数据。

    1、思路

MINA中的CumulativeProtocolDecoder类,已经对断包粘包的问题做了挺完美的处理,我们可以编写一个SocketDecoder类继承自CumulativeProtocolDecoder类,然后实现CumulativeProtocolDecoder中的抽象方法,doDecode()。

    2、CumulativeProtocolDecoder粘包、断包问题解决的原理

   ( 1)、当doDecode()方法返回true时, CumulativeProtocolDecoder的方法会首先判断你是否在doDecode()方法中从内部的IoBuffer缓冲区读取了数据,如果没有,则会抛出非法的状态异常,也就是你的doDecode()方法返回true就表示你已经消费了本次数据(相当于聊天室中一个完整的消息已经读取完毕),进一步说,也就是此时你必须已经消费过内部的IoBuffer缓冲区数据(哪怕是消费了一个字节的数据)。如果验证通过,那么CumulativeProtocolDecoder会检查缓冲区内是否还有数据未读取,如果有就继续调用doDecode()方法,没有就停止对doDecode()的调用,直到有新的数据被缓冲。

 
( 2)、当你的doDecode()方法返回false时,CumulativeProtocolDecoder会停止对doDecode()方法的调用,但此时如果本次数据还有未读取完的,就将含有剩余数据的IoBuffer缓冲区保存到IoSession中,以便下一次数据到来时可以从IoSession中提取合并。如果发现本次数据全都读取完毕,则清空IoBuffer缓冲区。

( 3)、原理归纳

 
总的来说:当你认为读取到的数据已经够解码了,那么就返回true,否则就返回false。这个CumulativeProtocolDecoder其实最重要的工作就是给你完成了数据的累积,因为这个工作是很烦锁的。

 
doDecode()方法返回true:CumulativeProtocolDecoder会再次调用decoder,并把剩余的数据发下来。

 
doDecode()方法
返回false:CumulativeProtocolDecoder就不处理剩余的,当有新数据包来的时候把剩余的和新的拼接在一起然后再调用decode()方法。

   (4)累积是什么意思

累积是服务器端响应的数据分为多个IP包发送到客户端的时候,它会多次去调用doDecode()方法,doDecode()方法的执行也是由于他的父类CumulativeProtocolDecoder决定的,大家看源码去走读一下就会明白,在这里不做过多解释。明确一点,IP包发送到客户端之后,doDecode()方法会多次调用,多次调用是为了把IP包拼装成完整的服务器端响应的数据。

五、CumulativeProtocolDecoder源码

/*
 *  Licensed to the Apache Software Foundation (ASF) under one
 *  or more contributor license agreements.  See the NOTICE file
 *  distributed with this work for additional information
 *  regarding copyright ownership.  The ASF licenses this file
 *  to you under the Apache License, Version 2.0 (the
 *  "License"); you may not use this file except in compliance
 *  with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing,
 *  software distributed under the License is distributed on an
 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 *  KIND, either express or implied.  See the License for the
 *  specific language governing permissions and limitations
 *  under the License.
 *
 */
package org.apache.mina.filter.codec;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.service.TransportMetadata;
import org.apache.mina.core.session.AttributeKey;
import org.apache.mina.core.session.IoSession;

/**
 * A {@link ProtocolDecoder} that cumulates the content of received
 * buffers to a <em>cumulative buffer</em> to help users implement decoders.
 * <p>
 * If the received {@link IoBuffer} is only a part of a message.
 * decoders should cumulate received buffers to make a message complete or
 * to postpone decoding until more buffers arrive.
 * <p>
 * Here is an example decoder that decodes CRLF terminated lines into
 * <code>Command</code> objects:
 * <pre>
 * public class CrLfTerminatedCommandLineDecoder
 *         extends CumulativeProtocolDecoder {
 *
 *     private Command parseCommand(IoBuffer in) {
 *         // Convert the bytes in the specified buffer to a
 *         // Command object.
 *         ...
 *     }
 *
 *     protected boolean doDecode(
 *             IoSession session, IoBuffer in, ProtocolDecoderOutput out)
 *             throws Exception {
 *
 *         // Remember the initial position.
 *         int start = in.position();
 *
 *         // Now find the first CRLF in the buffer.
 *         byte previous = 0;
 *         while (in.hasRemaining()) {
 *             byte current = in.get();
 *
 *             if (previous == '\r' && current == '\n') {
 *                 // Remember the current position and limit.
 *                 int position = in.position();
 *                 int limit = in.limit();
 *                 try {
 *                     in.position(start);
 *                     in.limit(position);
 *                     // The bytes between in.position() and in.limit()
 *                     // now contain a full CRLF terminated line.
 *                     out.write(parseCommand(in.slice()));
 *                 } finally {
 *                     // Set the position to point right after the
 *                     // detected line and set the limit to the old
 *                     // one.
 *                     in.position(position);
 *                     in.limit(limit);
 *                 }
 *                 // Decoded one line; CumulativeProtocolDecoder will
 *                 // call me again until I return false. So just
 *                 // return true until there are no more lines in the
 *                 // buffer.
 *                 return true;
 *             }
 *
 *             previous = current;
 *         }
 *
 *         // Could not find CRLF in the buffer. Reset the initial
 *         // position to the one we recorded above.
 *         in.position(start);
 *
 *         return false;
 *     }
 * }
 * </pre>
 * <p>
 * Please note that this decoder simply forward the call to
 * {@link #doDecode(IoSession, IoBuffer, ProtocolDecoderOutput)} if the
 * underlying transport doesn't have a packet fragmentation.  Whether the
 * transport has fragmentation or not is determined by querying
 * {@link TransportMetadata}.
 *
 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
 */
public abstract class CumulativeProtocolDecoder extends ProtocolDecoderAdapter {

    private final AttributeKey BUFFER = new AttributeKey(getClass(), "buffer");

    /**
     * Creates a new instance.
     */
    protected CumulativeProtocolDecoder() {
        // Do nothing
    }

    /**
     * Cumulates content of <tt>in</tt> into internal buffer and forwards
     * decoding request to {@link #doDecode(IoSession, IoBuffer, ProtocolDecoderOutput)}.
     * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
     * and the cumulative buffer is compacted after decoding ends.
     *
     * @throws IllegalStateException if your <tt>doDecode()</tt> returned
     *                               <tt>true</tt> not consuming the cumulative buffer.
     */
    public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {

        if (!session.getTransportMetadata().hasFragmentation()) {
            while (in.hasRemaining()) {
            	// 判断是否符合解码要求,不符合则中断并返回
                if (!doDecode(session, in, out)) {
                    break;
                }
            }

            return;
        }

        boolean usingSessionBuffer = true;
        // 取得上次断包数据
        IoBuffer buf = (IoBuffer) session.getAttribute(BUFFER);
        // If we have a session buffer, append data to that; otherwise
        // use the buffer read from the network directly.
        if (buf != null) {// 如果有断包数据
            boolean appended = false;
            // Make sure that the buffer is auto-expanded.
            if (buf.isAutoExpand()) {
                try {
                	// 将断包数据和当前传入的数据进行拼接
                    buf.put(in);
                    appended = true;
                } catch (IllegalStateException e) {
                    // A user called derivation method (e.g. slice()),
                    // which disables auto-expansion of the parent buffer.
                } catch (IndexOutOfBoundsException e) {
                    // A user disabled auto-expansion.
                }
            }

            if (appended) {
                buf.flip();// 如果是拼接的数据,将buf置为读模式
            } else {
                // Reallocate the buffer if append operation failed due to
                // derivation or disabled auto-expansion.
            	//如果buf不是可自动扩展的buffer,则通过数据拷贝的方式将断包数据和当前数据进行拼接
                buf.flip();
                IoBuffer newBuf = IoBuffer.allocate(buf.remaining() + in.remaining()).setAutoExpand(true);
                newBuf.order(buf.order());
                newBuf.put(buf);
                newBuf.put(in);
                newBuf.flip();
                buf = newBuf;

                // Update the session attribute.
                session.setAttribute(BUFFER, buf);
            }
        } else {
            buf = in;
            usingSessionBuffer = false;
        }

        for (;;) {
            int oldPos = buf.position();
            boolean decoded = doDecode(session, buf, out);// 进行数据的解码操作
            if (decoded) {
            	// 如果符合解码要求并进行了解码操作,则当前position和解码前的position不可能一样
                if (buf.position() == oldPos) {
                    throw new IllegalStateException("doDecode() can't return true when buffer is not consumed.");
                }
                // 如果已经没有数据,则退出循环
                if (!buf.hasRemaining()) {
                    break;
                }
            } else {
            	// 如果不符合解码要求,则退出循环
                break;
            }
        }

        // if there is any data left that cannot be decoded, we store
        // it in a buffer in the session and next time this decoder is
        // invoked the session buffer gets appended to
        if (buf.hasRemaining()) {
            if (usingSessionBuffer && buf.isAutoExpand()) {
                buf.compact();
            } else {
            	//如果还有没处理完的数据(一般为断包),刚将此数据存入session中,以便和下次数据进行拼接。
                storeRemainingInSession(buf, session);
            }
        } else {
            if (usingSessionBuffer) {
                removeSessionBuffer(session);
            }
        }
    }

    /**
     * Implement this method to consume the specified cumulative buffer and
     * decode its content into message(s).
     *
     * @param in the cumulative buffer
     * @return <tt>true</tt> if and only if there's more to decode in the buffer
     *         and you want to have <tt>doDecode</tt> method invoked again.
     *         Return <tt>false</tt> if remaining data is not enough to decode,
     *         then this method will be invoked again when more data is cumulated.
     * @throws Exception if cannot decode <tt>in</tt>.
     */
    protected abstract boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception;

    /**
     * Releases the cumulative buffer used by the specified <tt>session</tt>.
     * Please don't forget to call <tt>super.dispose( session )</tt> when
     * you override this method.
     */
    @Override
    public void dispose(IoSession session) throws Exception {
        removeSessionBuffer(session);
    }

    private void removeSessionBuffer(IoSession session) {
        session.removeAttribute(BUFFER);
    }

    private void storeRemainingInSession(IoBuffer buf, IoSession session) {
        final IoBuffer remainingBuf = IoBuffer.allocate(buf.capacity()).setAutoExpand(true);

        remainingBuf.order(buf.order());
        remainingBuf.put(buf);

        session.setAttribute(BUFFER, remainingBuf);
    }
}

六、自定义SocketDecoder类的写法(我在项目中的解码类):

 
     1、思路梳理

 
      
(1)当Socket长连接成功之后,客户端向服务器端发起接口调用,请求相关业务数据。

(2)服务器端响应,将业务数据write到IoSession的IoBuffer中。

(3)客户端考虑四种情况(第三大点中已经列出,不再累述)

(4)客户端从IP包包头中读取到响应数据的总长度

(5)接下来再通过调用IoBuffer类的remaining()方法,获取到IP包中数据的长度。

(6)判断IP包中数据的长度,是否等于数据的总长度,如果是小于响应数据的总长度,则说明还有IP包未收完全,因此return false,继续接收IP包,让其缓存进IoBuffer缓冲区中,进行数据拼接,直到将数据拼接成完整的服务器端响应的数据为止。

(7)当IoBuffer缓冲区中的数据长度等于服务器端响应数据的总长度,则开始按照协议文件对数据进行解析,解析完成之后,注意要返回true,返回true,表明结束当前数据的解析,也表明消费掉了IoBuffer缓中区中存放的数据。

说明一下:这里面的协议文件指的就是服务器端接口编写人员编写的名字xxx.proto的文件,这个文件里面规定了协议头、CRC校验码、接口请求的参数与响应的字段等等信息(这块儿可以与服务器端的人员好好沟通下)。

 
      2、示例代码

package com.goodwin.finance.net.socket;

import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.ArrayList;
import java.util.List;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;

import android.util.Log;

import com.goodwin.finance.protobuf.HqProtos.tagAnsHead;
import com.goodwin.finance.protobuf.HqProtos.tagCommon;
import com.goodwin.finance.protobuf.HqProtos.tagCommon.MF;
import com.goodwin.finance.protobuf.HqProtos.tagCommon.SF;
import com.goodwin.finance.protobuf.HqProtos.tagResponseDataArchitecture;
import com.goodwin.finance.protobuf.HqProtos.tagResponseHeartbeat;
import com.goodwin.finance.protobuf.HqProtos.tagResponseICSortFastData;
import com.goodwin.finance.protobuf.HqProtos.tagResponseL1StockMinuteData;
import com.goodwin.finance.util.DataUtil;
import com.google.protobuf.InvalidProtocolBufferException;

/**
 * 将行情服务器返回的数据进行解包处理,并重新编码
 * @author Administrator
 */
public class SocketDecoder extends CumulativeProtocolDecoder {
	// 日志输出标记
	private static final String TAG = "SocketDecoder";
	// 字符流转码对象
	private final CharsetDecoder charsetDecoder;
	// IP包包头的字节数
	public static final int MSG_HEADER_LENGTH = 4;
	// IP包中CRC的长度
	public static final int CRC_LENGTH = 2;

	public SocketDecoder(String charset) {
		this.charsetDecoder = Charset.forName(charset).newDecoder();
	}

	@Override
	protected synchronized boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
		//解析数据包头部信息,获取到该响应数据的总长度
		if (HqClient.time == -1) {
			deCodeHeader(session, in);
			HqClient.time = 1;
		}
		//1、开始接收第一个数据包
		int dataLength = in.remaining();

		if (dataLength >= MSG_HEADER_LENGTH + CRC_LENGTH) {
			//如果当前读取的缓冲区中的数据,小于响应数据的总长度,则让缓冲区继续累积数据,一直累积到数据接收完成,即缓冲区中数据的长度等于响应数据的总长度时,则开始解析.
			if (dataLength < HqClient.contentSize) {
				Log.i(TAG, "=======================缓冲数据区缓冲大小:" + dataLength + "=====================");
				return false;//接收下一数据包,累积至缓冲区,直至接收完成.
			} else {//如果当前缓冲区的数据等于 发送内容的长度,则开始解析数据,否则继续缓冲
				byte[] dataB = new byte[dataLength];
				in.get(dataB);
				HqClient.dataBuf.put(dataB);

				HqClient.dataBuf.flip();
				byte[] headSizeB = new byte[4];
				HqClient.dataBuf.get(headSizeB);

				byte[] crcB = new byte[2];
				HqClient.dataBuf.get(crcB);

				int headsize = DataUtil.byteArray2int(headSizeB);
				byte[] headBytes = new byte[headsize];// 请求头tagReqHead
				HqClient.dataBuf.get(headBytes);

				tagAnsHead reqHead = tagAnsHead.parseFrom(headBytes);
				tagCommon comm = reqHead.getComm();
				int contentsize = comm.getContentsize();

				byte[] contentBytes = new byte[contentsize];// 请求内容
				HqClient.dataBuf.get(contentBytes);

				List<Object> hqDatas = new ArrayList<Object>();
				if (comm.getMainFunction().equals(MF.Stock)) {
					if (comm.getSubFunction().equals(SF.dat_architecture)) {// 数据框架
						tagResponseDataArchitecture dataArchitecture = tagResponseDataArchitecture.parseFrom(contentBytes);
						hqDatas.add(dataArchitecture);
					} else if (comm.getSubFunction().equals(SF.baojialiebiao)) {//报价列表
						tagResponseICSortFastData icSortFastData = tagResponseICSortFastData.parseFrom(contentBytes);
						hqDatas.add(icSortFastData);
					} else if (comm.getSubFunction().equals(SF.l1RealTime)) {//多个股票分时线
						tagResponseL1StockMinuteData l1StockMinuteData = tagResponseL1StockMinuteData.parseFrom(contentBytes);
						hqDatas.add(l1StockMinuteData);
					} else if (comm.getSubFunction().equals(SF.heartbeat)) {
						tagResponseHeartbeat heartbeat = tagResponseHeartbeat.parseFrom(contentBytes);
						hqDatas.add(heartbeat);
					}
				} else if (comm.getMainFunction().equals(MF.Notify)) {
					hqDatas.add(reqHead);
				}

				HqClient.totalLength = -1;
				HqClient.contentSize = -1;
				HqClient.time = -1;//
				HqClient.dataBuf.flip();//极限设为位置,位置设为0。
				HqClient.dataBuf.clear();//不改变极限,位置设为0。

				out.write(hqDatas);
				Log.i(TAG, "=======================缓冲区数据解析完毕:" + dataLength + "=====================");
				return true;
			}
		}
		return false;
	}

	/**
	 * 解析包头
	 * @throws InvalidProtocolBufferException
	 */
	private void deCodeHeader(IoSession session, IoBuffer in) throws InvalidProtocolBufferException {
		if (in.remaining() >= MSG_HEADER_LENGTH + CRC_LENGTH) {

			int totalLength = HqClient.totalLength;

			byte[] headSizeB = new byte[4];
			in.get(headSizeB);
			HqClient.dataBuf.put(headSizeB);

			byte[] crcB = new byte[2];
			in.get(crcB);
			HqClient.dataBuf.put(crcB);

			int headsize = DataUtil.byteArray2int(headSizeB);
			byte[] headBytes = new byte[headsize];
			in.get(headBytes);
			HqClient.dataBuf.put(headBytes);

			tagAnsHead reqHead = tagAnsHead.parseFrom(headBytes);
			tagCommon comm = reqHead.getComm();
			int contentsize = comm.getContentsize();
			totalLength = 4 + 2 + headsize + contentsize;
			HqClient.contentSize = contentsize;
			HqClient.totalLength = totalLength;

			Log.i(TAG, "===================================信息头长度为:" + headsize + "===================================");
			Log.i(TAG, "===================================信息内容长度为:" + contentsize + "===================================");
			Log.i(TAG, "===================================信息总长度为:" + totalLength + "===================================");
		}
	}
}

七、后记

文章写得较为散乱,但却是总结了多篇文章之精华,结合自己的实践提炼而成。如有不足之处,敬请各位IT精英们指正。

结尾之余想到一点东东:服务器端为什么会将数据分割成多个IP包进行发送?这个涉及到网络相关的东东,小的概念上捣腾的不是十分清楚,需要大家自己查寻,或者是复习下《计算机网络》相关的东东,呵呵,多扯了!!!

时间: 2024-10-08 09:19:52

MINA粘包断包专题研究的相关文章

NIO框架之MINA源码解析(四):粘包与断包处理及编码与解码

1.粘包与段包 粘包:指TCP协议中,发送方发送的若干包数据到接收方接收时粘成一包,从接收缓冲区看,后一包数据的头紧接着前一包数据的尾.造成的可能原因: 发送端需要等缓冲区满才发送出去,造成粘包 接收方不及时接收缓冲区的包,造成多个包接收 断包:也就是数据不全,比如包太大,就把包分解成多个小包,多次发送,导致每次接收数据都不全. 2.消息传输的格式 消息长度+消息头+消息体  即前N个字节用于存储消息的长度,用于判断当前消息什么时候结束. 消息头+消息体    即固定长度的消息,前几个字节为消息

Mina框架断包、粘包问题解决方案

Mina框架断包.粘包问题解决方案 Apache Mina Server 是一个网络通信应用框架,也就是说,它主要是对基于TCP/IP.UDP/IP协议栈的通信框架(当然,也可以提供JAVA 对象的序列化服务.虚拟机管道通信服务等),Mina 可以帮助我们快速开发高性能.高扩展性的网络通信应用,Mina 提供了事件驱动.异步(Mina 的异步IO 默认使用的是JAVA NIO 作为底层支持)操作的编程模型. 在mina中,一般的应用场景用TextLine的Decode和Encode就够用了(Te

mina websocket 粘包、断包、(丢包)解决心得

被这3个(其实是2个)问题坑惨了,目前没发现存在丢包问题,之前认为的丢包问题事实是不存在的. 粘包和断包的情况是存在的,这两个问题不怕,只要发送接收到的数据包顺序没有被打乱颠倒,一切都好办. 容易掉的坑:acceptor.getFilterChain().addLast("threadPool", new ExecutorFilter(Executors.newCachedThreadPool())); 这个东西容易导致断包的处理顺序被颠倒. 断包只要不处理,累积够了,可以继续再处理.

基于NIO的消息路由的实现(四) 服务端通讯主线程(2)断包和粘包的处理

本来我打算单独开一章,专门说明粘包和断包,但是觉得这个事儿我在做的时候挺头疼的,但是对于别人或许不那么重要,于是就在这里写吧. 那么何谓粘包.何谓断包呢? 粘包:我们知道客户端在写入报文给服务端的时候,首先要将需要写入的内容写入Buffer,以ByteBuffer为例,如果你Buffer定义的足够大,并且你发送的报文足够快,此时就会产生粘包现象,举例来说 你发送一个 报文" M|A",然后你有发送了一个"M|B",如果产生粘包,服务端从缓冲区里面读出的就是"

mina的编码和解码以及断包的处理,发送自定义协议,仿qq聊天,发送xml或json和

最近一段时间以来,mina很火,和移动开发一样,异常的火爆.前面写了几篇移动开发的文章,都还不错,你们的鼓励就是我最大的动力.好了,废话少说.我们来看下tcp通讯吧. tcp通讯对于java来说是很简单的.就是socket,也就是大家常说的套接字.大家不要把它看的很难.说白了tcp通讯其实就是数据流的读写.一条输入流,一条输出流.分别复杂发消息和接收消息. 明白了这些,ok,我们来看看我写的例子吧.先看服务器端的测试类的源码: package com.minaqq.test; import co

C# SOCKET 粘包、断包处理(一)

一直是用JAVA,关于SOCKET方面,JAVA有一个不错的框架MINA2,对于粘包.断包的处理有这个良好的处理,个人需要写的代码并不太多. 而C#.因为了解不多,也没去看第三方的SOCKET框架,所以只好根据MSDN提示,自己去实现了. 在代码之前,我们先说说处理中会碰到的情况如何: 1.先假设数据包的格式如下: 包长度(4字节)MD5签名(32字节)客户端ID(5字节)数据类型(5字节)数据ID(32字节)数据内容(动态长度.不固定) |---------------------------

Java 粘包/半包 原理与拆包实战(史上最全)

疯狂创客圈 Java 聊天程序[ 亿级流量]实战系列之13 [博客园 总入口 ] 本文的源码工程:Netty 粘包/半包原理与拆包实战 源码 本实例是<Netty 粘包/半包原理与拆包实战> 一文的源代码工程. 写在前面 大家好,我是作者尼恩. 为了完成了一个高性能的 Java 聊天程序,在前面的文章中,尼恩已经再一次的进行了通讯协议的重新选择. 这就是:放弃了大家非常熟悉的json 格式,选择了性能更佳的 Protobuf协议. 在上一篇文章中,并且完成了Netty 和 Protobuf协议

[编织消息框架][设计协议]解决粘包半包(下)

接下来介绍netty如何切割分包 学习目的,了解处理业务,方便以后脱离依赖 读者如果不感兴趣或看不懂可以先忽略,难度比较大 LengthFieldBasedFrameDecoder.class public LengthFieldBasedFrameDecoder( ByteOrder byteOrder, //大小端模式 默认大端 ByteOrder BIG_ENDIAN int maxFrameLength, //包Frame netty叫帧概念 最大上限 int lengthFieldOf

根据首尾字节的tcp分包断包算法

这个算是我的一点小总结吧,放出来分享给大家,原来在网上找这种算法都找了N久没找到,自己写也是走了许多弯路,就放出来遛一遛吧 大家将就这个看看, 这是其中的一个主要的方法,其余的我就不放出来了,其中的IndexTag和endTag分别是首尾字节 1 public List<Gnss808RequesInfo> Filter(ref byte[] message) 2 { 3 var list = new List<Gnss808RequesInfo>(); 4 //是否退出循环 5