广播与P2P通道(下) -- 方案实现

广播与P2P通道(上) -- 问题与方案 一文中,我们已经找到了最优的模型,即将广播与P2P通道相结合的方案,这样能使服务器的带宽消耗降到最低,最大节省服务器的宽带支出。当然,如果从零开始实现这种方案无疑是非常艰巨的,但基于ESFramework提供的通信功能和P2P功能来做,就不再那么遥不可及了。

1.P2P通道状态

根据上文模型3的讨论,要实现该模型,每个客户端需要知道自己与哪些用户创建了P2P通道,服务器也要知道每个客户端已建立的P2P通道的状态。

使用ESFramework,在客户端已经可以通过IRapidPassiveEngine.P2PController接口知道当前客户端与哪些其它客户端成功建立了P2P通道,并且可以通过P2PController接口发起与新的客户端建立新的P2P通道的尝试。但在服务端,对于每个客户端建立了哪些P2P通道,服务端是一无所知的。所以,基于ESFramework实现模型3的第一件事情,就是客户端要实时把自己的P2P状态变化报告给服务端,而服务端也要管理每个客户端的P2P通道状态。(注意。下面的所有实现,需要引用ESFramework.dll、ESPlus.dll、ESBasic.dll)

(1)P2PChannelManager

我们在服务端设计P2PChannelManager类来管理每个在线客户端已成功创建的所有P2P通道。

    public class P2PChannelManager
    {
        //key 表示P2P通道的起始点用户ID,value 表示P2P通道的目的点用户列表。(单向,因为某些P2P通道就是单向的)
        private SortedArray<string, SortedArray<string>> channels = new SortedArray<string, SortedArray<string>>();

        public void Initialize(IUserManager userManager)
        {
            userManager.SomeOneDisconnected += new ESBasic.CbGeneric<UserData, ESFramework.Server.DisconnectedType>(userManager_SomeOneDisconnected);
        }

        void userManager_SomeOneDisconnected(UserData user, ESFramework.Server.DisconnectedType obj2)
        {
            this.channels.RemoveByKey(user.UserID);
        }

        public void Register(string startUserID, string destUserID)
        {
            if (!this.channels.ContainsKey(startUserID))
            {
                this.channels.Add(startUserID, new SortedArray<string>());
            }

            this.channels[startUserID].Add(destUserID);
        }

        public void Unregister(string startUserID, string destUserID)
        {
            if (this.channels.ContainsKey(startUserID))
            {
                this.channels[startUserID].Remove(destUserID);
            }
        }

        public bool IsP2PChannelExist(string startUserID, string destUserID)
        {
            if (!this.channels.ContainsKey(startUserID))
            {
                return false;
            }

            return this.channels[startUserID].Contains(destUserID);
        }

     }

P2PChannelManager提供了注册P2P通道、注销P2P通道、以及查询P2P通道是否存在的方法。其内部使用类似字典的SortedArray来管理每个用户的已经成功建立的P2P通道(即与哪些其它用户打通了P2P)。另外,P2PChannelManager预定了IUserManager的SomeOneDisconnected事件,这样,当某个用户掉线时,就可以清除其所有的P2P状态。因为,在ESFramework中,当客户端与服务器的TCP连接断开时,客户端会自动关闭所有的P2P通道。

(2)客户端实时报告自己的P2P状态变化给服务端

当客户端每次成功创建一个P2P通道、或者已有P2P通道中断时,客户端要发消息告诉服务端。这样,我们就需要定义这个消息的类型:

    public static class MyInfoTypes
    {
        public const int P2PChannelOpen = 1;
        public const int P2PChannelClose = 2;
    }

再定义消息协议:

    public class P2PChannelReportContract
    {
        public P2PChannelReportContract() { }
        public P2PChannelReportContract(string dest)
        {
            this.destUserID = dest;
        }

        #region DestUserID
        private string destUserID;
        public string DestUserID
        {
            get { return destUserID; }
            set { destUserID = value; }
        }
        #endregion
    }

