mina解决粘包,找不到解码器,数据帧重传的问题

好久没写这个mina了,为了对之前的一篇博文Mina传输大数组,多路解码,粘包问题的处理  进行更进一步的补充,特此再来补说明。特别解决三个问题:

1,大数组粘包     在上篇的博文中提到用累积性解码器解决传输大数组的问题,还有可能出现粘包,解决方法是对decode方法进行了改进:

    public MessageDecoderResult decode(IoSession session, IoBuffer in,
                                       ProtocolDecoderOutput out) throws Exception {

        Context ctx =getContext(session);//获取session  的context

        long matchCount=ctx.getMatchLength();//目前已获取的数据
        long length=ctx.getLength();//数据总长度
        IoBuffer buffer=ctx.getBuffer();//数据存入buffer

        //第一次取数据
        if(length==0){
            length=in.getLong();
            //保存第一次获取的长度
            ctx.setLength(length);
            matchCount=in.remaining();
        }
        else{
            matchCount+=in.remaining();
        }
        ctx.setMatchLength(matchCount);
        if (in.hasRemaining()) {// 如果buff中还有数
///////////////////改进的部分//////////////////////////////
            if(matchCount< length) {
                buffer.put(in);// 添加到保存数据的buffer中
            }
            if (matchCount >= length) {// 如果已经发送的数据的长度>=目标数据的长度,则进行解码
                final byte[] b = new byte[(int) length];
                byte[] temp = new byte[(int) length];
                in.get(temp,0, (int) (length-buffer.position()));//最后一次in的数据可能有多的
                buffer.put(temp);
/////////////////////////////////////////////////////////
                // 一定要添加以下这一段,否则不会有任何数据,因为,在执行in.put(buffer)时buffer的起始位置已经移动到最后,所有需要将buffer的起始位置移动到最开始
                buffer.flip();
                buffer.get(b);
                <span style="font-family: Consolas, 'Courier New', Courier, mono, serif; line-height: 18px;">    </span><span class="comment" style="margin: 0px; padding: 0px; border: none; color: rgb(0, 130, 0); font-family: Consolas, 'Courier New', Courier, mono, serif; line-height: 18px;">////自己解码的部分///////</span><span style="margin: 0px; padding: 0px; border: none; font-family: Consolas, 'Courier New', Courier, mono, serif; line-height: 18px;"> </span>

                ctx.reset();//清空
                return MessageDecoderResult.OK;

            } else {
                ctx.setBuffer(buffer);
                return MessageDecoderResult.NEED_DATA;
            }
        }
        return MessageDecoderResult.NEED_DATA;
    }

2,如果上传的数据有两种类型,也就是说有两个大数组,两个数组不停地轮流向通道中发数据,处理过程中也出现了不少问题。

首先,毋庸置疑肯定要用累积性解码器,在通信网速良好,信息无误的情况下,一切是美好的;

但是,在实际中,项目的要求是要针对两种数据的看重程度不一样的,将两种数组分别说成是数组A,数组B,看重程度如下:

数组A必须准确无误的收到。若成功收到回复一个成功应答标志,否则发一个失败标志;同时,服务器若收到成功标志发下一个数据组B,若收到失败标志则将当前数组再发一次;

而数组B,不管是否有没有成功收到,都回复一种标志,服务器收到该标志后直接发数组A;

在这种模式下,难点就在于数组B 的接收有丢失,这时还停留在B的解码器中,而这是服务器直接发来了数组A;在保证数据不丢失的情况下,如何实现数据的跳转?

具体实现是借助全局变量和通道空闲来实现,直接贴代码说明:

数组A的传输解码器:

public class ADecoder implements MessageDecoder {

    private final AttributeKey CONTEXT = new AttributeKey(getClass(),
            "context");
    private int k;
    private ReceiveRight mReceiveRight = new ReceiveRight();
    private ReceiveWrong mReceiveWrong = new ReceiveWrong();

    @Override
    public MessageDecoderResult decodable(IoSession session, IoBuffer in) {
        if(Constants.flag ) {//如果在B解码器中,收到了一部分A数据,该部分数据有区分解码器的标志
            Constants.buffer.flip();
            Constants.buffer.limit(Constants.positionValue);
            byte headtail = Constants.buffer.get();
            byte functionCode = Constants.buffer.get();
            if (functionCode == 0x51 || functionCode == 0x53) {
                Constants.Isstop=false;
                return MessageDecoderResult.OK;

            } else {
                Constants.Isstop=true;
                return MessageDecoderResult.NOT_OK;
            }
        }else {
            if (in.remaining() < 2) {
                return MessageDecoderResult.NEED_DATA;
            } else {

                byte functionCode = in.get();
                if (functionCode == 0x51 || functionCode == 0x53) {
                    Constants.Isstop=false;
                    return MessageDecoderResult.OK;
                } else {
                    Constants.Isstop=true;
                    return MessageDecoderResult.NOT_OK;
                }
            }
        }
    }

