高效的TCP数据拆包器

高效的TCP数据拆包器 接收器,每秒拆1KB的包达到30万以上

    /// 数据同步协义数据接收器
    /// </summary>
    /// <remarks>
    /// 主要功能有
    /// 1.将一个TCPSocket的所有数据全部接收
    /// 2.解析协义
    /// 3.解析完成后的协义调用 Handler通知外部处理
    /// 4.定义一个协义解析线程不停的解析协义
    /// </remarks>
    public class TCPReceiver : IDisposable
    {
        #region 构造函数
        /// <summary>
        /// 数据同步协义数据接收器 实例
        /// </summary>
        public TCPReceiver()
        {

        }

        /// <summary>
        /// 数据同步协义数据接收器 实例
        /// </summary>
        /// <param name="protocolhead">协议头</param>
        /// <param name="protocolfoot">协议尾</param>
        public TCPReceiver(byte[] protocolhead, byte[] protocolfoot = null)
        {
            //邦定包头,与包体
            PackageHead = protocolhead;
            PackageFoot = protocolfoot;
        }

        #endregion

        /// <summary>
        /// 最大单个协义体数据长度,默认10MB
        /// </summary>
        private int maxProtocolBinary = 1024 * 1024 * 10;
        /// <summary>
        /// 最大单个协义体数据长度
        /// </summary>
        public int MaxProtocolBinary
        {
            get { return maxProtocolBinary; }
            set { maxProtocolBinary = value; }
        }

        /// <summary>
        /// 是否正在运行
        /// </summary>
        public bool IsRuning { get; set; }

        private Task task = null;
        /// <summary>
        /// 当前处理解析协义的线程
        /// </summary>
        public Task PraseProtocolTask
        {
            get { return task; }
        }

        /// <summary>
        /// 接收数据处理事件
        /// </summary>
        public Action<byte[], Socket> ProtocolReceivedHandler
        {
            get;
            set;
        }

        /// <summary>
        /// 是从哪个节点接收的数据
        /// </summary>
        public Socket Handler
        {
            get;
            set;
        }

        #region 接收数据添加到队列
        /// <summary>
        /// 接收数据处理集合,默认开放1MB的空间
        /// </summary>
        // protected System.Collections.Generic.Queue<byte> byteQueue = new Queue<byte>(1024 * 1024);

        /// <summary>
        /// 默认开放500空间,100万次单纯添加用时95毫秒
        /// </summary>
        private Queue<byte[]> receiveByteArrayQueue = new Queue<byte[]>(500);
        /// <summary>
        /// 接入队列处理器
        /// </summary>
        protected Queue<byte[]> ReceiveByteArrayQueue
        {
            get { return receiveByteArrayQueue; }
        }

#if DEBUG
        //private int cuount = 1;
#endif

        /// <summary>
        /// 接收数据
        /// </summary>
        public void Receive(byte[] buff)
        {
#if DEBUG
            //严重影响性能,会变慢1117倍
            // Console.WriteLine(buff.ToHex());
            //Console.WriteLine(buff.ByteArray2HexString());
            // Console.WriteLine("-----"+cuount++);
#endif

            lock (receiveByteArrayQueue)
            {
                //添加对像数据
                receiveByteArrayQueue.Enqueue(buff);
            }
        }
        #endregion

        #region 线程控制

        /// <summary>
        /// 停止解析协义
        /// </summary>
        public void StopParseProtocol()
        {
            IsRuning = false;
            //throw new NotImplementedException("请编写代码,在线程停止后需要将缓存队列中的数据全部处理完成");
            //在线程停止后需要将缓存队列中的数据全部处理完成
            for (; receiveByteArrayQueue.Count > 0; )
            {
                //处理数据
                ProcessBytes();
            }
        }
        #endregion

        #region 解析协义数据
        /// <summary>
        /// 分包用包头
        /// </summary>
        private byte[] packageHead = new byte[] { 0x7e };//0x7e

        /// <summary>
        /// 分包用包头
        /// </summary>
        public byte[] PackageHead
        {
            get { return packageHead; }
            set
            {
                if (value != null)
                {
                    packageHead = value;
                }
            }
        }
        /// <summary>
        /// 分包用包尾
        /// </summary>
        private byte[] packageFoot = new byte[] { 0x7e };
        /// <summary>
        /// 分包用包尾
        /// </summary>
        public byte[] PackageFoot
        {
            get { return packageFoot; }
            set
            {
                if (value != null)
                {
                    packageFoot = value;

                }
            }
        }

        /// <summary>
        /// 用于处理数据协义的功能
        /// </summary>
        List<byte> bytes = new List<byte>();

        /// <summary>
        /// 默认开 3MB的数据接收缓冲区,如果超过3MB则数据会挂掉
        /// </summary>
        //private byte[] ByteBuff = null;

        /// <summary>
        /// 协义数据实体队列,已经进行拆包后的协义数据
        /// </summary>
        private Queue<byte[]> ProtocolEntityQueue = new Queue<byte[]>(500);

        /// <summary>
        /// 找到分包用包头
        /// </summary>
        bool FindPackageHead = false;
        /// <summary>
        /// 找包头的当着序号
        /// </summary>
        int findHeadindex = 0;
        /// <summary>
        /// 找包尾
        /// </summary>
        int findFootIndex = 0;

        /// <summary>
        /// 解析协义方法
        /// 之所以先全部放到一个query里是进行快速的接收
        ///
        /// </summary>
        public void PraseProtocol()
        {
            IsRuning = true;
            while (IsRuning)
            {
                ProcessBytes();
            }
        }
        /// <summary>
        /// 处理队列中的数据删除包头,包尾巴
        /// </summary>
        public void ProcessBytes()
        {
            byte[] arr = null;
            //开始解析数据
            //1.取出数据
            lock (receiveByteArrayQueue)
            {
                if (receiveByteArrayQueue.Count > 0)
                {
                    arr = receiveByteArrayQueue.Dequeue();
                }
            }
            if (arr != null)
            {
                //锁处理
                lock (bytes)
                {
                    //此协义数据中的协义数据索引
                    // List<int> ints = new List<int>();

                    //2.将数据进行包查找
                    //开始从队列中取数据
                    for (int k = 0; k < arr.Length; k++)
                    {
                        //队列有数据
                        byte b = arr[k];
                        //如果超过最大接收字节数
                        if (maxProtocolBinary <= bytes.Count)
                        {
                            bytes.Clear();
                        }
                        //添加到对像集合
                        bytes.Add(b);
                        //3.从集合的前面开始取数据.找包头,进行拆包
                        #region 找包头
                        //等于包数据
                        if (packageHead.Length > 0 && b == packageHead[findHeadindex] && !FindPackageHead)
                        {

                            //包头找完
                            if (findHeadindex == packageHead.Length - 1)
                            {

                                //ints.Add(k);
                                System.Threading.Interlocked.Exchange(ref findHeadindex, 0);
                                if (!FindPackageHead)
                                {
                                    FindPackageHead = true;
                                }
                                //这里取一个完整包
                                byte[] byteFarm = bytes.Take(bytes.Count - packageHead.Length).ToArray();
                                //如果是有效的数据
                                if (byteFarm.Length > packageHead.Length)
                                {
                                    lock (ProtocolEntityQueue)
                                    {
                                        ProtocolEntityQueue.Enqueue(byteFarm);
                                    }
                                    //开始从 bytes 中移除数据
                                    bytes.Clear();
                                    //添加包头
                                    bytes.AddRange(packageHead);
                                }
                                //包头找完则找下一字节
                                continue;
                            }
                            else
                            {
                                System.Threading.Interlocked.Increment(ref findHeadindex);
                            }
                        }
                        else
                        {
                            System.Threading.Interlocked.Exchange(ref findHeadindex, 0);
                            //findHeadindex = 0;
                            if (!FindPackageHead && packageHead.Length == 0)
                            {
                                FindPackageHead = true;
                            }
                        }
                        #endregion

                        #region 找包尾

                        if (packageFoot != null && packageFoot.Length > 0 && FindPackageHead)
                        {
                            if (b == packageFoot[findFootIndex])
                            {
                                //包尾找完
                                if (findFootIndex == packageFoot.Length - 1)
                                {
                                    //删除包尾字节,可能会包含包头字节
                                      //byte[] byteFarm = bytes.Take(bytes.Count - packageFoot.Length).ToArray();
                                    byte[] byteFarm = bytes.ToArray();
                                    //跳过包头字节,包尾字节
                                    //byte[] byteFarm = bytes.Skip(packageHead.Length).Take(bytes.Count - (packageFoot.Length + packageHead.Length)).ToArray();
                                    //如果是有效的数据
                                    if (byteFarm.Length >= packageFoot.Length)
                                    {
                                        lock (ProtocolEntityQueue)
                                        {
                                            ProtocolEntityQueue.Enqueue(byteFarm);
                                        }
                                        //开始从 bytes 中移除数据
                                        bytes.Clear();
                                    }
                                    FindPackageHead = false;
                                    //包尾找完则找下一字节
                                    continue;
                                }
                                else
                                {
                                    System.Threading.Interlocked.Increment(ref findFootIndex);
                                }
                            }
                            else
                            {
                                System.Threading.Interlocked.Exchange(ref findFootIndex, 0);
                                //findFootIndex = 0;

                            }
                        }

                        #endregion

                    }
                }
                //4.重新组成一个byte[] 进行数据解析
                lock (ProtocolEntityQueue)
                {
                    if (ProtocolEntityQueue.Count > 0)
                    {
                        //循环所有接收到的数据包
                        for (; ProtocolEntityQueue.Count > 0; )
                        {

                            //取取删除尾巴的的数据

                            //解析协议数据
                            byte[] bytearr = ProtocolEntityQueue.Dequeue();

                            //数据要大于分包的长度
                            if (bytearr.Length >= packageFoot.Length && bytearr.Length >= packageHead.Length)
                            {
                                ProtocolReceivedHandler.Invoke(bytearr, Handler);
                            }
                        }
                    }
                }
            }
            else
            {
                //停止运行
                IsRuning = false;
                //System.Threading.Thread.Sleep(5);
            }
        }

        #endregion

        /// <summary>
        /// 析构方法
        /// </summary>
        public void Dispose()
        {
            StopParseProtocol();
        }

    }