定好了消息类型和contract类,我们在客户端预定P2P通道的状态变化,并报告给服务端:

   public void Initialize(IRapidPassiveEngine rapidPassiveEngine)
    {
        rapidPassiveEngine.P2PController.P2PChannelOpened += new CbGeneric<P2PChannelState>(P2PController_P2PChannelOpened);
        rapidPassiveEngine.P2PController.P2PChannelClosed += new CbGeneric<P2PChannelState>(P2PController_P2PChannelClosed);
    }
    void P2PController_P2PChannelClosed(P2PChannelState state)
    {
        this.P2PChannelReport(false, state.DestUserID);
    }

    void P2PController_P2PChannelOpened(P2PChannelState state)
    {
        this.P2PChannelReport(true, state.DestUserID);
    }

    private void P2PChannelReport(bool open, string destUserID)
    {
        P2PChannelReportContract contract = new P2PChannelReportContract(destUserID);
        int messageType = open ? MyInfoTypes.P2PChannelOpen : MyInfoTypes.P2PChannelClose;
        this.rapidPassiveEngine.CustomizeOutter.Send(messageType, CompactPropertySerializer.Default.Serialize(contract));
    }

在服务端,我们需要处理这两种类型的消息(实现ICustomizeHandler接口的HandleInformation方法):

    private P2PChannelManager p2PChannelManager = new P2PChannelManager();
   public void HandleInformation(string sourceUserID, int informationType, byte[] information)
    {
        if (informationType == MyInfoTypes.P2PChannelOpen)
        {
            P2PChannelReportContract contract = CompactPropertySerializer.Default.Deserialize<P2PChannelReportContract>(information, 0);
            this.p2PChannelManager.Register(sourceUserID, contract.DestUserID);
            return ;
        }

        if (informationType == MyInfoTypes.P2PChannelClose)
        {
            P2PChannelReportContract contract = CompactPropertySerializer.Default.Deserialize<P2PChannelReportContract>(information, 0);
            this.p2PChannelManager.Unregister(sourceUserID, contract.DestUserID);
            return ;
        }    }

这样,服务端就实时地知道每个客户端的P2P状态了。

2.与广播结合

同样的,我们首先为广播消息定义一个消息类型:

    public static class MyInfoTypes
    {
        public const int P2PChannelOpen = 1;
        public const int P2PChannelClose = 2;
        public const int Broadcast = 3; //广播消息
    }

再定义对应的协议类:

    public class BroadcastContract
    {
        #region Ctor
        public BroadcastContract() { }
        public BroadcastContract(string _broadcasterID, string _groupID, int infoType ,byte[] info )
        {
            this.broadcasterID = _broadcasterID;
            this.groupID = _groupID;
            this.content = info;
            this.informationType = infoType;
            this.actionTypeOnChannelIsBusy = action;
        }
        #endregion

        #region BroadcasterID
        private string broadcasterID = null;
        /// <summary>
        /// 发出广播的用户ID。
        /// </summary>
        public string BroadcasterID
        {
            get { return broadcasterID; }
            set { broadcasterID = value; }
        }
        #endregion

        #region GroupID
        private string groupID = "";
        /// <summary>
        /// 接收广播的组ID
        /// </summary>
        public string GroupID
        {
            get { return groupID; }
            set { groupID = value; }
        }
        #endregion

        #region InformationType
        private int informationType = 0;
        /// <summary>
        /// 广播信息的类型。
        /// </summary>
        public int InformationType
        {
            get { return informationType; }
            set { informationType = value; }
        }
        #endregion

        #region Content
        private byte[] content;
        public byte[] Content
        {
            get { return content; }
            set { content = value; }
        }
        #endregion

  }

(1)在客户端发送广播消息

在客户端,我们根据与组内成员的P2P通道的状态,来判断发送的方案,就像依据上文提到的,可细分为三种情况:

a.当某个客户端发现自己和组内的所有其它成员都建立了P2P通道时,那么,它就不用把广播消息发送给服务器了。

b.如果客户端与组内的所有其它成员的P2P通道都没有建立成功,那么,它只需要将广播消息发送给服务器。

