MINA 网络黏包处理代码

本文完整代码,可以浏览:

https://github.com/hjj2017/xgame-code_server/blob/master/game_server/src/com/game/gameServer/framework/mina/MsgCumulativeFilter.java

我在网上查阅过的 MINA 黏包处理,一般都是放在 Decoder 中做的。也就是黏包处理和消息解码放在一起做,显得比较混乱不好打理。而以下这段代码,我是把黏包处理放在 Filter 中了。在具体使用时可以这样:

 1 // 创建 IO 接收器
 2 NioSocketAcceptor acceptor = new NioSocketAcceptor();
 3
 4 // 获取责任链
 5 DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();
 6 // 处理网络粘包
 7 chain.addLast("msgCumulative", new MsgCumulativeFilter());
 8
 9 // 添加自定义编解码器
10 chain.addLast("msgCodec", new ProtocolCodecFilter(
11     new XxxEncoder(),
12     new XxxDecoder()
13 ));
14
15 // 获取会话配置
16 IoSessionConfig cfg = acceptor.getSessionConfig();
17
18 // 设置缓冲区大小
19 cfg.setReadBufferSize(4096);
20 // 设置 session 空闲时间
21 cfg.setIdleTime(IdleStatus.BOTH_IDLE, 10);
22
23 // 设置 IO 句柄
24 acceptor.setHandler(new XxxHandler());
25 acceptor.setReuseAddress(true);
26
27 try {
28     // 绑定端口
29     acceptor.bind(new InetSocketAddress("127.0.0.1", 4400));
30 } catch (Exception ex) {
31     // 输出错误日志
32     System.error.println(ex);
33 }