使用方法

TCPReceiver   rece = new  TCPReceiver();

//将接收到的数据加入处理

rece .Receive(buff);

另起一个线程进行处理

while(true)

{

rece .PraseProtocol();

}

高效的TCP数据拆包器,布布扣,bubuko.com

时间: 2024-10-25 20:58:44

高效的TCP数据拆包器的相关文章

[转载] tcp数据重传时间细节探秘及数据中心优化

原文: http://weibo.com/p/1001603821691477346388 在数据中心网络内,机器之间数据传输的往返时间(rtt)一般在10ms以内,为此调内部服务的超时时间一般会设置成50ms.200ms.500ms等,如果在传输过程中出现丢包,这样的服务超时时间,tcp层有机会发现并重传一次数据么?如果设置成200ms以内,答案是没有机会,原因是linux系统下第一次重传时间等于传输的往返时间上至少加上200ms的预测偏差值,即如果rtt值是7ms,第一次重传超时时间至少是2

Sweet Snippet系列 之 TCP数据接收

Sweet Snippet系列 之 TCP数据接收 一.引子 虽说仍然是Sweet Snippet,不过本篇并没有代码,纯粹是自己觉得有点趣味,就索性一记了~ 二. 问题 接触过网络编程的朋友大概都应知道TCP,作为一种"流"式协议,TCP的粘包问题一直都是程序处理的要点,而这次的问题就是,如果发送n字节的TCP数据,对端接收时会出现多少种接收情况? 三. 解法 我们先从具体的一个实例来简单算一算吧~就假设我们发送了3个字节的TCP数据: 由于TCP如果接收成功至少可以接收一个字节,所

