一、前述:
近期做项目用到了MINA,其中遇到了一个断包与粘包的问题,困扰了我一天一夜,经过一天一夜的思索与查看其他大牛分享的资料,现将我在解决这一问题过程中的一些心得与解决问题的方法记录下来,供广大IT兄弟姐妹们参考,如有不对或欠妥之处,请指证。请不要吝惜分享您的技术,作为中国IT软件工程师,一定要想到多一个人掌握IT技术,不会给你增加一个竞争对手,如果认为会给你增加竞争对手,这种想法是非常狭隘的,自私自利的。只有分享,大家共同的技术提高了,才能激发出更多的思维解决更加棘手的技术难点,希望大家永远Open,为咱们中国软件技术的振兴尽一分心力。
二、概念
1、什么是粘包,粘包是如何产生的?
指TCP协议中,发送方发送的若干包数据到接收方接收时粘成一包,从接收缓冲区看,后一包数据的头紧接着前一包数据的尾。
造成的可能原因:
(1)发送端需要等缓冲区满才发送出去,造成粘包。
(2)接收方不及时接收缓冲区的包,造成多个包接收。
2、什么是断包,断包是如何产生的?
也就是数据不全,比如包太大,就把包分解成多个小包,多次发送,导致每次接收数据都不全。
三、在protobuf中客户端如何接收服务器端响应的数据?
客户端在接收数据时需要考虑以下几种情况:
1. 一个ip包中只包含一个完整消息
2.
一个ip包中包含一个完整消息和另一个消息的一部分
3.
一个ip包中包含一个消息的一部分
4.
一个ip包中包含两个完整的数据消息或更多
四、编写解包类,解析服务器端响应的数据。
1、思路
MINA中的CumulativeProtocolDecoder类,已经对断包粘包的问题做了挺完美的处理,我们可以编写一个SocketDecoder类继承自CumulativeProtocolDecoder类,然后实现CumulativeProtocolDecoder中的抽象方法,doDecode()。
2、CumulativeProtocolDecoder粘包、断包问题解决的原理
( 1)、当doDecode()方法返回true时, CumulativeProtocolDecoder的方法会首先判断你是否在doDecode()方法中从内部的IoBuffer缓冲区读取了数据,如果没有,则会抛出非法的状态异常,也就是你的doDecode()方法返回true就表示你已经消费了本次数据(相当于聊天室中一个完整的消息已经读取完毕),进一步说,也就是此时你必须已经消费过内部的IoBuffer缓冲区数据(哪怕是消费了一个字节的数据)。如果验证通过,那么CumulativeProtocolDecoder会检查缓冲区内是否还有数据未读取,如果有就继续调用doDecode()方法,没有就停止对doDecode()的调用,直到有新的数据被缓冲。
( 2)、当你的doDecode()方法返回false时,CumulativeProtocolDecoder会停止对doDecode()方法的调用,但此时如果本次数据还有未读取完的,就将含有剩余数据的IoBuffer缓冲区保存到IoSession中,以便下一次数据到来时可以从IoSession中提取合并。如果发现本次数据全都读取完毕,则清空IoBuffer缓冲区。
( 3)、原理归纳
总的来说:当你认为读取到的数据已经够解码了,那么就返回true,否则就返回false。这个CumulativeProtocolDecoder其实最重要的工作就是给你完成了数据的累积,因为这个工作是很烦锁的。
doDecode()方法返回true:CumulativeProtocolDecoder会再次调用decoder,并把剩余的数据发下来。
doDecode()方法返回false:CumulativeProtocolDecoder就不处理剩余的,当有新数据包来的时候把剩余的和新的拼接在一起然后再调用decode()方法。
(4)累积是什么意思
累积是服务器端响应的数据分为多个IP包发送到客户端的时候,它会多次去调用doDecode()方法,doDecode()方法的执行也是由于他的父类CumulativeProtocolDecoder决定的,大家看源码去走读一下就会明白,在这里不做过多解释。明确一点,IP包发送到客户端之后,doDecode()方法会多次调用,多次调用是为了把IP包拼装成完整的服务器端响应的数据。
五、CumulativeProtocolDecoder源码
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.mina.filter.codec; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.service.TransportMetadata; import org.apache.mina.core.session.AttributeKey; import org.apache.mina.core.session.IoSession; /** * A {@link ProtocolDecoder} that cumulates the content of received * buffers to a <em>cumulative buffer</em> to help users implement decoders. * <p> * If the received {@link IoBuffer} is only a part of a message. * decoders should cumulate received buffers to make a message complete or * to postpone decoding until more buffers arrive. * <p> * Here is an example decoder that decodes CRLF terminated lines into * <code>Command</code> objects: * <pre> * public class CrLfTerminatedCommandLineDecoder * extends CumulativeProtocolDecoder { * * private Command parseCommand(IoBuffer in) { * // Convert the bytes in the specified buffer to a * // Command object. * ... * } * * protected boolean doDecode( * IoSession session, IoBuffer in, ProtocolDecoderOutput out) * throws Exception { * * // Remember the initial position. * int start = in.position(); * * // Now find the first CRLF in the buffer. * byte previous = 0; * while (in.hasRemaining()) { * byte current = in.get(); * * if (previous == '\r' && current == '\n') { * // Remember the current position and limit. * int position = in.position(); * int limit = in.limit(); * try { * in.position(start); * in.limit(position); * // The bytes between in.position() and in.limit() * // now contain a full CRLF terminated line. * out.write(parseCommand(in.slice())); * } finally { * // Set the position to point right after the * // detected line and set the limit to the old * // one. * in.position(position); * in.limit(limit); * } * // Decoded one line; CumulativeProtocolDecoder will * // call me again until I return false. So just * // return true until there are no more lines in the * // buffer. * return true; * } * * previous = current; * } * * // Could not find CRLF in the buffer. Reset the initial * // position to the one we recorded above. * in.position(start); * * return false; * } * } * </pre> * <p> * Please note that this decoder simply forward the call to * {@link #doDecode(IoSession, IoBuffer, ProtocolDecoderOutput)} if the * underlying transport doesn't have a packet fragmentation. Whether the * transport has fragmentation or not is determined by querying * {@link TransportMetadata}. * * @author <a href="http://mina.apache.org">Apache MINA Project</a> */ public abstract class CumulativeProtocolDecoder extends ProtocolDecoderAdapter { private final AttributeKey BUFFER = new AttributeKey(getClass(), "buffer"); /** * Creates a new instance. */ protected CumulativeProtocolDecoder() { // Do nothing } /** * Cumulates content of <tt>in</tt> into internal buffer and forwards * decoding request to {@link #doDecode(IoSession, IoBuffer, ProtocolDecoderOutput)}. * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt> * and the cumulative buffer is compacted after decoding ends. * * @throws IllegalStateException if your <tt>doDecode()</tt> returned * <tt>true</tt> not consuming the cumulative buffer. */ public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { if (!session.getTransportMetadata().hasFragmentation()) { while (in.hasRemaining()) { // 判断是否符合解码要求,不符合则中断并返回 if (!doDecode(session, in, out)) { break; } } return; } boolean usingSessionBuffer = true; // 取得上次断包数据 IoBuffer buf = (IoBuffer) session.getAttribute(BUFFER); // If we have a session buffer, append data to that; otherwise // use the buffer read from the network directly. if (buf != null) {// 如果有断包数据 boolean appended = false; // Make sure that the buffer is auto-expanded. if (buf.isAutoExpand()) { try { // 将断包数据和当前传入的数据进行拼接 buf.put(in); appended = true; } catch (IllegalStateException e) { // A user called derivation method (e.g. slice()), // which disables auto-expansion of the parent buffer. } catch (IndexOutOfBoundsException e) { // A user disabled auto-expansion. } } if (appended) { buf.flip();// 如果是拼接的数据,将buf置为读模式 } else { // Reallocate the buffer if append operation failed due to // derivation or disabled auto-expansion. //如果buf不是可自动扩展的buffer,则通过数据拷贝的方式将断包数据和当前数据进行拼接 buf.flip(); IoBuffer newBuf = IoBuffer.allocate(buf.remaining() + in.remaining()).setAutoExpand(true); newBuf.order(buf.order()); newBuf.put(buf); newBuf.put(in); newBuf.flip(); buf = newBuf; // Update the session attribute. session.setAttribute(BUFFER, buf); } } else { buf = in; usingSessionBuffer = false; } for (;;) { int oldPos = buf.position(); boolean decoded = doDecode(session, buf, out);// 进行数据的解码操作 if (decoded) { // 如果符合解码要求并进行了解码操作,则当前position和解码前的position不可能一样 if (buf.position() == oldPos) { throw new IllegalStateException("doDecode() can't return true when buffer is not consumed."); } // 如果已经没有数据,则退出循环 if (!buf.hasRemaining()) { break; } } else { // 如果不符合解码要求,则退出循环 break; } } // if there is any data left that cannot be decoded, we store // it in a buffer in the session and next time this decoder is // invoked the session buffer gets appended to if (buf.hasRemaining()) { if (usingSessionBuffer && buf.isAutoExpand()) { buf.compact(); } else { //如果还有没处理完的数据(一般为断包),刚将此数据存入session中,以便和下次数据进行拼接。 storeRemainingInSession(buf, session); } } else { if (usingSessionBuffer) { removeSessionBuffer(session); } } } /** * Implement this method to consume the specified cumulative buffer and * decode its content into message(s). * * @param in the cumulative buffer * @return <tt>true</tt> if and only if there's more to decode in the buffer * and you want to have <tt>doDecode</tt> method invoked again. * Return <tt>false</tt> if remaining data is not enough to decode, * then this method will be invoked again when more data is cumulated. * @throws Exception if cannot decode <tt>in</tt>. */ protected abstract boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception; /** * Releases the cumulative buffer used by the specified <tt>session</tt>. * Please don't forget to call <tt>super.dispose( session )</tt> when * you override this method. */ @Override public void dispose(IoSession session) throws Exception { removeSessionBuffer(session); } private void removeSessionBuffer(IoSession session) { session.removeAttribute(BUFFER); } private void storeRemainingInSession(IoBuffer buf, IoSession session) { final IoBuffer remainingBuf = IoBuffer.allocate(buf.capacity()).setAutoExpand(true); remainingBuf.order(buf.order()); remainingBuf.put(buf); session.setAttribute(BUFFER, remainingBuf); } }
六、自定义SocketDecoder类的写法(我在项目中的解码类):
1、思路梳理
(1)当Socket长连接成功之后,客户端向服务器端发起接口调用,请求相关业务数据。
(2)服务器端响应,将业务数据write到IoSession的IoBuffer中。
(3)客户端考虑四种情况(第三大点中已经列出,不再累述)
(4)客户端从IP包包头中读取到响应数据的总长度
(5)接下来再通过调用IoBuffer类的remaining()方法,获取到IP包中数据的长度。
(6)判断IP包中数据的长度,是否等于数据的总长度,如果是小于响应数据的总长度,则说明还有IP包未收完全,因此return false,继续接收IP包,让其缓存进IoBuffer缓冲区中,进行数据拼接,直到将数据拼接成完整的服务器端响应的数据为止。
(7)当IoBuffer缓冲区中的数据长度等于服务器端响应数据的总长度,则开始按照协议文件对数据进行解析,解析完成之后,注意要返回true,返回true,表明结束当前数据的解析,也表明消费掉了IoBuffer缓中区中存放的数据。
说明一下:这里面的协议文件指的就是服务器端接口编写人员编写的名字xxx.proto的文件,这个文件里面规定了协议头、CRC校验码、接口请求的参数与响应的字段等等信息(这块儿可以与服务器端的人员好好沟通下)。
2、示例代码
package com.goodwin.finance.net.socket; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; import java.util.ArrayList; import java.util.List; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.CumulativeProtocolDecoder; import org.apache.mina.filter.codec.ProtocolDecoderOutput; import android.util.Log; import com.goodwin.finance.protobuf.HqProtos.tagAnsHead; import com.goodwin.finance.protobuf.HqProtos.tagCommon; import com.goodwin.finance.protobuf.HqProtos.tagCommon.MF; import com.goodwin.finance.protobuf.HqProtos.tagCommon.SF; import com.goodwin.finance.protobuf.HqProtos.tagResponseDataArchitecture; import com.goodwin.finance.protobuf.HqProtos.tagResponseHeartbeat; import com.goodwin.finance.protobuf.HqProtos.tagResponseICSortFastData; import com.goodwin.finance.protobuf.HqProtos.tagResponseL1StockMinuteData; import com.goodwin.finance.util.DataUtil; import com.google.protobuf.InvalidProtocolBufferException; /** * 将行情服务器返回的数据进行解包处理,并重新编码 * @author Administrator */ public class SocketDecoder extends CumulativeProtocolDecoder { // 日志输出标记 private static final String TAG = "SocketDecoder"; // 字符流转码对象 private final CharsetDecoder charsetDecoder; // IP包包头的字节数 public static final int MSG_HEADER_LENGTH = 4; // IP包中CRC的长度 public static final int CRC_LENGTH = 2; public SocketDecoder(String charset) { this.charsetDecoder = Charset.forName(charset).newDecoder(); } @Override protected synchronized boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { //解析数据包头部信息,获取到该响应数据的总长度 if (HqClient.time == -1) { deCodeHeader(session, in); HqClient.time = 1; } //1、开始接收第一个数据包 int dataLength = in.remaining(); if (dataLength >= MSG_HEADER_LENGTH + CRC_LENGTH) { //如果当前读取的缓冲区中的数据,小于响应数据的总长度,则让缓冲区继续累积数据,一直累积到数据接收完成,即缓冲区中数据的长度等于响应数据的总长度时,则开始解析. if (dataLength < HqClient.contentSize) { Log.i(TAG, "=======================缓冲数据区缓冲大小:" + dataLength + "====================="); return false;//接收下一数据包,累积至缓冲区,直至接收完成. } else {//如果当前缓冲区的数据等于 发送内容的长度,则开始解析数据,否则继续缓冲 byte[] dataB = new byte[dataLength]; in.get(dataB); HqClient.dataBuf.put(dataB); HqClient.dataBuf.flip(); byte[] headSizeB = new byte[4]; HqClient.dataBuf.get(headSizeB); byte[] crcB = new byte[2]; HqClient.dataBuf.get(crcB); int headsize = DataUtil.byteArray2int(headSizeB); byte[] headBytes = new byte[headsize];// 请求头tagReqHead HqClient.dataBuf.get(headBytes); tagAnsHead reqHead = tagAnsHead.parseFrom(headBytes); tagCommon comm = reqHead.getComm(); int contentsize = comm.getContentsize(); byte[] contentBytes = new byte[contentsize];// 请求内容 HqClient.dataBuf.get(contentBytes); List<Object> hqDatas = new ArrayList<Object>(); if (comm.getMainFunction().equals(MF.Stock)) { if (comm.getSubFunction().equals(SF.dat_architecture)) {// 数据框架 tagResponseDataArchitecture dataArchitecture = tagResponseDataArchitecture.parseFrom(contentBytes); hqDatas.add(dataArchitecture); } else if (comm.getSubFunction().equals(SF.baojialiebiao)) {//报价列表 tagResponseICSortFastData icSortFastData = tagResponseICSortFastData.parseFrom(contentBytes); hqDatas.add(icSortFastData); } else if (comm.getSubFunction().equals(SF.l1RealTime)) {//多个股票分时线 tagResponseL1StockMinuteData l1StockMinuteData = tagResponseL1StockMinuteData.parseFrom(contentBytes); hqDatas.add(l1StockMinuteData); } else if (comm.getSubFunction().equals(SF.heartbeat)) { tagResponseHeartbeat heartbeat = tagResponseHeartbeat.parseFrom(contentBytes); hqDatas.add(heartbeat); } } else if (comm.getMainFunction().equals(MF.Notify)) { hqDatas.add(reqHead); } HqClient.totalLength = -1; HqClient.contentSize = -1; HqClient.time = -1;// HqClient.dataBuf.flip();//极限设为位置,位置设为0。 HqClient.dataBuf.clear();//不改变极限,位置设为0。 out.write(hqDatas); Log.i(TAG, "=======================缓冲区数据解析完毕:" + dataLength + "====================="); return true; } } return false; } /** * 解析包头 * @throws InvalidProtocolBufferException */ private void deCodeHeader(IoSession session, IoBuffer in) throws InvalidProtocolBufferException { if (in.remaining() >= MSG_HEADER_LENGTH + CRC_LENGTH) { int totalLength = HqClient.totalLength; byte[] headSizeB = new byte[4]; in.get(headSizeB); HqClient.dataBuf.put(headSizeB); byte[] crcB = new byte[2]; in.get(crcB); HqClient.dataBuf.put(crcB); int headsize = DataUtil.byteArray2int(headSizeB); byte[] headBytes = new byte[headsize]; in.get(headBytes); HqClient.dataBuf.put(headBytes); tagAnsHead reqHead = tagAnsHead.parseFrom(headBytes); tagCommon comm = reqHead.getComm(); int contentsize = comm.getContentsize(); totalLength = 4 + 2 + headsize + contentsize; HqClient.contentSize = contentsize; HqClient.totalLength = totalLength; Log.i(TAG, "===================================信息头长度为:" + headsize + "==================================="); Log.i(TAG, "===================================信息内容长度为:" + contentsize + "==================================="); Log.i(TAG, "===================================信息总长度为:" + totalLength + "==================================="); } } }
七、后记
文章写得较为散乱,但却是总结了多篇文章之精华,结合自己的实践提炼而成。如有不足之处,敬请各位IT精英们指正。
结尾之余想到一点东东:服务器端为什么会将数据分割成多个IP包进行发送?这个涉及到网络相关的东东,小的概念上捣腾的不是十分清楚,需要大家自己查寻,或者是复习下《计算机网络》相关的东东,呵呵,多扯了!!!