目前 Netty 框架要比 MINA 流行的多,而且 Netty 对网络黏包处理也做了很好的处理,不用开发者自己费那么大劲。我也考虑过迁移到 Netty 框架上,不过目前还没有找到特别充分的理由。闲话不多说了,以下就是黏包处理代码:

  1 package com.game.gameServer.framework.mina;
  2
  3 import java.util.concurrent.ConcurrentHashMap;
  4
  5 import org.apache.mina.core.buffer.IoBuffer;
  6 import org.apache.mina.core.filterchain.IoFilterAdapter;
  7 import org.apache.mina.core.session.IoSession;
  8
  9 import com.game.gameServer.framework.FrameworkLog;
 10 import com.game.gameServer.msg.SpecialMsgSerialUId;
 11 import com.game.part.msg.IoBuffUtil;
 12
 13 /**
 14  * 消息粘包处理
 15  *
 16  * @author hjj2017
 17  * @since 2014/3/17
 18  *
 19  */
 20 class MsgCumulativeFilter extends IoFilterAdapter {
 21     /**
 22      * 从客户端接收的消息估计长度,
 23      * {@value} 字节,
 24      * 对于从客户端接收的数据来说, 都是简单的命令!
 25      * 很少超过 {@value}B
 26      *
 27      */
 28     private static final int DECODE_MSG_LEN = 64;
 29     /** 容器 Buff 字典 */
 30     private static final ConcurrentHashMap<Long, IoBuffer> _containerBuffMap = new ConcurrentHashMap<>();
 31
 32     @Override
 33     public void sessionClosed(NextFilter nextFilter, IoSession sessionObj) throws Exception {
 34         if (nextFilter == null ||
 35             sessionObj == null) {
 36             // 如果参数对象为空,
 37             // 则直接退出!
 38             FrameworkLog.LOG.error("null nextFilter or sessionObj");
 39             return;
 40         }
 41
 42         // 移除容器 Buff
 43         removeContainerBuff(sessionObj);
 44         // 向下传递
 45         super.sessionClosed(nextFilter, sessionObj);
 46     }
 47
 48     @Override
 49     public void messageReceived(
 50         NextFilter nextFilter, IoSession sessionObj, Object msgObj) throws Exception {
 51         if (nextFilter == null ||
 52             sessionObj == null) {
 53             // 如果参数对象为空,
 54             // 则直接退出!
 55             FrameworkLog.LOG.error("null nextFilter or sessionObj");
 56             return;
 57         }
 58
 59         // 获取会话 UId
 60         long sessionUId = sessionObj.getId();
 61
 62         if (!(msgObj instanceof IoBuffer)) {
 63             // 如果消息对象不是 ByteBuff,
 64             // 则直接向下传递!
 65             FrameworkLog.LOG.warn("msgObj is not a IoBuff, sessionUId = " + sessionUId);
 66             super.messageReceived(nextFilter, sessionObj, msgObj);
 67         }
 68
 69         // 获取输入 Buff
 70         IoBuffer inBuff = (IoBuffer)msgObj;
 71
 72         if (!inBuff.hasRemaining()) {
 73             // 如果没有剩余内容,
 74             // 则直接退出!
 75             FrameworkLog.LOG.error("inBuff has not remaining, sessionUId = " + sessionUId);
 76             return;
 77         } else if (inBuff.remaining() <= 8) {
 78             // 如果 <= 8 字节,
 79             // 那还是执行粘包处理过程吧 ...
 80             // 8 字节 = 消息长度 ( Short ) + 消息类型 ( Short ) + 时间戳 ( Int )
 81             // 如果比这个长度都小,
 82             // 那肯定不是一条完整消息 ...
 83             this.msgRecv_0(nextFilter, sessionObj, inBuff);
 84             return;
 85         }
 86
 87         // 获取消息长度
 88         final int msgSize = inBuff.getShort();
 89         inBuff.position(0);
 90
 91         if (msgSize == inBuff.limit() &&
 92             containerBuffIsEmpty(sessionObj)) {
 93             //
 94             // 如果消息长度和极限值刚好相同,
 95             // 并且容器 Buff 中没有任何内容 ( 即, 上一次消息没有粘包 ),
 96             // 那么直接向下传递!
 97             //
 98             super.messageReceived(
 99                 nextFilter, sessionObj, inBuff
100             );
101         } else {
102             //
103             // 如果消息长度和极限值不同,
104             // 则说明是网络粘包!
105             // 这时候跳转到粘包处理过程 ...
106             //
107             this.msgRecv_0(nextFilter, sessionObj, inBuff);
108         }
109     }
110
111     /**
112      * 接收连包消息
113      *
114      * @param nextFilter
115      * @param sessionObj
116      * @param inBuff
117      * @throws Exception
118      *
119      */
120     private void msgRecv_0(
121         NextFilter nextFilter, IoSession sessionObj, IoBuffer inBuff) throws Exception {
122         if (nextFilter == null ||
123             sessionObj == null) {
124             // 如果参数对象为空,
125             // 则直接退出!
126             FrameworkLog.LOG.error("null nextFilter or sessionObj");
127             return;
128         }
129
130         // 获取会话 UId
131         long sessionUId = sessionObj.getId();
132         // 获取容器 Buff
133         IoBuffer containerBuff = getContainerBuff(sessionObj);
134
135         // 添加新 Buff 到容器 Buff 的末尾
136         IoBuffUtil.append(containerBuff, inBuff);
137         // 令 position = 0
138         containerBuff.position(0);
139
140 //        // 记录调试信息
141 //        FrameworkLog.LOG.debug("\nin = [ " + inBuff.getHexDump() + " ]");
142
143         for (int i = 0; ; i++) {
144 //            // 记录调试信息
145 //            FrameworkLog.LOG.debug(
146 //                "i = " + i
147 //                + "\nco = [ " + containerBuff.getHexDump() + " ]"
148 //                + "\nco.pos = " + containerBuff.position()
149 //                + "\nco.lim = " + containerBuff.limit()
150 //            );
151
152             if (containerBuff.remaining() < 4) {
153                 //
154                 // 如果剩余字节数 < 4,
155                 // 这样根本无法识别出消息类型 msgSerialUId ...
156                 // 直接退出!
157                 // 在退出前,
158                 // 准备好接收下一次消息!
159                 //
160                 IoBuffUtil.readyToNext(containerBuff);
161                 return;
162             }
163
164             // 获取原始位置
165             final int oldPos = containerBuff.position();
166             // 获取消息长度和类型
167             final int msgSize = containerBuff.getShort();
168             final int msgSerialUId = containerBuff.getShort();
169
170 //            // 记录调试信息
171 //            FrameworkLog.LOG.debug(
172 //                "i = " + i
173 //                + "\nmsgSize = " + msgSize
174 //                + "\nmsgSerialUId = " + msgSerialUId
175 //            );
176
177             // 还原原始位置
178             containerBuff.position(oldPos);
179
180             if (msgSerialUId == SpecialMsgSerialUId.CG_FLASH_POLICY ||
181                 msgSerialUId == SpecialMsgSerialUId.CG_QQ_TGW) {
182                 //
183                 // 如果是 Flash 安全策略消息,
184                 // 或者是腾讯网关消息,
185                 // 则尝试找一下 0 字节的位置 ...
186                 //
187                 int pos0 = IoBuffUtil.indexOf(containerBuff, (byte)0);
188
189                 if (pos0 <= -1) {
190                     // 如果找不到 0 字节的位置,
191                     // 则说明消息还没接收完,
192                     // 准备接受下次消息并直接退出!
193                     IoBuffUtil.readyToNext(containerBuff);
194                     return;
195                 }
196
197                 // 复制 Buff 内容
198                 containerBuff.position(0);
199                 IoBuffer realBuff = IoBuffUtil.copy(containerBuff, pos0);
200
201                 // 更新 Buff 位置
202                 final int newPos = containerBuff.position() + pos0;
203                 containerBuff.position(newPos);
204                 // 压缩容器 Buff
205                 IoBuffUtil.compact(containerBuff);
206
207                 // 向下传递
208                 super.messageReceived(
209                     nextFilter, sessionObj, realBuff
210                 );
211                 continue;
212             }
213
214             if (msgSize <= 0) {
215                 //
216                 // 如果消息长度 <= 0,
217                 // 则直接退出!
218                 // 这种情况可能是消息已经乱套了 ...
219                 // 还是重新来过吧!
220                 //
221                 FrameworkLog.LOG.error("i = " + i + ", msgSize = " + msgSize + ", sessionUId = " + sessionUId);
222                 // 将容器 Buff 内容清空
223                 containerBuff.position(0);
224                 containerBuff.flip();
225                 // 压缩容器 Buff
226                 IoBuffUtil.compact(containerBuff);
227                 return;
228             }
229
230             if (containerBuff.remaining() < msgSize) {
231                 //
232                 // 如果消息长度不够,
233                 // 则可能是出现网络粘包情况了 ...
234                 // 直接退出就可以了!
235                 //
236                 FrameworkLog.LOG.warn(
237                     "i = " + i
238                     + ", msgSize = " + msgSize
239                     + ", containerBuff.remaining = " + containerBuff.remaining()
240                     + ", sessionUId = " + sessionUId
241                 );
242
243                 // 准备接受下一次消息
244                 IoBuffUtil.readyToNext(containerBuff);
245                 return;
246             }
247
248             // 创建新 Buff 并复制字节内容
249             IoBuffer realBuff = IoBuffUtil.copy(containerBuff, msgSize);
250
251             if (realBuff == null) {
252                 //
253                 // 如果真实的 Buff 为空,
254                 // 则直接退出!
255                 // 这种情况可能也是消息乱套了 ...
256                 // 记录一下错误信息
257                 //
258                 FrameworkLog.LOG.error("i = " + i + ", null realBuff, sessionUId = " + sessionUId);
259             } else {
260 //                // 记录调试信息
261 //                FrameworkLog.LOG.debug(
262 //                    "i = " + i
263 //                    + "\nreal = [ " + realBuff.getHexDump() + " ]"
264 //                    + "\nreal.pos = " + realBuff.position()
265 //                    + "\nreal.lim = " + realBuff.limit()
266 //                );
267
268                 // 向下传递
269                 super.messageReceived(
270                     nextFilter, sessionObj, realBuff
271                 );
272             }
273
274             // 更新位置
275             containerBuff.position(containerBuff.position() + msgSize);
276             // 压缩容器 Buff
277             IoBuffUtil.compact(containerBuff);
278         }
279     }
280
281     /**
282      * 获取玩家的 Buff, 如果为空则新建一个!
283      *
284      * @param sessionObj
285      * @return
286      *
287      */
288     private static IoBuffer getContainerBuff(IoSession sessionObj) {
289         if (sessionObj == null) {
290             // 如果参数对象为空,
291             // 则直接退出!
292             return null;
293         }
294
295         // 获取会话 UId
296         long sessionUId = sessionObj.getId();
297         // 获取容器 Buff
298         IoBuffer containerBuff = _containerBuffMap.get(sessionUId);
299
300         if (containerBuff == null) {
301             // 创建缓存 Buff
302             containerBuff = IoBuffer.allocate(DECODE_MSG_LEN);
303             containerBuff.setAutoExpand(true);
304             containerBuff.setAutoShrink(true);
305             containerBuff.position(0);
306             containerBuff.flip();
307             // 缓存  Buff 对象
308             Object oldVal = _containerBuffMap.putIfAbsent(sessionUId, containerBuff);
309
310             if (oldVal != null) {
311                 FrameworkLog.LOG.warn("exists oldVal");
312             }
313         }
314
315         return containerBuff;
316     }
317
318     /**
319      * 移除容器 Buff
320      *
321      * @param sessionObj
322      *
323      */
324     private static void removeContainerBuff(IoSession sessionObj) {
325         if (sessionObj == null) {
326             // 如果参数对象为空,
327             // 则直接退出!
328             return;
329         }
330
331         // 获取会话 UId
332         long sessionUId = sessionObj.getId();
333         // 获取容器 Buff
334         IoBuffer containerBuff = _containerBuffMap.get(sessionUId);
335
336         if (containerBuff != null) {
337             // 是否所占资源
338             containerBuff.clear();
339         }
340
341         // 移除玩家的 Buff 对象
342         _containerBuffMap.remove(sessionUId);
343     }
344
345     /**
346      * 容器 Buff 为空 ?
347      *
348      * @param sessionObj
349      * @return
350      *
351      */
352     private static boolean containerBuffIsEmpty(IoSession sessionObj) {
353         if (sessionObj == null) {
354             // 如果参数对象为空,
355             // 则直接退出!
356             return false;
357         }
358
359         // 获取容器 Buff
360         IoBuffer containerBuff = getContainerBuff(sessionObj);
361
362         if (containerBuff == null) {
363             // 如果容器为空,
364             // 则直接退出!
365             FrameworkLog.LOG.error("null containerBuff, sessionUId = " + sessionObj.getId());
366             return false;
367         } else {
368             // 如果当前位置和极限值都为 0,
369             // 则判定为空!
370             return (containerBuff.position() == 0
371                  && containerBuff.limit() == 0);
372         }
373     }
374 }
时间: 2024-09-30 00:28:43