c.如果客户端与部分组内的成员建立了P2P通道,那么,它不仅需要将广播消息发送给服务器,还需要将该广播消息经过每个P2P通道发送一次。

    public void Broadcast(string currentUserID, string groupID, int broadcastType, byte[] broadcastContent)
    {
        BroadcastContract contract = new BroadcastContract(currentUserID, groupID, broadcastType, broadcastContent);
        byte[] info = CompactPropertySerializer.Default.Serialize(contract);
        List<string> members = this.groupManager.GetGroupMembers(groupID);
        if (members == null)
        {
            return;
        }
        bool allP2P = true;
        foreach (string memberID in members)
        {
            if (memberID == this.currentUserID)
            {
                continue;
            }

            if (rapidPassiveEngine.P2PController.IsP2PChannelExist(memberID))
            {
                rapidPassiveEngine.CustomizeOutter.SendByP2PChannel(memberID, MyInfoTypes.Broadcast, info, ActionTypeOnNoP2PChannel.Discard, true, ActionTypeOnChannelIsBusy.Continue);
            }
            else
            {
                allP2P = false;
            }
        }

        if (!allP2P) //只要有一个组成员没有成功建立P2P,就要发给服务端。
        {
            this.rapidPassiveEngine.CustomizeOutter.Send(null, this.groupInfoTypes.Broadcast, info, true, action);
        }
    }

 (2)服务端转发广播

当服务器收到一个广播消息时,首先,查看目标组中的用户,然后,根据广播消息的发送者的P2P通道状态,来综合决定该广播消息需要转发给哪些客户端。我们只需在上面的HandleInformation方法中增加代码就可以了:

   if (informationType == MyInfoTypes.Broadcast)
    {
        BroadcastContract contract = CompactPropertySerializer.Default.Deserialize<BroadcastContract>(information, 0);
        string groupID = contract.GroupID;

        List<string> members = this.groupManager.GetGroupMembers(groupID);
        if (members != null)
        {
            foreach (string memberID in members)
            {
                bool useP2PChannel = this.p2PChannelManager.IsP2PChannelExist(sourceUserID, memberID);
                if (memberID != sourceUserID && !useP2PChannel)
                {
                    this.customizeController.Send(memberID, MyInfoTypes.Broadcast, information, true, ActionTypeOnChannelIsBusy.Continue);
                }
            }
        }
        return;
    }

(3)客户端处理接收到的广播消息

客户端也只要实现ICustomizeHandler接口的HandleInformation方法,就可以处理来自P2P通道或者转发自服务端的广播消息了(即处理MyInfoTypes.Broadcast类型的消息),这里就不赘述了。

实际上,本文的实现还可以进一步优化,特别是在高频的广播消息时(如前文举的视频会议的例子),这种优化效果是很明显的。那就是,比如,我们在客户端可以将组内的成员分成两类管理起来,一类是P2P已经打通的,一类是没有通的,并根据实际的P2P状态变化而调整。这样,客户端每次发送广播消息时,就不用遍历自己与每个组员的P2P通道的状态,这可以节省不少的cpu时间。同理,服务端也可以如此处理。

时间: 2024-10-28 22:58:18

广播与P2P通道(下) -- 方案实现的相关文章

WCF在tcp通道下启用httpget

关于tcp通道下,启用httpget,必须启用一个http的基地址,如果要启用无数据交换,host中必须开启服务描述. //01 create host Uri tcpBaseAddress = new Uri("net.tcp://127.0.0.1:12345/kp"); Uri httpBaseAddress = new Uri("http://127.0.0.1:12346/kp"); ServiceHost host = new ServiceHost(t

Android利用广播监听按下HOME和电源键