    @Override
    public MessageDecoderResult decode(IoSession session, IoBuffer in,
                                       final ProtocolDecoderOutput out) throws Exception {
        Constants.ctx = getContext(session);//获取session  的context
        long matchCount = Constants.ctx.getMatchLength();//目前已获取的数据
        long length = Constants.ctx.getLength();//数据总长度
        IoBuffer buffer = Constants.ctx.getBuffer();//数据存入buffer

        if (Constants.flag) {//<span style="font-family: Arial, Helvetica, sans-serif;">如果在B解码器中,收到了一部分A数据,要先将这部分数据存入累积性解码器的buffer中,以拼凑完整</span>

            Constants.buffer.flip();
            Constants.buffer.limit(Constants.positionValue);
            buffer.put(Constants.buffer);
            matchCount=Constants.positionValue;
            Constants.buffer.clear();
            Constants.flag=false;
            Constants.positionValue=0;
        }

///////////////////////////////////////////////////
        matchCount += in.remaining();
        Log.d("abcd", "共收到字节:" + String.valueOf(matchCount));
        Constants.ctx.setMatchLength(matchCount);

        if (in.hasRemaining()) {// 如果in中还有数据
            if(matchCount< length) {
                buffer.put(in);// 添加到保存数据的buffer中
            }
            if (matchCount >= length) {// 如果已经发送的数据的长度>=目标数据的长度,则进行解码
                final byte[] b = new byte[1614];
                byte[] temp = new byte[1614];
                in.get(temp,0, (int) (length-buffer.position()));//最后一次in的数据可能有多的
                buffer.put(temp);
                // 一定要添加以下这一段,否则不会有任何数据,因为,在执行in.put(buffer)时buffer的起始位置已经移动到最后,所有需要将buffer的起始位置移动到最开始
                buffer.flip();
                buffer.get(b);

               //解码部分
                        Constants.NotFill = false;//收成功,NotFill表示没满的变量
                        k++;
                        Log.d("sucess", "成功次数:" + String.valueOf(k));

                }
                Constants.ctx.reset();
                return MessageDecoderResult.OK;
            } else {
                Constants.ctx.setBuffer(buffer);
                Constants.NotFill = true;
                return MessageDecoderResult.NEED_DATA;
            }
        }
        return MessageDecoderResult.NEED_DATA;
    }

    @Override
    public void finishDecode(IoSession ioSession, ProtocolDecoderOutput protocolDecoderOutput) throws Exception {

    }

数组B的解码器

public class BDecoder implements MessageDecoder {

    private final AttributeKey CONTEXT = new AttributeKey(getClass(),
            "context");
    private ComputePara computePara=new ComputePara();
    private ReceiveRight mReceiveRight = new ReceiveRight();
    private ReceiveWrong mReceiveWrong = new ReceiveWrong();

    @Override
    public MessageDecoderResult decodable(IoSession session, IoBuffer in) {

        if(Constants.flag ){//区分解码器的标志出现在上一次剩余数据中
            Constants.buffer.limit(Constants.positionValue);
            Constants.buffer.flip();
            byte functionCode = Constants.buffer.get();

            if (functionCode == 0x52) {
                Constants.Isstop=false;
                return MessageDecoderResult.OK;
            } else {
                Constants.Isstop=true;
                return MessageDecoderResult.NOT_OK;
            }
        }else {
            if (in.remaining() < 2) {
                return MessageDecoderResult.NEED_DATA;
            } else {

                byte functionCode = in.get();

                if (functionCode == 0x52) {
                    Constants.Isstop=false;
                    return MessageDecoderResult.OK;

                } else {
                    Constants.Isstop=true;
                    return MessageDecoderResult.NOT_OK;
                }
            }
        }
    }
    @Override
    public MessageDecoderResult decode(IoSession session, IoBuffer in,
                                       final ProtocolDecoderOutput out) throws Exception {

        if(Constants.IsJump&&(!Constants.Backfail)){
            Constants.IsJump=false;
            Constants.flag = true;
            Constants.positionValue=in.limit();
            Constants.buffer.clear();
            Constants.buffer.put(in);
            Log.d("back", "jump");
            return MessageDecoderResult.OK;
        }

        Constants.ctxBack = getContext(session);//获取session  的context
        long matchCount = Constants.ctxBack .getMatchLength();//目前已获取的数据
        long length = Constants.ctxBack .getLength();//数据总长度
        IoBuffer buffer = Constants.ctxBack .getBuffer();//数据存入buffer

///////////////////////////////////////////////////
        matchCount += in.remaining();
        Log.d("back", "共收到字节:" + String.valueOf(matchCount));
        Constants.ctxBack .setMatchLength(matchCount);
        if (in.hasRemaining()) {// 如果in中还有数据
            if(matchCount< length) {
                buffer.put(in);// 添加到保存数据的buffer中
            }
            if (matchCount >= length) {// 如果已经发送的数据的长度>=目标数据的长度,则进行解码
                final byte[] b = new byte[1561];
                byte[] temp = new byte[1561];
                in.get(temp,0, (int) (length-buffer.position()));//最后一次in的数据可能有多的
                buffer.put(temp);
                // 一定要添加以下这一段,否则不会有任何数据,因为,在执行in.put(buffer)时buffer的起始位置已经移动到最后,所有需要将buffer的起始位置移动到最开始
                buffer.flip();
                buffer.get(b);
                 //解码过程

                Constants.ctxBack .reset();
                return MessageDecoderResult.OK;
            } else {
                Constants.ctxBack .setBuffer(buffer);
                Constants.Backfail=true;
                return MessageDecoderResult.NEED_DATA;
            }
        }
        return MessageDecoderResult.OK;
    }