MINA 网络黏包处理代码的相关文章

测试网络丢包情况代码

import java.io.*; import java.util.regex.Matcher; import java.util.regex.Pattern; public class PingIpUtils { public static boolean isWindowsOS() { boolean isWindowsOS = false; String osName = System.getProperty("os.name"); if (osName.toLowerCase

网络编程中黏包的解决方案

解决方案一 问题的根源在于,接收端不知道发送端将要传送的字节流的长度,所以解决粘包的方法就是围绕,如何让发送端在发送数据前,把自己将要发送的字节流总大小让接收端知晓,然后接收端来一个死循环接收完所有数据. #_*_coding:utf-8_*_ import socket,subprocess ip_port=('127.0.0.1',8080) s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.setsockopt(socket.SOL_

网络编程-----黏包问题

一,黏包现象 我们通过一段简单程序来看看黏包现象: import socket sk=socket.socket() sk.bind(('127.0.0.1',8090)) sk.listen() conn,addr=sk.accept() while True: cmd=input(">>>") if cmd=='q': conn.send(b'q') break conn.send(cmd.encode('gbk')) res=conn.recv(1024).de

网络编程之黏包

当我们同时执行多条命令之后,得到的结果很可能只有一部分,在执行其他命令的时候又接收到之前执行的另外一部分结果,这种现象就是黏包. 黏包成因 TCP协议中的数据传递: tcp协议的拆包机制 当发送端缓冲区的长度大于网卡的MTU时,tcp会将这次发送的数据拆成几个数据包发送出去. MTU是Maximum Transmission Unit的缩写.意思是网络上传送的最大数据包.MTU的单位是字节. 大部分网络设备的MTU都是1500.如果本机的MTU比网关的MTU大,大的数据包就会被拆开来传送,这样会

《Python》网络编程之黏包

黏包 一.黏包现象 同时执行多条命令之后,得到的结果很可能只有一部分,在执行其他命令的时候又接收到之前执行的另外一部分结果,这种显现就是黏包. server端 import socket sk = socket.socket() sk.bind(('127.0.0.1', 9000)) sk.listen() conn, addr = sk.accept() conn.send(b'hello,') conn.send(b'world') conn.close() client端 import

四. 网络编程(TCP 黏包)

一 .黏包现象(TCP) 1.黏包成因 TCP协议中的数据传递 tcp协议的拆包机制 当发送端缓冲区的长度大于网卡的MTU时,tcp会将这次发送的数据拆成几个数据包发送出去. MTU是Maximum Transmission Unit的缩写.意思是网络上传送的最大数据包.MTU的单位是字节. 大部分网络设备的MTU都是1500.如果本机的MTU比网关的MTU大,大的数据包就会被拆开来传送,这样会产生很多数据包碎片,增加丢包率,降低网络速度 面向流的通信特点和Nagle算法 TCP(transpo

Python之网络编程 黏包

黏包现象 系统缓冲区 缓冲区的作用 没有缓冲区 , 如果你的网路出现短暂的异常或者波动, 接收数据就会出现短暂的中断, 影响你的下载或者上传的效率 但是凡事都有双刃剑, 缓冲区解决了上传下载的传输效率问题 也带来了黏包的问题 讲粘包之前先看看socket缓冲区的问题: 每个 socket 被创建后,都会分配两个缓冲区,输入缓冲区和输出缓冲区. write()/send() 并不立即向网络中传输数据,而是先将数据写入缓冲区中,再由TCP协议将数据从缓冲区发送到目标机器.一旦将数据写入到缓冲区,函数

网络TCp数据的传输设计(黏包处理)

//1.该片为引用别人的文章:http://www.cnblogs.com/alon/archive/2009/04/16/1437599.html 解决TCP网络传输"粘包"问题 解决TCP网络传输"粘包"问题 作者:杨小平 王胜开 原文出处:http://www.ciw.com.cn/ 当前在网络传输应用中,广泛采用的是TCP/IP通信协议及其标准的socket应用开发编程接口(API).TCP/IP传输层有两个并列的协议:TCP和UDP.其中TCP(trans

网络编程- 解决黏包现象方案一(六)

队列 利用队列的思路来解决黏包问题 总结 原文地址:https://www.cnblogs.com/mys6/p/10797907.html