c#基于事件模型的UDP通讯框架(适用于网络包编解码)

之前写过一篇关于c#udp分包发送的文章

这篇文章里面介绍的方法是一种实现,可是存在一个缺点就是一个对象序列化后会增大非常多。不利于在网络中的传输。

我们在网络中的传输是须要尽可能的减小传送的数据包的大小。于是我參考了网上一些资料和一些开源的项目(http://www.fishlee.net/)这个上面的那个开源的飞鸽传输的框架。

事实上也就是把要传送的数据依照某种规定放在一个byte数组中,然后接收到后依照对应的格式把数据解析出来,为了减小数据还使用了GZipStream的压缩,之前出的问题就是在解压缩时,只是如今已经攻克了。

首先我们要定义一种能表示我们传送数据的包的格式

public class PacketNetWorkMsg : IComparable<PacketNetWorkMsg>
    {
        /// <summary>
		/// 封包版本号
		/// </summary>
		public int Version { get; set; }

		/// <summary>
		/// 要发送的数据包
		/// </summary>
		public byte[] Data { get; set; }

        /// <summary>
        /// 数据包所含数据长度
        /// </summary>
        public int DataLength { get; set; }

        /// <summary>
        /// 分包后最后一个剩余长度
        /// </summary>
        public int Remainder { get; set; }
		/// <summary>
		/// 远程地址
		/// </summary>
		public IPEndPoint RemoteIP { get; set; }

		/// <summary>
		/// 发送次数
		/// </summary>
		public int SendTimes { get; set; }

		/// <summary>
		/// 包编号
		/// </summary>
		public long PackageNo { get; set; }

		/// <summary>
		/// 分包索引
		/// </summary>
		public int PackageIndex { get; set; }

		/// <summary>
		/// 分包总数
		/// </summary>
		public int PackageCount { get; set; }

		/// <summary>
		/// 获得或设置是否须要返回已收到标志
		/// </summary>
        public bool IsRequireReceiveCheck { get; set; }

        public PacketNetWorkMsg()
		{
			Version = 1;
            CreationTime = DateTime.Now;
		}
        public PacketNetWorkMsg(long packageNo, int Count, int index, byte[] data, int dataLength, int remainder, IPEndPoint desip, bool IsRequireReceive)
        {
            this.PackageNo = packageNo;
            this.PackageCount = Count;
            this.PackageIndex = index;
            this.Data = data;
            this.DataLength = dataLength;
            this.Remainder = remainder;
            this.IsRequireReceiveCheck = IsRequireReceive;//默认都须要确认包
            this.RemoteIP = desip;
        }
		#region IComparable<PackedNetworkMessage> 成员

        public int CompareTo(PacketNetWorkMsg other)
		{
			return PackageIndex < other.PackageIndex ? -1 : 1;
		}

		#endregion

        /// <summary>
        /// 获得生成数据包的时间
        /// </summary>
        public DateTime CreationTime { get; private set; }
    }

这个类是我们在网络中须要传输的详细最小的包

然后另一个就是我们用来表示我们传输的数据的格式类Msg类。一个Msg类可能被分为多个PacketNetWorkMsg 来传送。分包的方法是为了突破udp方式数据传输的限制(64k),,可以使用udp传输大数据。非常多人就在问,为什么不用TCP,尽管TCP是非常好的方式。可是我也不想说出个所以然来,我就喜欢这么干。

public class Msg
    {

        /// <summary>
        /// 是否已经被处理.在挂钩过程中,假设为true,则底层代码不会再对信息进行处理
        /// </summary>
        public bool Handled { get; set; }

        /// <summary>
        /// 获得或设置当前的消息编号
        /// </summary>
        /// <value></value>
        /// <remarks></remarks>
        public long PackageNo { get; set; }

        /// <summary>
        /// 获得或设置当前的消息所属的主机名
        /// </summary>
        public string HostName { get; set; }

        /// <summary>
        /// 获得或设置当前的消息所属的username
        /// </summary>
        public string UserName { get; set; }

        /// <summary>
        /// 获得或设置当前的命令代码
        /// </summary>
        //命令的名称
        public Commands Command { get; set; }

        /// <summary>
        /// 获得或设置当前的消息的类型 文本消息。或者二进制消息
        /// </summary>
        public Consts Type { get; set; }

        /// <summary>
        /// 获得或设置当前的命令消息文本
        /// </summary>
        public string NormalMsg { get; set; }

        /// <summary>
        /// 消息文本字节
        /// </summary>
        public byte[] NormalMsgBytes { get; set; }

        /// <summary>
        /// 扩展消息文本字节
        /// </summary>
        public byte[] ExtendMessageBytes { get; set; }

        /// <summary>
        /// 获得或设置当前命令的扩展文本
        /// </summary>
        public string ExtendMessage { get; set; }

        /// <summary>
        /// 远程地址
        /// </summary>
        public IPEndPoint RemoteAddr { get; set; }

        /// <summary>
        /// 主机地址
        /// </summary>
        public IPEndPoint HostAddr { get; set; }
        /// <summary>
        /// 获得或设置是否须要返回已收到标志
        /// </summary>
        public bool IsRequireReceive { get; set; }

        public Msg(IPEndPoint Addr)
		{
			RemoteAddr = Addr;
			Handled = false;
            Type = Consts.MESSAGE_TEXT;
		}
        public Msg(IPEndPoint hostIP,IPEndPoint remoteIP,Commands cmd)
        {
            HostAddr = hostIP;
            RemoteAddr = remoteIP;
            Command = cmd;
            Handled = false;
            Type = Consts.MESSAGE_TEXT;
        }
		public Msg(IPEndPoint addr, string hostName, string userName,Commands command, string message, string extendMessage)
		{
			RemoteAddr = addr;
			Handled = false;
			HostName = hostName;
			UserName = userName;
			Command = command;
			NormalMsg = message;
			ExtendMessage = extendMessage;
            Type = Consts.MESSAGE_TEXT;
		}

		/// <summary>
		/// 直接创建一个新的Message对象
		/// </summary>
		/// <param name="host">主机对象</param>
		/// <param name="addr">远程地址</param>
		/// <param name="hostName">主机名</param>
		/// <param name="userName">username</param>
		/// <param name="command">命令</param>
		/// <param name="options">选项</param>
		/// <param name="message">信息</param>
		/// <param name="extendMessage">扩展信息</param>
		/// <returns></returns>
		public static Msg Create(Host host, IPEndPoint addr, string hostName, string userName, Commands command, string message, string extendMessage)
		{
			return new Msg(addr,hostName, userName, command,message, extendMessage);
		}
    }

眼下这两个类就是我们基本的数据结构了。

然后接下来是我们基本的类,分包和组包的一个类了

/// <summary>
    /// 消息封包类
    /// </summary>
    public class MessagePacker
    {
        Timer _timer;
        public MessagePacker()
		{
			_timer = new Timer(_ => CheckForOutdateMessage(), null, new TimeSpan(0, 5, 0), new TimeSpan(0, 0, 5, 0));
		}
        /*
		 * 消息包注意:
		 * 1.第一位始终是2(ASCII码50)
		 * 2.第二位到第九位是一个long类型的整数,代表消息编号
		 * 3.第十位到第十三位是一个int类型的整数,代表消息内容总长度
		 * 4.第十四位到第十七位是一个int类型的整数。代表分包的总数
		 * 5.第十八位到第二十一位是一个int类型的整数。代表当前的分包编号
		 * 6.第二十二位表示是否须要返回一个确认标识(1/0)
		 * 7.第二十三到第三十一位是保留的(Reserved)
		 * 8.第三十二字节以后是数据包
		 * */

        /// <summary>
        /// 消息版本
        /// </summary>
        public static byte VersionHeader { get { return 50; } }
        /// <summary>
        /// 返回当前消息封包的头字节数
        /// </summary>
        public static int PackageHeaderLength { get { return 32; } }

        /// <summary>
        /// 获得消息包的字节流
        /// </summary>
        /// <param name="message">要打包的消息对象</param>
        /// <returns></returns>
        public static PacketNetWorkMsg[] BuildNetworkMessage(Msg message)
        {
            if (message.ExtendMessageBytes != null)
            {
                return BuildNetworkMessage(
                message.RemoteAddr,
                message.PackageNo,
                message.Command,
                message.UserName,
                message.HostName,
                message.Type,
                message.NormalMsgBytes,
                message.ExtendMessageBytes,
                message.IsRequireReceive
                );
            }
            else
            {
                return BuildNetworkMessage(
                message.RemoteAddr,
                message.PackageNo,
                message.Command,
                message.UserName,
                message.HostName,
                message.Type,
                System.Text.Encoding.Unicode.GetBytes(message.NormalMsg),
                System.Text.Encoding.Unicode.GetBytes(message.ExtendMessage),
                message.IsRequireReceive
                );
            }
        }

        /// <summary>
        /// 获得消息包的字节流
        /// </summary>
        /// <param name="remoteIp">远程主机地址</param>
        /// <param name="packageNo">包编号</param>
        /// <param name="command">命令</param>
        /// <param name="options">參数</param>
        /// <param name="userName">username</param>
        /// <param name="hostName">主机名</param>
        /// <param name="content">正文消息</param>
        /// <param name="extendContents">扩展消息</param>
        /// <returns></returns>
        public static PacketNetWorkMsg[] BuildNetworkMessage(IPEndPoint remoteIp, long packageNo, Commands command, string userName, string hostName,Consts type ,byte[] content, byte[] extendContents, bool RequireReceiveCheck)
        {

            //每次发送所能容下的数据量
            int maxBytesPerPackage = (int)Consts.MAX_UDP_PACKAGE_LENGTH - PackageHeaderLength;
            //压缩数据流
            var ms = new MemoryStream();
            //var dest = new MemoryStream();
            //var zip = new GZipStream(dest, CompressionMode.Compress);
            var bw = new BinaryWriter(ms, System.Text.Encoding.Unicode);
            //写入头部数据
            bw.Write(packageNo);			//包编号
            bw.Write(userName);				//username
            bw.Write(hostName);				//主机名
            bw.Write((long)command);        //命令
            bw.Write((long)type);           //数据类型
            bw.Write(content == null ?

0 : content.Length);//数据长度

            //写入消息数据
            if (content != null)
                bw.Write(content);
            bw.Write(extendContents == null ? 0 : extendContents.Length);//补充数据长度
            if (extendContents != null)
                bw.Write(extendContents);

            ms.Flush();
            ms.Seek(0, System.IO.SeekOrigin.Begin);
            byte[] ibuf = ms.ToArray();

            var dest = new System.IO.MemoryStream();
            GZipStream zipStream = new GZipStream(dest, CompressionMode.Compress, true);
            byte[] buff = new byte[1024];
            int offset;
            ms.Seek(0, SeekOrigin.Begin);
            while ((offset = ms.Read(buff, 0, buff.Length)) > 0)
            {
                zipStream.Write(buff, 0, offset);//先把数据用二进制写入内存,然后在把它用zip压缩,获取压缩过后的二进制流dest
            }
            zipStream.Close();
            bw.Close();
            ms.Close();
            dest.Seek(0, SeekOrigin.Begin);
            //打包数据总量
            int dataLength = (int)dest.Length;

            int packageCount = (int)Math.Ceiling(dataLength * 1.0 / maxBytesPerPackage);
            PacketNetWorkMsg[] pnma = new PacketNetWorkMsg[packageCount];
            for (int i = 0; i < packageCount; i++)
            {
                int count = i == packageCount - 1 ? dataLength - maxBytesPerPackage * (packageCount - 1) : maxBytesPerPackage;

                byte[] buf = new byte[count + PackageHeaderLength];
                buf[0] = VersionHeader;//版本 第1位
                BitConverter.GetBytes(packageNo).CopyTo(buf, 1);//消息编号 第2到9位 long类型的整数
                BitConverter.GetBytes(dataLength).CopyTo(buf, 9);//消息内容长度 第10到13位 int类型的整数
                BitConverter.GetBytes(packageCount).CopyTo(buf, 13);//分包总数 第14位到第17位 int类型的整数
                BitConverter.GetBytes(i).CopyTo(buf, 17);//分包编号 第18位到第21位 int类型的整数
                buf[21] = RequireReceiveCheck ? (byte)1 : (byte)0;//是否回确认包 第22位
                //第23到第31位是保留的(Reserved)
                dest.Read(buf, 32, buf.Length - 32);//第32字节以后是,详细的数据包

                pnma[i] = new PacketNetWorkMsg()
                {
                    Data = buf,
                    PackageCount = packageCount,
                    PackageIndex = i,
                    PackageNo = packageNo,
                    RemoteIP = remoteIp,
                    SendTimes = 0,
                    Version = 2,
                    IsRequireReceiveCheck = buf[21] == 1
                };
            }

            return pnma;
        }

        /// <summary>
        /// 检測确认是否是这个类型的消息包
        /// </summary>
        /// <param name="buffer"></param>
        /// <returns></returns>
        public static bool Test(byte[] buffer)
        {
            return buffer != null && buffer.Length > PackageHeaderLength && buffer[0] == VersionHeader;
        }

        /// <summary>
        /// 缓存接收到的片段
        /// </summary>
        static Dictionary<long, PacketNetWorkMsg[]> packageCache = new Dictionary<long, PacketNetWorkMsg[]>();

        /// <summary>
        /// 分析网络数据包并进行转换为信息对象
        /// </summary>
        /// <param name="packs">接收到的封包对象</param>
        /// <returns></returns>
        /// <remarks>
        /// 对于分包消息,假设收到的仅仅是片段而且尚未接收全然。则不会进行解析
        /// </remarks>
        public static Msg ParseToMessage(params PacketNetWorkMsg[] packs)
        {
            if (packs.Length == 0 || (packs[0].PackageCount > 1 && packs.Length != packs[0].PackageCount))
                return null;

            var ms = DecompressMessagePacks(packs);
            if (ms == null)
            {
                //事件
                return null;
            }
           //构造读取流
            System.IO.BinaryReader br = new System.IO.BinaryReader(ms, System.Text.Encoding.Unicode);
            //開始读出数据
            Msg m = new Msg(packs[0].RemoteIP);
            m.PackageNo = br.ReadInt64();//包编号

            m.UserName = br.ReadString();//username
            m.HostName = br.ReadString();//主机名
            m.Command = (Commands)br.ReadInt64(); //命令
            m.Type = (Consts)br.ReadInt64();//数据类型
            int length = br.ReadInt32(); //数据长度
            m.NormalMsgBytes = new byte[length];
            br.Read(m.NormalMsgBytes, 0, length);//读取内容

            length = br.ReadInt32();    //附加数据长度
            m.ExtendMessageBytes = new byte[length];
            br.Read(m.ExtendMessageBytes, 0, length);//读取附加数据

            if (m.Type == Consts.MESSAGE_TEXT)
            {
                m.NormalMsg = System.Text.Encoding.Unicode.GetString(m.NormalMsgBytes, 0, length);	//正文
                m.ExtendMessage = System.Text.Encoding.Unicode.GetString(m.ExtendMessageBytes, 0, length);	//扩展消息
                m.ExtendMessageBytes = null;
                m.NormalMsgBytes = null;

            }
            return m;
        }
        /// <summary>
        /// 组合全部的网络数据包并运行解压缩
        /// </summary>
        /// <param name="packs"></param>
        /// <returns></returns>
        static MemoryStream DecompressMessagePacks(params PacketNetWorkMsg[] packs)
        {
            try
            {
                //尝试解压缩,先排序
                Array.Sort(packs);
                var msout = new MemoryStream();
                using (var ms = new System.IO.MemoryStream())
                {
                    //合并写入
                    //Array.ForEach(packs, s => ms.Write(s.Data, 32, s.Data.Length-32));
                    Array.ForEach(packs, s => ms.Write(s.Data, 0, s.Data.Length));
                    ms.Seek(0, SeekOrigin.Begin);

                    //解压缩
                    using (var gz = new GZipStream(ms, CompressionMode.Decompress))
                    {
                        var buffer = new byte[0x400];
                        var count = 0;
                        while ((count = gz.Read(buffer, 0, buffer.Length)) > 0)
                        {
                            msout.Write(buffer, 0, count);
                        }
                    }
                }
                msout.Seek(0, SeekOrigin.Begin);

                return msout;
            }
            catch (Exception)
            {

                return null;
            }
        }
        /// <summary>
        /// 尝试将收到的网络包解析为实体
        /// </summary>
        /// <param name="pack">收到的网络包</param>
        /// <returns></returns>
        /// <remarks>假设收到的包是分片包。且其全部子包尚未接受全然。则会返回空值</remarks>
        public static Msg TryToTranslateMessage(PacketNetWorkMsg pack)
        {
            if (pack == null || pack.PackageIndex > pack.PackageCount - 1) return null;
            else if (pack.PackageCount == 1) return ParseToMessage(pack);
            else
            {
                lock (packageCache)
                {
                    if (packageCache.ContainsKey(pack.PackageNo))
                    {
                        PacketNetWorkMsg[] array = packageCache[pack.PackageNo];
                        array[pack.PackageIndex] = pack;

                        //检測是否完整
                        if (Array.FindIndex(array, s => s == null) == -1)
                        {
                            packageCache.Remove(pack.PackageNo);
                            return ParseToMessage(array);
                        }
                        else
                        {
                            return null;
                        }
                    }
                    else
                    {
                        PacketNetWorkMsg[] array = new PacketNetWorkMsg[pack.PackageCount];
                        array[pack.PackageIndex] = pack;
                        packageCache.Add(pack.PackageNo, array);
                        return null;
                    }
                }
            }

        }

        /// <summary>
        /// 将网络信息解析为封包
        /// </summary>
        /// <param name="buffer"></param>
        /// <returns></returns>
        public static PacketNetWorkMsg Parse(byte[] buffer, IPEndPoint clientAddress)
        {
            if (!Test(buffer)) return null;

            PacketNetWorkMsg p = new PacketNetWorkMsg()
            {
                RemoteIP = clientAddress,
                SendTimes = 0
            };
            p.PackageNo = BitConverter.ToInt64(buffer, 1);//包编号
            p.DataLength = (int)BitConverter.ToInt64(buffer, 9); //内容长度
            p.PackageCount = BitConverter.ToInt32(buffer, 13);//分包总数
            p.PackageIndex = BitConverter.ToInt32(buffer, 17);//索引
            p.IsRequireReceiveCheck = buffer[21] == 1;//是否须要回包
            p.Data = new byte[buffer.Length - PackageHeaderLength];
            Array.Copy(buffer, PackageHeaderLength, p.Data, 0, p.Data.Length);

            return p;
        }
        void CheckForOutdateMessage()
        {

            lock (packageCache)
            {
                //TODO 这里设置最短的过期时间为5分钟,也就是说五分钟之前的消息会被干掉
                var minTime = DateTime.Now.AddMinutes(5.0);
                var targetList = new List<long>();
                foreach (var pkgid in packageCache.Keys)
                {
                    if (Array.TrueForAll(packageCache[pkgid], s => s == null || s.CreationTime < minTime))
                    {
                        targetList.Add(pkgid);
                    }
                }

                foreach (var pkgid in targetList)
                {
                    packageCache.Remove(pkgid);
                }
            }

        }
        #region 事件
        /// <summary>
        /// 网络层数据包解压缩失败
        /// </summary>
        public static event EventHandler<PackageEventArgs> DecompressFailed;

        /// <summary>
        /// 触发解压缩失败事件
        /// </summary>
        /// <param name="e">事件包括的參数</param>
        protected static void OnDecompressFailed(PackageEventArgs e)
        {
            if (DecompressFailed != null) DecompressFailed(typeof(MessagePacker),e);
        }
        #endregion
    }

通过BuildNetworkMessage方法能够把一个Msg对象分为1个或多个packet。然后的话就能够通过曾经所用的方法把分好的包挨个发送了。

收到数据包后用TryToTranslateMessage方法把数据包组装成为一个Msg

接下来是一个 UDP底层通信的类

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Net.Sockets;
using Netframe.Model;
using System.Net;
using System.Threading;
using Netframe.Tool;
using Netframe.Event;

namespace Netframe.Core
{
    /// <summary>
    /// 基本通信类  UDP,可以进行基本数据发送,UdpPacketMsg的发送,数据收到时触发事件
    /// </summary>
    public class UDPThread
    {
        #region 私有变量

        /// <summary>
        /// 配置信息
        /// </summary>
        Config _config;

        /// <summary>
        /// UDP客户端
        /// </summary>
        UdpClient client;

        /// <summary>
        /// 用于轮询是否发送成功的记录
        /// </summary>
        List<PacketNetWorkMsg> SendList;

        #endregion

        #region 属性

        /// <summary>
        /// 是否已经初始化了
        /// </summary>
        public bool IsInitialized { get; private set; }

        /// <summary>
        /// 是否建立连接
        /// </summary>
        public bool IsConnect { get; private set; }
        /// <summary>
        /// 检查发送队列间隔
        /// </summary>
        public int CheckQueueTimeInterval { get; set; }

        /// <summary>
        /// 没有收到确认包时,最大又一次发送的数目,超过此数目会丢弃并触发PackageSendFailture 事件。
        /// </summary>
        public int MaxResendTimes { get; set; }

        #endregion

        #region 构造函数

        /// <summary>
        /// 构造一个新的消息对象。并绑定到指定的端口和IP上。
        /// </summary>
        /// <param name="ip">绑定的IP</param>
        /// <param name="port">绑定的端口</param>
        public UDPThread(int port)
        {
            IsInitialized = false;
            IPAddress LocalIPAddress = null;
            //获得本机当前的ip
            try
            {
                IPAddress[] address = Dns.GetHostAddresses(Dns.GetHostName());
                foreach (IPAddress addr in address)
                {
                    if (addr.AddressFamily.ToString().Equals("InterNetwork"))
                    {
                        LocalIPAddress = addr;
                        break;
                    }
                }
            }
            catch (Exception)
            {
                OnLocalIpError(new EventArgs());
                //获取本机ip异常
                return;
            }
            try
            {
                client = new UdpClient(new IPEndPoint(LocalIPAddress, port));
                IsConnect = false;
            }
            catch (Exception)
            {
                OnNetworkError(new EventArgs());
                return;
            }
            SendList = new List<PacketNetWorkMsg>();
            client.EnableBroadcast = true;

            CheckQueueTimeInterval = 2000;
            MaxResendTimes = 5;
            new Thread(new ThreadStart(CheckUnConfirmedQueue)) { IsBackground = true }.Start();

            IsInitialized = true;

            //開始监听
            client.BeginReceive(ReceiveDataAsync, null);
            //ReceiveData();
        }

        public UDPThread(Config config)
        {
            IsInitialized = false;
            try
			{
                client = new UdpClient(new IPEndPoint(config.BindedIP, config.Port));
			}
            catch (Exception)
            {
                OnNetworkError(new EventArgs());
                return;
            }
            SendList = new List<PacketNetWorkMsg>();
            client.EnableBroadcast = true;
            this._config = config;
            CheckQueueTimeInterval = 2000;
            MaxResendTimes = 5;
            new Thread(new ThreadStart(CheckUnConfirmedQueue)) { IsBackground = true }.Start();

            IsInitialized = true;

            //開始监听
            client.BeginReceive(ReceiveDataAsync, null);
        }
        /// <summary>
        /// 构造函数与远程主机连接
        /// </summary>
        /// <param name="ipaddress">绑定ip</param>
        /// <param name="port">端口</param>
        public UDPThread(string ip, int port)
        {
            IsInitialized = false;
            IPAddress ipaddress = IPAddress.Parse(ip);//构造远程连接的參数
            try
            {
                client = new UdpClient();
                client.Connect(new IPEndPoint(ipaddress, port));//与远程server建立连接ps:仅仅是形式上,udp本身无连接的
                IsConnect = true;
            }
            catch (Exception)
            {
                OnNetworkError(new EventArgs());
                return;
            }
            SendList = new List<PacketNetWorkMsg>();
            client.EnableBroadcast = true;

            CheckQueueTimeInterval = 2000;
            MaxResendTimes = 5;
            new Thread(new ThreadStart(CheckUnConfirmedQueue)) { IsBackground = true }.Start();

            IsInitialized = true;

            //開始监听
            client.BeginReceive(ReceiveDataAsync, null);
            //ReceiveData();
        }
        #endregion

        #region 私有方法
        /// <summary>
        /// 接收数据的方法
        /// </summary>
        /// <param name="ar"></param>
        void ReceiveDataAsync(IAsyncResult ar)
        {
            IPEndPoint ipend = null;
            byte[] buffer = null;
            try
            {
                buffer = client.EndReceive(ar, ref ipend);
            }
            catch (Exception)
            {
                return;
            }
            finally
            {
                if (IsInitialized && client != null)
                    client.BeginReceive(ReceiveDataAsync, null);
            }

            if (buffer == null || buffer.Length == 0) return;
            //触发已收到事件
            OnPackageReceived(new PackageEventArgs() { RemoteIP = ipend, Data = buffer });

        }
        /// <summary>
        /// 同步数据接收方法
        /// </summary>
        private void ReceiveData()
        {
            while (true)
            {
                IPEndPoint retip = null;
                byte[] buffer = null;
                try
                {
                    buffer = client.Receive(ref retip);//接收数据,当Client端连接主机的时候,retip就变成Cilent端的IP了
                }
                catch (Exception)
                {
                    //异常处理操作
                    return;
                }
                if (buffer == null || buffer.Length == 0) return;
                PackageEventArgs arg = new PackageEventArgs(buffer, retip);
                OnPackageReceived(arg);//数据包收到触发事件
            }
        }

        /// <summary>
        /// 异步接受数据
        /// </summary>
        private void AsyncReceiveData()
        {
            try
            {
                client.BeginReceive(new AsyncCallback(ReceiveCallback), null);
            }
            catch (SocketException ex)
            {
                throw ex;
            }
        }
        /// <summary>
        /// 接收数据的回调函数
        /// </summary>
        /// <param name="param"></param>
        private void ReceiveCallback(IAsyncResult param)
        {
            if (param.IsCompleted)
            {
                IPEndPoint retip = null;
                byte[] buffer = null;
                try
                {
                    buffer = client.EndReceive(param, ref retip);//接收数据,当Client端连接主机的时候,test就变成Cilent端的IP了
                }
                catch (Exception ex)
                {
                    //异常处理操作
                }
                finally
                {
                    AsyncReceiveData();
                }
                if (buffer == null || buffer.Length == 0) return;
                OnPackageReceived(new PackageEventArgs() { RemoteIP = retip, Data = buffer });
            }
        }

        #endregion

        #region 公共函数

        /// <summary>
        /// 关闭客户端
        /// </summary>
        public void Close()
        {
            if (IsInitialized)
            {
                IsInitialized = false;
                if (IsInitialized)
                    client.Close();
                IsConnect = false;
                client = null;
            }
        }

        /// <summary>
        /// 发送数据,不进行检查
        /// </summary>
        /// <param name="address">远程主机地址</param>
        /// <param name="port">远程主机端口</param>
        /// <param name="data">数据流</param>
        /// <param name="packageNo">数据包编号</param>
        /// <param name="packageIndex">分包索引</param>
        private void Send(IPAddress address, int port, byte[] data, long packageNo, int packageIndex)
        {
            Send(false, new IPEndPoint(address, port), data, packageNo, packageIndex);
        }
        /// <summary>
        /// 发送数据,并推断是否对数据作回应检查。

将会在每隔 <see cref="CheckQueueTimeInterval"/> 的间隔后又一次发送。直到收到对方的回应。

/// 注意:网络层不会解析回应。请调用 <see cref="PopSendItemFromList"/> 方法来告知已收到数据包。

/// </summary>
        /// <param name="receiveConfirm">消息是否会回发确认包</param>
        /// <param name="address">远程主机地址</param>
        /// <param name="port">远程主机端口</param>
        /// <param name="data">数据流</param>
        /// <param name="packageNo">数据包编号</param>
        /// <param name="packageIndex">分包索引</param>
        private void Send(bool receiveConfirm, IPAddress address, int port, byte[] data, long packageNo, int packageIndex)
        {
            Send(receiveConfirm, new IPEndPoint(address, port), data, packageNo, packageIndex);
        }

        /// <summary>
        /// 发送数据,并对数据作回应检查。当 <see cref="receiveConfirm"/> 为 true 时,将会在每隔 <see cref="CheckQueueTimeInterval"></see> 的间隔后又一次发送,直到收到对方的回应。
        /// 注意:网络层不会解析回应,请调用 <see cref="PopSendItemFromList"></see> 方法来告知已收到数据包。
        /// </summary>
        /// <param name="receiveConfirm">消息是否会回发确认包</param>
        /// <param name="address">远程主机地址</param>
        /// <param name="data">数据流</param>
        /// <param name="packageNo">数据包编号</param>
        /// <param name="packageIndex">分包索引</param>
        private void Send(bool receiveConfirm, IPEndPoint address, byte[] data, long packageNo, int packageIndex)
        {
            if (IsInitialized)
            {
                client.Send(data, data.Length, address);
                if (receiveConfirm)
                    PushSendItemToList(new PacketNetWorkMsg() { Data = data, RemoteIP = address, SendTimes = 0, PackageIndex = packageIndex, PackageNo = packageNo });
            }
        }

        /// <summary>
        /// 同步发送分包数据
        /// </summary>
        /// <param name="message"></param>
        public void SendMsg(Msg message)
        {
            if (IsInitialized)
            {
                ICollection<PacketNetWorkMsg> udpPackets = MessagePacker.BuildNetworkMessage(message);
                foreach (PacketNetWorkMsg packedMessage in udpPackets)
                {
                    //使用同步发送
                    SendPacket(packedMessage);
                }
            }
        }
        /// <summary>
        /// 将已经打包的消息发送出去
        /// </summary>
        /// <param name="packet"></param>
        public void SendPacket(PacketNetWorkMsg packet)
        {
            if (IsInitialized)
            {
                //使用同步的方法发送数据
                if (!IsConnect)
                    client.Send(packet.Data, packet.Data.Length, packet.RemoteIP);
                else
                    client.Send(packet.Data, packet.Data.Length);
                if (packet.IsRequireReceiveCheck)
                    PushSendItemToList(packet);
            }
        }
        /// <summary>
        /// 异步分包发送数组的方法
        /// </summary>
        /// <param name="message"></param>
        public void AsyncSendMsg(Msg message)
        {
            if (IsInitialized)
            {
                ICollection<PacketNetWorkMsg> udpPackets = MessagePacker.BuildNetworkMessage(message);
                foreach (PacketNetWorkMsg packedMessage in udpPackets)
                {
                    //使用异步的方法发送数据
                    AsyncSendPacket(packedMessage);
                }
            }

        }
        /// <summary>
        /// 发送完毕后的回调方法
        /// </summary>
        /// <param name="param"></param>
        private void SendCallback(IAsyncResult param)
        {
            if (param.IsCompleted)
            {
                try
                {
                    client.EndSend(param);//这句话必须得写。BeginSend()和EndSend()是成对出现的
                }
                catch (Exception)
                {
                    PackageEventArgs e = new PackageEventArgs();
                    OnPackageSendFailure(e);//触发发送失败事件
                }
            }

        }
        /// <summary>
        /// 异步将将已经打包的消息发送出去,不进行发送检查
        /// </summary>
        /// <param name="packet"></param>
        public void AsyncSendPacket(PacketNetWorkMsg packet)
        {
            //使用异步的方法发送数据
            if (IsInitialized)
            {
                if (!IsConnect)
                    this.client.BeginSend(packet.Data, packet.Data.Length, packet.RemoteIP, new AsyncCallback(SendCallback), null);
                else
                    this.client.BeginSend(packet.Data, packet.Data.Length, new AsyncCallback(SendCallback), null);
                if (packet.IsRequireReceiveCheck)
                    PushSendItemToList(packet);//将该消息压入列表
            }
        }

        #endregion
        System.Threading.SendOrPostCallback cucqCallpack;
        System.Threading.SendOrPostCallback resendCallback;
        /// <summary>
        /// 自由线程,检測未发送的数据并发出
        /// </summary>
        void CheckUnConfirmedQueue()
        {
            //异步调用托付
            if (cucqCallpack == null) cucqCallpack = (s) => OnPackageSendFailure(s as PackageEventArgs);
            if (resendCallback == null) resendCallback = (s) => OnPackageResend(s as PackageEventArgs);
            do
            {
                if (SendList.Count > 0)
                {
                    PacketNetWorkMsg[] array = null;
                    lock (SendList)
                    {
                        array = SendList.ToArray();
                    }
                    //挨个又一次发送并计数
                    Array.ForEach(array, s =>
                    {
                        s.SendTimes++;
                        if (s.SendTimes >= MaxResendTimes)
                        {
                            //发送失败啊
                            PackageEventArgs e = new PackageEventArgs();
                            if (SeiClient.NeedPostMessage)
                            {
                                SeiClient.SendSynchronizeMessage(cucqCallpack, e);
                            }
                            else
                            {
                                OnPackageSendFailure(e);//触发发送失败事件
                            }
                            SendList.Remove(s);
                        }
                        else
                        {
                            //又一次发送
                            AsyncSendPacket(s);
                            PackageEventArgs e = new PackageEventArgs() { PacketMsg = s };
                            if (SeiClient.NeedPostMessage)
                            {
                                SeiClient.SendASynchronizeMessage(resendCallback, e);
                            }
                            else
                            {
                                OnPackageResend(e);//触发又一次发送事件
                            }
                        }
                    });
                }
                Thread.Sleep(CheckQueueTimeInterval);
            } while (IsInitialized);
        }

        static object lockObj = new object();
        /// <summary>
        /// 将数据信息压入列表
        /// </summary>
        /// <param name="item"></param>
        public void PushSendItemToList(PacketNetWorkMsg item)
        {
            SendList.Add(item);
        }

        /// <summary>
        /// 将数据包从列表中移除
        /// 网络层不会解析
        /// </summary>
        /// <param name="packageNo">数据包编号</param>
        /// <param name="packageIndex">数据包分包索引</param>
        public void PopSendItemFromList(long packageNo, int packageIndex)
        {
            lock (lockObj)
            {
                Array.ForEach(SendList.Where(s => s.PackageNo == packageNo && s.PackageIndex == packageIndex).ToArray(), s => SendList.Remove(s));
            }
        }

        #region 事件

        /// <summary>
        /// 网络出现异常,无法获取本地ip地址
        /// </summary>
        public event EventHandler IPError;

        protected void OnLocalIpError(EventArgs e)
        {
            if (IPError != null) IPError(this, e);
        }

        /// <summary>
        /// 网络出现异常(如端口无法绑定等,此时无法继续工作)
        /// </summary>
        public event EventHandler NetworkError;

        protected void OnNetworkError(EventArgs e)
        {
            if (NetworkError != null) NetworkError(this, e);
        }

        /// <summary>
        /// 当数据包收到时触发
        /// </summary>
        public event EventHandler<PackageEventArgs> PackageReceived;

        /// <summary>
        /// 当数据包收到事件触发时。被调用
        /// </summary>
        /// <param name="e">包括事件的參数</param>
        protected virtual void OnPackageReceived(PackageEventArgs e)
        {
            if (PackageReceived != null) PackageReceived(this, e);
        }
        /// <summary>
        /// 数据包发送失败
        /// </summary>
        public event EventHandler<PackageEventArgs> PackageSendFailure;

        /// <summary>
        /// 当数据发送失败时调用
        /// </summary>
        /// <param name="e">包括事件的參数</param>
        protected virtual void OnPackageSendFailure(PackageEventArgs e)
        {
            if (PackageSendFailure != null) PackageSendFailure(this, e);
        }

        /// <summary>
        /// 数据包未接收到确认,又一次发送
        /// </summary>
        public event EventHandler<PackageEventArgs> PackageResend;

        /// <summary>
        /// 触发又一次发送事件
        /// </summary>
        /// <param name="e">包括事件的參数</param>
        protected virtual void OnPackageResend(PackageEventArgs e)
        {
            if (PackageResend != null) PackageResend(this, e);
        }

        #endregion

        #region IDisposable 成员

        /// <summary>
        /// 关闭客户端并释放资源
        /// </summary>
        public void Dispose()
        {
            Close();
        }

        #endregion

    }
}

再然后来一个 UDP上层的类,用来把收到的包组装合并成为一个msg

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Net;
using Netframe.Event;
using Netframe.Model;
using Netframe.Tool;
using System.Threading;

namespace Netframe.Core
{
    /// <summary>
    /// 对底层收到数据的解析
    /// </summary>
    public class MsgTranslator
    {
        #region 属性
        /// <summary>
        /// 用来发送和接收消息的对象
        /// </summary>
        public UDPThread Client { get; set; }

        Config _config;

        //用来检測反复收到的消息包
        Queue<long> ReceivedQueue;

        #endregion

        public MsgTranslator(UDPThread udpClient,Config config)
        {
            this.Client = udpClient;
            this._config = config;
            ReceivedQueue = new Queue<long>();
            Client.PackageReceived += PackageReceived;
        }

        /// <summary>
        /// 发送信息实体
        /// </summary>
        /// <param name="msg"></param>
        public void Send(Msg msg)
        {
            //消息正在发送事件
            OnMessageSending(new MessageEventArgs(msg));
            Client.AsyncSendMsg(msg);
            //消息已发送事件
            OnMessageSended(new MessageEventArgs(msg));
        }

        static object lockObj = new object();
        /// <summary>
        /// 消息包接收到时的事件
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void PackageReceived(object sender, PackageEventArgs e)
        {
            if (!e.IsHandled)
            {
                e.IsHandled = true;
                Msg m = ResolveToMessage(e.Data, e.RemoteIP);
                if (m == null) return;
                if (m.Command == Commands.RecvConfirm)
                {
                    long pno = m.NormalMsg.TryParseToInt(0);
                    int pindex = m.ExtendMessage.TryParseToInt(0);
                    if (pno != 0)
                        this.Client.PopSendItemFromList(pno, pindex);
                    return;
                }
                //检查近期收到的消息队列里面是否已经包括了这个消息包,假设是。则丢弃
                if (!ReceivedQueue.Contains(m.PackageNo))
                {
                    ReceivedQueue.Enqueue(m.PackageNo);
                    if (ReceivedQueue.Count > 100) ReceivedQueue.Dequeue();

                    OnMessageReceived(new MessageEventArgs(m));
                }
                else
                    OnMessageDroped(new MessageEventArgs(m));
            }
        }

        public Msg ResolveToMessage(byte[] buffer, IPEndPoint remoteEndPoint)
        {
            if (buffer == null || buffer.Length < 0) return null;
            Msg m = null;
            if (MessagePacker.Test(buffer))
            {
                PacketNetWorkMsg pack = MessagePacker.Parse(buffer, remoteEndPoint);
                if (pack == null) return null;
                if (DetermineConfirm(pack))
                {
                    //发送确认标志
                    Msg cm = Helper.CreateRecivedCheck(remoteEndPoint, pack.PackageNo, pack.PackageIndex, _config);
                    Client.SendMsg(cm);
                }
                m = MessagePacker.TryToTranslateMessage(pack);
            }
            return m;
        }
        /// <summary>
        /// 检測是否须要发送回复包来确认收到
        /// </summary>
        /// <param name="message"></param>
        /// <returns></returns>
        static bool DetermineConfirm(PacketNetWorkMsg packet)
        {
            return packet.IsRequireReceiveCheck;
        }
        static bool DetermineConfirm(Msg message)
        {
            return message.IsRequireReceive;
        }
        #region 事件

        /// <summary>
        /// 接收到消息包(UDP)
        /// </summary>
        public event EventHandler<MessageEventArgs> MessageReceived;
        SendOrPostCallback messageReceivedCallBack;
        /// <summary>
        /// 引发接收到消息包事件
        /// </summary>
        /// <param name="e"></param>
        protected virtual void OnMessageReceived(MessageEventArgs e)
        {
            if (MessageReceived == null) return;
            if (!SeiClient.NeedPostMessage)
            {
                MessageReceived(this, e);
            }
            else
            {
                if (messageReceivedCallBack == null)
                    messageReceivedCallBack = s => MessageReceived(this, s as MessageEventArgs);

                SeiClient.SendSynchronizeMessage(messageReceivedCallBack, e);
            }
        }

        /// <summary>
        /// 消息将要发送事件
        /// </summary>
        public event EventHandler<MessageEventArgs> MessageSending;
        SendOrPostCallback messageSendingCallBack;
        /// <summary>
        /// 引发消息将要发送事件
        /// </summary>
        /// <param name="e"></param>
        protected virtual void OnMessageSending(MessageEventArgs e)
        {
            if (MessageSending == null) return;

            if (!SeiClient.NeedPostMessage)
            {
                MessageSending(this, e);
            }
            else
            {
                if (messageSendingCallBack == null)
                    messageSendingCallBack = s => MessageSending(this, s as MessageEventArgs);
                SeiClient.SendSynchronizeMessage(messageSendingCallBack, e);
            }
        }

        /// <summary>
        /// 消息已经发送事件
        /// </summary>
        public event EventHandler<MessageEventArgs> MessageSended;
        SendOrPostCallback messageSendedCall;
        /// <summary>
        /// 引发消息已经发送事件
        /// </summary>
        /// <param name="e"></param>
        protected virtual void OnMessageSended(MessageEventArgs e)
        {
            if (MessageSended == null) return;

            if (!SeiClient.NeedPostMessage)
            {
                MessageSended(this, e);
            }
            else
            {
                if (messageSendedCall == null)
                    messageSendedCall = s => MessageSended(this, s as MessageEventArgs);
                SeiClient.SendSynchronizeMessage(messageSendedCall, e);
            }
        }

        /// <summary>
        /// 反复收包然后丢包事件
        /// </summary>
        public event EventHandler<MessageEventArgs> MessageDroped;
        /// <summary>
        /// 引发丢弃Msg事件
        /// </summary>
        /// <param name="e"></param>
        protected virtual void OnMessageDroped(MessageEventArgs e)
        {
            if (MessageDroped == null) return;

            MessageDroped(this, e);
        }

        #endregion

    }
}
时间: 2024-10-04 13:02:16

c#基于事件模型的UDP通讯框架(适用于网络包编解码)的相关文章

基于select模型的udp客户端实现超时机制

参考:http://www.cnblogs.com/chenshuyi/p/3539949.html 多路选择I/O — select模型 其思想在于使用一个集合,该集合中包含需要进行读写的fd,通过轮询这个集合,直到有一个fd可读写,才返回.与阻塞I/O不同的是,阻塞I/O仅使用了一次系统调用,就是对fd的读写,如果没有fd处于就绪状态,则进程一直阻塞,而多路选择I/O使用了两次系统调用,第一次是轮询并返回可读写fd数,第二次是对fd进行读写,阻塞只发生在轮询fd的过程. select函数的原

关于事件模型的一些看法

http://forkme.info/about-event-loop/ 概述 事件处理模型, 也即是全异步事件处理模型.在以前, 对于那些同时执行多项任务, 但仍能响应用户交互的应用程序通常需要实施一种使用多进程(如linux的fork操作)或者多线程的操作.对于低并发的环境, 这样做无疑能避免进程因等待某个操作而出现"假死"现象.但对于更复杂的异步应用程序或者是要求高并发的环境, 就要使用事件模型来处理异步事件, 这样做有很多好处: 在高并发条件下响应用户时间更快; 内存消耗降低,

ExtJS框架基础:事件模型及其常用功能

前言 工作中用ExtJS有一段时间了,Ext丰富的UI组件大大的提高了开发B/S应用的效率.虽然近期工作中天天都用到ExtJS,但很少对ExtJS框架原理性的东西进行过深入学习,这两天花了些时间学习了下.我并不推荐大家去研究ExtJS框架的源码,虽然可以学习其中的思想和原理,但太浪费精力了,除非你要自己写框架. 对于ExtJS这种框架,非遇到"杂症"的时候我觉得也没必要去研究其源码和底层的原理,对其一些机制大致有个概念,懂得怎么用就行,这也是本篇博文的主要目的. Ext自己的事件机制

HP-SOCKET TCP/UDP通信框架库解析

项目概述: HP-SOCKET是一套通用TCP/UDP通信框架,包括服务器.客户端.Agent组件:其目标是提供高性能.通用性.简易性.可扩展.可定制: 鉴于此,其仅实现基本的通用框架通信.数据收发功能,供上层应用直接简单使用的接口实现:而对于数据包完整性和协议解析等未处理, 也就意味着需要应用层自己处理一些数据包构造或解析等操作: 事实上目前只能支持windows平台: 1. 对于TCP通信模式下:服务器端和Agent均采用的是异步IO模型中的完成端口模型,客户端采用的是就绪IO通告模型中的W

高性能 TCP &amp; UDP 通信框架 HP-Socket v3.5.1 正式发布

HP-Socket 是一套通用的高性能 TCP/UDP 通信框架,包含服务端组件.客户端组件和 Agent 组件,广泛适用于各种不同应用场景的 TCP/UDP 通信系统,提供 C/C++.C#.Delphi.E(易语言).Java.Python 等编程语言接口.HP-Socket 对通信层实现完全封装,应用程序不必关注通信层的任何细节:HP-Socket 提供基于事件通知模型的 API 接口,能非常简单高效地整合到新旧应用程序中. 为了让使用者能方便快速地学习和使用 HP-Socket,迅速掌握

高性能 TCP &amp; UDP 通信框架 HP-Socket v3.2.2 正式发布

HP-Socket 是一套通用的高性能 TCP/UDP 通信框架,包含服务端组件.客户端组件和 Agent 组件,广泛适用于各种不同应用场景的 TCP/UDP 通信系统,提供 C/C++.C#.Delphi.E(易语言).Java.Python 等编程语言接口.HP-Socket 对通信层实现完全封装,应用程序不必关注通信层的任何细节:HP-Socket 提供基于事件通知模型的 API 接口,能非常简单高效地整合到新旧应用程序中. 为了让使用者能方便快速地学习和使用 HP-Socket,迅速掌握

高性能 TCP &amp; UDP 通信框架 HP-Socket v3.3.1 正式发布

HP-Socket 是一套通用的高性能 TCP/UDP 通信框架,包含服务端组件.客户端组件和 Agent 组件,广泛适用于各种不同应用场景的 TCP/UDP 通信系统,提供 C/C++.C#.Delphi.E(易语言).Java.Python 等编程语言接口.HP-Socket 对通信层实现完全封装,应用程序不必关注通信层的任何细节:HP-Socket 提供基于事件通知模型的 API 接口,能非常简单高效地整合到新旧应用程序中. 为了让使用者能方便快速地学习和使用 HP-Socket,迅速掌握

HP-Socket 是一套通用的高性能 TCP/UDP 通信框架

???HP-Socket 是一套通用的高性能 TCP/UDP 通信框架,包含服务端组件.客户端组件和Agent组件,广泛适用于各种不同应用场景的 TCP/UDP 通信系统,提供 C/C++.C#.Delphi.E(易语言).Java.Python 等编程语言接口.HP-Socket 对通信层实现完全封装,应用程序不必关注通信层的任何细节:HP-Socket 提供基于事件通知模型的 API 接口,能非常简单高效地整合到新旧应用程序中.????为了让使用者能方便快速地学习和使用 HP-Socket

高性能 TCP &amp;amp; UDP 通信框架 HP-Socket v3.2.2 正式公布

HP-Socket 是一套通用的高性能 TCP/UDP 通信框架.包括服务端组件.client组件和 Agent 组件,广泛适用于各种不同应用场景的 TCP/UDP 通信系统,提供 C/C++.C#.Delphi.E(易语言).Java.Python 等编程语言接口.HP-Socket 对通信层实现全然封装.应用程序不必关注通信层的不论什么细节:HP-Socket 提供基于事件通知模型的 API 接口,能很easy高效地整合到新旧应用程序中. 为了让使用者能方便高速地学习和使用 HP-Socke