    @Override
    public void finishDecode(IoSession ioSession, ProtocolDecoderOutput protocolDecoderOutput) throws Exception {

    }
/**
<pre name="code" class="java">public static boolean NotFill=false;//A数据没有收满
public static boolean Backfail=false;//B数据接收失败
public static boolean IsJump=false;/B谱数据接收失败h后让解码器跳转到A频谱
public static int positionValue=0;
public static boolean flag=false;
public static boolean Isstop=false;//判断是否有解码器

*/
<pre name="code" class="java">@Override
public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
    super.sessionIdle(session, status);
    final ReceiveWrong mReceiveWrong = new ReceiveWrong();
    final ReceiveRight mReceiveRight = new ReceiveRight();
    //A数据超时重传
    if (Constants.NotFill) {
        Constants.FPGAsession.write(mReceiveWrong);
        Constants.NotFill = false;
        Constants.ctx.reset();
        Constants.failCount++;
        Log.d("trans", "重传次数:" + Constants.failCount);
    }
//B数据传输失败
    if (Constants.Backfail) {
        Constants.FPGAsession.write(mReceiveWrong);
        Constants.Backfail = false;
        Constants.IsJump = true;
        Constants.ctxBack.reset();
    }
    if (Constants.Isstop) {
        Constants.FPGAsession.write(mReceiveWrong);
        Constants.Isstop = false;

    }
}
</pre><pre code_snippet_id="1691756" snippet_file_name="blog_20160521_13_6359721" name="code" class="java">在处理上述问题的时候,又相继出现了一个问题,那就是找不到解码器,导致IObuffer中出现了大量垃圾数据,解码器的不断循环,于是专门写了一个用来清除垃圾书据的解码器,
<pre name="code" class="java">public class ClearDecoder implements MessageDecoder {
    @Override
    public MessageDecoderResult decodable(IoSession ioSession, IoBuffer ioBuffer) {
        if(Constants.Isstop) {//<span style="font-family: Arial, Helvetica, sans-serif;">Isstop=false;//判断是否有解码器,该标志时在找不到解码器时置为true</span>

            ioBuffer.sweep();

           int n=ioBuffer.remaining();
            Constants.buffer.sweep();//buffer中的是错误帧,此时也找不到解码器
            Constants.flag=false;
            Log.d("abcd", "clear解码器清空数据");
        }
        return MessageDecoderResult.OK;
    }

    @Override
    public MessageDecoderResult decode(IoSession ioSession, IoBuffer ioBuffer, ProtocolDecoderOutput protocolDecoderOutput) throws Exception {
        return null;
    }

    @Override
    public void finishDecode(IoSession ioSession, ProtocolDecoderOutput protocolDecoderOutput) throws Exception {

    }
}

</pre><pre code_snippet_id="1691756" snippet_file_name="blog_20160521_16_3786210" name="code" class="java">
				
时间: 2024-08-13 07:33:42

mina解决粘包,找不到解码器,数据帧重传的问题的相关文章

解决粘包和拆包问题

解决粘包和拆包问题 上一篇我们介绍了如果使用Netty来开发一个简单的服务端和客户端,接下来我们来讨论如何使用解码器来解决TCP的粘包和拆包问题 我们知道,TCP是以一种流的方式来进行网络转播的,当tcp三次握手简历通信后,客户端服务端之间就建立了一种通讯管道,我们可以想象成自来水管道,流出来的水是连城一片的,是没有分界线的. TCP底层并不了解上层的业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分. 所以对于我们应用层而言.我们直观是发送一个个连续完整TCP数据包的,而在底层就可