修改tcp数据内容

http://blog.sina.com.cn/s/blog_6f0c85fb0100xi1x.html 2.6内核基于NetFilter处理框架修改TCP数据包实现访问控制 参考上面的钩子函数,结合下面的函数修改tcp的数据内容. __nf_nat_mangle_tcp_packet (十五)洞悉linux下的Netfilter&iptables:开发自己的hook函数[实战](上) http://blog.chinaunix.net/uid-23069658-id-3243434.html

Haproxy TCP数据转发

在实际项目中需要用到haproxy做TCP转发,下面主要针对haproxy的安装及TCP数据转发配置进行说明 一.安装Haproxy (1)编译安装Haproxy mkdir -p /data01/haproxy tar -zxvf haproxy-1.7.1.tar.gz cd haproxy-1.7.1 make TARGET=linux26 ARCH=x86_64 PREFIX=/data01/haproxy make install PREFIX=/data01/haproxy mkdi

监控Netstat中的TCP数据

通过netstat命令,我们能获取TCP数据,监控它们有助于了解系统状态. 如果netstat版本比较老的话,那么运行时可能会遇到类似下面的错误信息: error parsing /proc/net/netstat: Success 假设操作系统是CentOS,首先让我们看看如何确认netstat隶属于哪个软件包: shell> rpm -qf $(which netstat) net-tools-<VERSION> 如上所示,可以得知netstat属于net-tools软件包,接着升级