package cc.testhome; import cc.testhome.HomeKeyObserver.OnHomeKeyListener; import cc.testhome.PowerKeyObserver.OnPowerKeyListener; import android.os.Bundle; import android.app.Activity; /** * Demo描述: * 利用广播监听Home键的按下和长按Home键 * 利用广播监听电源键的按下(关闭屏幕) * *

树莓派进阶之路 (038) - P2P 文件下载机

硬件要求: 树莓派开发板 USB外接硬盘 一. Together 1. 更新安装程序 sudosudo apt- apt-get update get updat sudo apt-get upgrade sudo apt-get install python-software-properties //树莓派不用添加repo sudo add-apt-repository ppa:deluge-team/ppa //树莓派不用添加repo,直接执行下面两步 2. 安装Deuge: sudo a

我的作品

1.ESFramework通信框架 ESFramework 是一套性能卓越.稳定可靠.强大易用的跨平台通信框架,支持应用服务器集群.其内置了消息的收发与自定义处理(支持同步/异步模型).消息广播.P2P通道.文件传送(支持断点续传).心跳检测.断线重连.登录验证.在线用户管理.好友与群组管理.性能诊断等功能.基于ESFramework,您可以方便快捷地开发出各种优秀的网络通信应用.此外,我们在长期实践中所积累的丰富经验,更将成为您强大的技术保障,从开发到上线直至后续运维,全程为您保驾护航,让您高

成熟的C#网络通信框架介绍——ESFramework通信框架

ESFramework通信框架是一套性能卓越.稳定可靠.强大易用的跨平台C#网络通信框架,支持应用服务器集群.其内置了消息的收发与自定义处理(支持同步/异步模型).消息广播.P2P通道.文件传送(支持断点续传).心跳检测.断线重连.登录验证.在线用户管理.好友与群组管理.性能诊断等功能.基于ESFramework通信框架,您可以方便快捷地开发出各种优秀的网络通信应用.此外,我们在长期实践中所积累的丰富经验,更将成为您强大的技术保障,从开发到上线直至后续运维,全程为您保驾护航,让您高枕无忧.具体而

ESFramework ——可堪重任的网络通信框架

ESFramework是一套性能卓越.稳定可靠.强大易用的跨平台通信框架,支持应用服务器集群.其内置了消息的收发与自定义处理(支持同步/异步模型).消息广播.P2P通道.文件传送(支持断点续传).心跳检测.断线重连.登录验证.在线用户管理.好友与群组管理.性能诊断等功能.基于ESFramework,您可以方便快捷地开发出各种优秀的网络通信应用.此外,我们在长期实践中所积累的丰富经验,更将成为您强大的技术保障,从开发到上线直至后续运维,全程为您保驾护航,让您高枕无忧.具体而言,ESFramewor

高铁站地下停车场FM广播信号覆盖方案

(北京恒星科通科技发展有限公司http://www.bjhxkt.com010-82565576 13810952040)一.高铁站地下停车场FM广播信号覆盖系统概述21世纪之交对中国而言,是一个大变革大发展大融合的年代,中国的汽车工业飞速前进,随着人民生活水平的提高,汽车由政府部门和权贵象征走入千家万户,逐步成为工薪阶层的代步工具.由于城市土地资源的宝贵和紧缺,地下停车场如雨后春笋应运而生,成为机场.火车站.商场.写字楼.居民区和企事业单位不可或缺的配套建筑,地下停车场设计越来越先进,规模越来

Android广播机制(转)

1.Android广播机制概述 Android广播分为两个方面:广播发送者和广播接收者,通常情况下,BroadcastReceiver指的就是广播接收者(广播接收器).广播作为Android组件间的通信方式,可以使用的场景如下:1.同一app内部的同一组件内的消息通信(单个或多个线程之间): 2.同一app内部的不同组件之间的消息通信(单个进程): 3.同一app具有多个进程的不同组件之间的消息通信: 4.不同app之间的组件之间消息通信: 5.Android系统在特定情况下与App之间的消息通

【开源下载】c#编写的聊天程序微风IM 版本2 增加局域网P2P通信

新年第一天 恭祝大家新年快乐 一直有朋友问P2P相关的问题,最近有时间在微风IM的基础上,实现了P2P通信,共享给大家,希望大家批评指正. 源码下载 (只包含源码,无插入式广告:)  数据库下载   数据库与第一版相同没有变化 我们知道在网络通信中,如果所有的通信都通过服务器转发,会增加服务器的负担,如果实现了P2P,客户端之间直接通讯,比如聊天或者传送文件时不再通过服务器,而是客户端之间直接通信,将会有效的减轻服务器的负担,提高程序的效率. 本节相关的P2P,指的是通过TCP协议,在局域网中实