用readn与written实现解决粘包问题

总结一些我们在平时使用vim编辑器的一些实用功能,后续会一直更新. 1.  visual插件 visual插件其实相当于一个书签作用,比如我们在一篇很长的源代码文件中,我们可以标记一行,然后后来我们再想回到这一行时,只需要一个快捷键就能迅速定位到这一行,非常方便,不用不停地往上或往下翻. 1.1  常用命令 1.  mm标记一个标签: 2.  F2回到被标记的那一行: 3.  连续按两次mm就可以取消标签: 4.  shift+F2可以在几个标签来回切换: 2.  emmet.vim插件 emm

NetworkComms c#通信框架与Java的Netty框架通信 解决粘包问题

上次写了一篇文章  基于networkcomms V3通信框架的c#服务器与java客户端进行通信之Protobuf探讨 其中没有解决粘包问题,抛砖引玉,文章得到了失足程序员 老师的点评,并给出了解决方案:[最多评论]java netty socket库和自定义C#socket库利用protobuf进行通信完整实例(10/591) » 于是马上开始学习,并把c#服务器端换成了我比较熟悉的networkcomms v3 c#通信框架(商业版,本文并不提供) ,以方便与已经存在的系统进行整合. 客户

10.python网络编程(解决粘包问题 part 2)

一.什么时候会产生粘包现象. 只有在使用tcp协议的情况下才会产生粘包现象!udp协议永远不会! 发送端可以1k1k的把数据发送出去,接收端,可以2k2k的的去接收数据,一次可能会接收3k,也有可能1次接收6k. TCP协议是面向流的协议,这也是容易出现粘包问题的原因.而UDP是面向消息的协议,每个UDP段都是一条消息,应用程序必须以消息为单位提取数据,不能一次提取任意字节的数据,这一点和TCP是很不同的.怎样定义消息呢?可以认为对方一次性write/send的数据为一个消息,需要明白的是当对方

Socket解决粘包问题1

粘包是指发送端发送的包速度过快,到接收端那边多包并成一个包的现象,比如发送端连续10次发送1个字符'a',因为发送的速度很快,接收端可能一次就收到了10个字符'aaaaaaaaaa',这就是接收端的粘包. 可能我们在平时练习时没觉的粘包有什么危害,或者通过把发送端发送的速率调慢来解决粘包,但在实时通信中,发送端常常是单片机或者其他系统的信息采集机,它们的发送速率是无法控制的,如果不解决接收端的粘包问题,我们无法获得正常的信息. 就以我自己正在做的项目来说,接收端是一台单频指标测量仪,它会把当前测

c# socket 解决粘包,半包

处理原理: 半包:即一条消息底层分几次发送,先有个头包读取整条消息的长度,当不满足长度时,将消息临时缓存起来,直到满足长度再解码 粘包:两条完整/不完整消息粘在一起,一般是解码完上一条消息,然后再判断是否有剩余字节,有的话缓存起来,循环半包处理 客户端接收代码: private void callReceived(object sender, SocketAsyncEventArgs args) { var socket = sender as Socket; var bb = args.Use

Python-socket发送文件并解决粘包问题

服务器端要先根据客户端要下载的文件进行判断是否存在,还要根据文件大小来进行传送,最后还要比对文件的md5值来判断传送的文件是否正确,通过判断剩余字节来解决粘包问题 服务器端 # -*- coding:utf-8 -*- __author__ = "MuT6 Sch01aR" import socket import os import hashlib server = socket.socket() server.bind(('127.0.0.1',8080)) server.list

粘包产生的原因 socket 基于tcp实现远程执行命令(解决粘包)low

# 粘包产生的原因 # 粘包问题主要还是因为接收方不知道消息之间的界限,不知道一次性提取多少字节的数据所造成的. # 基于tcp协议的套接字会有粘包现象,而基于udp协议的套接字不会产生粘包现象 # tcp是基于数据流的,于是收发的消息不能为空,这就需要在客户端和服务端都添加空消息的处理机制,防止程序卡住:而udp是基于数据报的,即使你输入的是空内容,那也不是空消息,udp协议会帮你封装上消息头(ip+端口的方式),这样就有了消息办界 # 两种情况下会发生粘包 # 1.发送端需要等缓冲区满才发送

峰哥解决粘包的方式

峰哥解决粘包的方法为字节流加上自定义固定长度报头,报头中包含字节流长度,然后一次send到对端,对端在接收时,先从缓存中取出定长的报头,然后再取真实数据 struct模块 该模块可以把一个类型,如数字,转成固定长度的bytes >> > struct.pack('i', 1111111111111) ......... struct.error: 'i'formatrequires - 2147483648 <= number <= 2147483647 # 这个是范围 im