最好用的兼容多种数据库通用高效的大数据分页功能

通用权限管理系统底层有一个通用分页查询功能,该功能可实现多种数据库的查询,支持多表关联分页查询,目前是最完善的分页功能实现. 下面代码是使用的方法截图: /////////////////////////////// 后台代码截图1 /////////////////////////////// 后台代码截图2 /////////////////////////////// 后台代码截图3 /////////////////////////////// 后台代码截图4 /////////////

三分钟解读零基础如何高效学习大数据?

在我们的生活中,你用微信的时候,你用高德地图的时候,你用电脑的时候,你用某宝网购的时候......无时无刻不在制造数据,而这些数据在"有心人"的利用下,将会给我们的生活带来巨大变化.如今90%的企业都在运用或者都想要利用大数据为其带来更便利的服务,从而大数据高端软件类人才可谓供不应求. 数据分析师已成为当下中国互联网行业需求最高的六类人才职位之一.报告表明数据分析人才供给指数仅为5%,属于高度稀缺.此外,数据分析人才的跳槽速度也最快,平均跳槽速度为19.8个月.根据中国商业联合会数据分

网络TCp数据的传输设计(黏包处理)

//1.该片为引用别人的文章:http://www.cnblogs.com/alon/archive/2009/04/16/1437599.html 解决TCP网络传输"粘包"问题 解决TCP网络传输"粘包"问题 作者:杨小平 王胜开 原文出处:http://www.ciw.com.cn/ 当前在网络传输应用中,广泛采用的是TCP/IP通信协议及其标准的socket应用开发编程接口(API).TCP/IP传输层有两个并列的协议:TCP和UDP.其中TCP(trans

饿了么开源项目:便捷高效的Android数据持久化存储框架

版权所有.所有权利保留. 欢迎转载,转载时请注明出处: http://blog.csdn.net/xiaofei_it/article/details/51436972 Android应用开发时经常要对许多数据进行持久化存储,便于以后访问. 对于int.double.boolean这些基本数据,可以使用SharedPreference.对于一些对象,往SharedPreference里存储的时候需要使用序列化技术.如果对象很大,或者碰到列表.数组等结构,就必须使用数据库.而使用数据库比较麻烦,成