怎么办,这很尴尬,为啥呢,因为kbe的某些原因让我放弃了使用它所以本打算继续更新的,说一下原因:
在DAY1中我希望kbe能够开启一个http服务,并让php端做一个web请求将消息传递给对应的用户,可是这个http服务我是写起来了,发送消息的函数也写出来(花了不少时间,kbe的注释和文档都不多,特别是kbe把BaseHttpServer这个python库另外弄了个名字,用http.server import as 才导入成功)尴尬的就是http服务和发消息的函数怎么也放不到一起:
1.一旦某个class不继承自KBEngine.Base,那么他就无法访问KBEngine的几乎所有静态函数、属性,就无法获取到对应用户的mailbox完成消息发送
2.一旦继承KBEgine.Base,你就做不了HTTP 服务,因为你的handler必须继承baseHandler,你继承不了,且即使你继承到baseHandler去访问KBEgine.Base的mailbox之类的又回到刚的死逻辑之中
3.系统库的HTTP服务会阻塞进程,这个文档还是写了,不过替代框架太麻烦,且调试太不方便,且语法太熟悉,且…………虽然我想说一万个且,只能说明我无能啊…………
当然论坛和官网当中也有人反应类似的问题,例如第三方接口访问KBE的成员/属性问题,不过看起来好像并没有现成的解决方案,最后的最后。。。我放弃了
然后呢~~我自己老老实实写了一个消息服务器(基于socket ,with WPF .net 4.5+)以及消息协议
消息协议采用http://msgpack.org/ 基本上支援所有的语言,因此实际上我这个消息服务器可以服务任何类型的客户端,不管你啥平台啥语言
1.0版协议(还没名字呢)规定:
1.BasePack代表发送的包,BaseAckPack代表回执包,BaseAckPack继承自BasePack
2.每个Pack长度为1024字节,且第0~4字节转换成int代表pack类型, BasePack及其子类从1.2.3...10... BaseAckPack 及其子类从1001,1002,1003...1010...(有考虑负数,其实应该也ok)
为啥这样做? 这里很奇特,你在把这1024个字节用msgpack转成对象之前,你并不知道这个pack是哪个对象,你不能统一按某一个特定的对象去转,比如LoginPack比BasePack只多了2个属性,你在不知道它是一个LoginPack还是一个BasePack之前,你无法拆开他,你按任何一种来拆开都有可能出错(属性多了或少了,熟悉iOS 的KVC的应该很清楚),所以必须先把前面4个字节腾出来,可选的,第5~8个字节放长度(mespack可以长度大于内容拆开没问题),读8个字节之后再读剩下的1016(当然不一定每个包一定得是1024,可以更大,毕竟我目前够用了)个字节
using System; //send包 namespace Packs { //基础包 public class BasePack<T> { public int packType; public int fromId; public int toId; public int messageId; //将基本包转bytes public byte[] PackToBytes() { var encode = MsgPack.Serialization.MessagePackSerializer.Get<T>(); byte[] packContent = encode.PackSingleObject(this); byte[] type = BasePack.intToBytes(this.packType); byte[] len = BasePack.intToBytes(packContent.Length); int lenth = packContent.Length; byte[] dest = new byte[1024]; //第一个int空间:类型 Buffer.BlockCopy(type, 0, dest, 0, type.Length); //第二个int空间:长度 Buffer.BlockCopy(len, 0, dest, type.Length, len.Length); //剩余空间:包内容 Buffer.BlockCopy(packContent, 0, dest, type.Length+len.Length, packContent.Length); Console.WriteLine("打包pack,类型:" + this.packType + "长度:" + packContent.Length); return dest; } //将bytes转回基本包 public static T BytesToPack(byte[] bytes) { var encode = MsgPack.Serialization.MessagePackSerializer.Get<T>(); return encode.UnpackSingleObject(bytes); } } public class BasePack:BasePack<BasePack> { public const int LOGIN_PACK = 1; public const int REGISTER_PACK = 2; public const int PING_PACK = 3; public const int PONG_PACK = 4; public const int TEXT_PACK = 5; public const int SYSTEM_PUSH_PACK = 6; public const int LOGIN_ACK = 1001; public const int REGISTER_ACK = 1002; public const int PING_ACK = 1003; public const int PONG_ACK = 1004; public const int TEXT_ACK = 1005; public const int SYSTEM_PUSH_ACK = 1006; public const int CONNECTED_ACK = 1007; /** * 将int数值转换为占四个字节的byte数组,本方法适用于(低位在前,高位在后)的顺序。 * @param value * 要转换的int值 * @return byte数组 */ public static byte[] intToBytes(int value) { byte[] byte_src = new byte[4]; byte_src[3] = (byte)((value & 0xFF000000) >> 24); byte_src[2] = (byte)((value & 0x00FF0000) >> 16); byte_src[1] = (byte)((value & 0x0000FF00) >> 8); byte_src[0] = (byte)((value & 0x000000FF)); return byte_src; } /** * byte数组中取int数值,本方法适用于(低位在前,高位在后)的顺序。 * * @param ary * byte数组 * @param offset * 从数组的第offset位开始 * @return int数值 */ public static int bytesToInt(byte[] ary, int offset) { int value; value = (int)((ary[offset] & 0xFF) | ((ary[offset + 1] << 8) & 0xFF00) | ((ary[offset + 2] << 16) & 0xFF0000) | ((ary[offset + 3] << 24) & 0xFF000000)); return value; } } //1.登录包 public class LoginPack: BasePack<LoginPack> { public string username; public string password; public LoginPack() { this.packType = BasePack.LOGIN_PACK; } } //5.文字包 public class TextPack:BasePack<TextPack> { public string content; public string toUser; public string fromUser; public TextPack() { this.packType = BasePack.TEXT_PACK; } } //6.系统推送包 public class SystemPushPack: BasePack<SystemPushPack> { public string content; public string toUser; public SystemPushPack() { this.packType = BasePack.SYSTEM_PUSH_PACK; } } }
3.server端Accept之后立即发送ConnectPack,客户端收到后发送ConnectAckPack完成连接
private void OnAccept() { while (this.isServing) { //异步Accept 回调ConnEnd //serverSocket.BeginAccept(new System.AsyncCallback(this.ConnEnd), null); //同步Accept Socket clientSocket = serverSocket.Accept(); ReceiveObject obj = new ReceiveObject(); obj.acceptClient = clientSocket; clients.Add(obj); Thread receiveThread = new Thread(OnReceive); receiveThread.Start(obj); cThreads.Add(clientSocket.RemoteEndPoint.ToString(), receiveThread); Console.WriteLine("新的客户端连接:" + clientSocket.RemoteEndPoint.ToString()); BaseACKPack pack = new BaseACKPack(); pack.packType = BasePack.CONNECTED_ACK; clientSocket.Send(pack.PackToBytes()); } }
4.客户端发送LoginPack(由于php已经校验了用户名和密码并且生成了token,所以loginPack实际上我没有写校验密码的逻辑,单纯的绑定用户名,用来接收消息),server端拆开pack将用户名绑定到客户端对象中,这个对象的内容如下:
using System.Net.Sockets; public class ReceiveObject { public Socket acceptClient; public byte[] buffer = new byte[1024]; public string userId; public string userName; public int roomId; public ReceiveObject() { } }
整个处理函数:
private void OnReceive(object obj) { while (this.isServing) { ReceiveObject e = obj as ReceiveObject; Socket c = e.acceptClient; e.buffer = new byte[1024]; //判断包类型,固定包在包之前 int type = c.Receive(e.buffer, 0, sizeof(Int32), SocketFlags.None); if (type == 0) { Console.WriteLine("客户端断开连接:" + c.RemoteEndPoint.ToString()); //clients.RemoveAll((ReceiveObject obj) => { return obj.acceptClient == 0 ? true : false; }); clients.Remove(e); cThreads.Remove(c.RemoteEndPoint.ToString()); Thread.CurrentThread.Abort(); //断开连接 c.Shutdown(SocketShutdown.Both); c.Close(); break; } type = BasePack.bytesToInt(e.buffer, 0); //获得包大小,固定第2个int int len = c.Receive(e.buffer, 0, sizeof(Int32), SocketFlags.None); len = BasePack.bytesToInt(e.buffer, 0); int receiveNumber = c.Receive(e.buffer, 0, 1024 - sizeof(Int32) * 2, SocketFlags.None); switch (type) { case BasePack.LOGIN_PACK: { LoginPack lPack = LoginPack.BytesToPack(e.buffer); Console.WriteLine("收到登录请求,用户名:" + lPack.username + "密码:" + lPack.password); e.userName = lPack.username; //发送登录ACK LoginACKPack loginACK = new LoginACKPack(); //loginACK.success = true; c.Send(loginACK.PackToBytes()); } break; case BasePack.TEXT_PACK: { //处理消息包 TextPack pack = TextPack.BytesToPack(e.buffer); //处理basePack Console.WriteLine("发送给" + pack.toUser + "的消息:" + pack.content); //从clients组找用户 List<ReceiveObject> list = clients.FindAll((ReceiveObject o) => { return o.userName == pack.toUser ? true : false; }); foreach (ReceiveObject target in list) { target.acceptClient.Send(pack.PackToBytes()); } } break; case BasePack.TEXT_ACK: { //处理消息回执 TextACKPack pack = TextACKPack.BytesToPack(e.buffer); //删除对应的消息 pusher.DeleteMessageById(pack.messageId); } break; case BasePack.SYSTEM_PUSH_ACK: { SystemPushACKPack pack = SystemPushACKPack.BytesToPack(e.buffer); //删除对应的消息 pusher.DeleteMessageById(pack.messageId); } break; default: //处理未知包 { } break; } } }
发送消息函数,目前写了2个case 原因:php端的推送类型很多,我直接写在pushPack的content内部,客户端用json解析开就行了,然后做了一个单聊的文本消息发送,按群组推还没来得及做:
public void SendMsg(string from, string to, string body, int type, int messageId) { //从clients组找用户 ReceiveObject target = clients.FindLast((ReceiveObject o) => { return o.userName == to ? true : false; }); if (target == null) return; //推送一条消息至客户端 //收到回执后才能修改sent状态为1 Console.WriteLine("推送消息给:" + to + "类型:" + type + "内容:" + body + "id:" + messageId); switch (type) { //推送文字消息 case BasePack.TEXT_PACK: { TextPack txtPack = new TextPack(); txtPack.fromUser = from; txtPack.toUser = to; txtPack.content = body; txtPack.messageId = messageId; target.acceptClient.Send(txtPack.PackToBytes()); } break; //系统消息 case BasePack.SYSTEM_PUSH_PACK: { SystemPushPack txtPack = new SystemPushPack(); txtPack.toUser = to; txtPack.content = body; txtPack.messageId = messageId; target.acceptClient.Send(txtPack.PackToBytes()); } break; default: { } break; } } }
然后就是消息队列和php《-》c#间的调用问题
1.严格按照p2p模型和pubSub模型的消息队列,即:
p2p模型: 如果消息接受者的username在clients数组中,立即发送标,否则存入数据库作为离线消息,待该用户登录时再从数据库取出该用户的离线消息至内存中继续发送,直到收到相应类型的ack或baseAck(客户端的协议比服务器端低),从数据库中彻底移除;
pubSub 模型:不管消息接受在clients数组中有多少个(相同的roomId标记),0到理论上限个,立即发送且不需要回执且立即从内存中移除且不存入数据库
2.由于php和c#程序是2个不同的进程,所以涉及到进程间通信,如果这2个程序运行在同一台电脑上,可行的办法有:共享内存、本地socket、管道等等??但是实际情况可能我们更希望web程序和消息程序可以不在同一台电脑,因此其他的方法:共享同一个数据库连接、http轮询
具体可以根据情况选择,我这里两种都有写。
且我的期望是php每插入一条消息,c#马上推送出去,那么c#做数据库轮询或者http轮询其实都还好,我只用了一个线程做轮询。
最后今天写下游戏端吧:
终于可以推各种包了,开始游戏包、出牌包、胜利包 DAY1已经描述,目前在做的: 客户端牌型校验以及每一局中的每一轮何时判定。
这个游戏规则就是标准的跑得快,也就是拿到黑桃3的玩家第一局第一轮先出牌,这里还没做,可以在所有玩家收到开始游戏包之后做一个简单的校验。
过牌直接调用出牌接口,传一个空的字符串即可,目前还没有主动过牌和结束每一轮的逻辑,做了结束每一局的逻辑,即判定胜负。
最后是几个测试截图,玩家id 45 和玩家 id 50玩了一局: