Netty的TCP粘包/拆包(源码二)

假设客户端分别发送了两个数据包D1和D2给服务器,由于服务器端一次读取到的字节数是不确定的,所以可能发生四种情况:

  1、服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包。

  2、服务端一次接收到了两个数据包,D1和D2粘合在一起,被称为TCP粘包

  3、服务端分两次读取到了两个数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这被称为TCP拆包

  4、服务端分两次读取到了两个数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余内容D1_2和D2包的整包

  如果此时服务端TCP接收滑窗非常小,而数据包D1和D2比较大,很有可能会发生第五种可能,即服务端分多次才能将D1和D2包接收完全,期间发生多次拆包。  

那么在Netty中可使用LineBasedFrameDecoderStringDecoder

  LineBasedFrameDecoder的工作原理是一次遍历ByteBuf中的可读字节,判断看是否有"\n"或者"\r\n",如果有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。它是以换行符为结束标志的解码器,支持携带结束符或者不携带结束符两种解码方式,同时支持配置单行的最大长度。

  StringDecoder将接收到的对象转换成字符串,然后继续调用后面的handler。

  利用LineBasedFrameDecoder解决TCP粘包问题:

 1 package netty;
 2
 3 import io.netty.bootstrap.ServerBootstrap;
 4 import io.netty.channel.ChannelFuture;
 5 import io.netty.channel.ChannelInitializer;
 6 import io.netty.channel.ChannelOption;
 7 import io.netty.channel.EventLoopGroup;
 8 import io.netty.channel.nio.NioEventLoopGroup;
 9 import io.netty.channel.socket.SocketChannel;
10 import io.netty.channel.socket.nio.NioServerSocketChannel;
11 import io.netty.handler.codec.LineBasedFrameDecoder;
12 import io.netty.handler.codec.string.StringDecoder;
13
14
15 public class TimeServer {
16
17     public void bind(int port) throws Exception{
18         //配置服务端的NIO线程组
19         EventLoopGroup bossGroup = new NioEventLoopGroup();
20         EventLoopGroup workerGroup = new NioEventLoopGroup();
21         try{
22             ServerBootstrap b = new ServerBootstrap();
23             b.group(bossGroup,workerGroup)
24             .channel(NioServerSocketChannel.class)
25             .option(ChannelOption.SO_BACKLOG, 1024)
26             .childHandler(new ChildChannelHandler());
27             //绑定端口,同步等待成功
28             ChannelFuture f = b.bind(port).sync();
29             //等待服务器监听端口关闭
30             f.channel().closeFuture().sync();
31         }finally{
32             //优雅退出,释放线程池资源
33             bossGroup.shutdownGracefully();
34             workerGroup.shutdownGracefully();
35         }
36     }
37
38     private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{
39         @Override
40         protected void initChannel(SocketChannel arg0) throws Exception{
41             arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
42             arg0.pipeline().addLast(new StringDecoder());
43             arg0.pipeline().addLast(new TimeServerHandler());
44         }
45     }
46
47     public static void main(String args[]) throws Exception{
48         int port = 10001;
49         if(args != null && args.length > 0){
50             try{
51                 port = Integer.valueOf(args[0]);
52             }catch(NumberFormatException e){
53                 //采用默认值
54             }
55         }
56         new TimeServer().bind(port);
57     }
58 }

TimeServerHandler, msg是删除回车换行符后的请求消息,不需要额外考虑处理半包问题,也不需要对请求消息进行编码:

 1 import java.io.IOException;
 2 import io.netty.buffer.ByteBuf;
 3 import io.netty.buffer.Unpooled;
 4 import io.netty.channel.ChannelHandlerAdapter;
 5 import io.netty.channel.ChannelHandlerContext;
 6
 7 public class TimeServerHandler extends ChannelHandlerAdapter{
 8
 9     private int counter;
10
11     public void channelRead(ChannelHandlerContext ctx,Object msg) throws IOException{
12
13         String body = (String)msg;
14         System.out.println("The time server receive order:" + body + "; the counter is :" + ++counter);
15         String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)? new java.util.Date(
16                 System.currentTimeMillis()).toString() : "BAD ORDER";
17         currentTime = currentTime + System.getProperty("line.separator");
18         ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
19         ctx.writeAndFlush(resp);
20     }
21
22     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception{
23         ctx.flush();
24     }
25
26     @Override
27     public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
28         ctx.close();
29     }
30 }

TimeClient,在TimeClientHandler之前新增lineBasedFrameDecoder和StringDecoder解码器:

 1 import io.netty.bootstrap.Bootstrap;
 2 import io.netty.channel.ChannelFuture;
 3 import io.netty.channel.ChannelInitializer;
 4 import io.netty.channel.ChannelOption;
 5 import io.netty.channel.EventLoopGroup;
 6 import io.netty.channel.nio.NioEventLoopGroup;
 7 import io.netty.channel.socket.SocketChannel;
 8 import io.netty.channel.socket.nio.NioSocketChannel;
 9 import io.netty.handler.codec.LineBasedFrameDecoder;
10 import io.netty.handler.codec.string.StringDecoder;
11
12 public class TimeClient {
13
14     public void connect(int port,String host) throws Exception{
15         //创建客户端处理I/O读写的NioEventLoopGroup Group线程组
16         EventLoopGroup group = new NioEventLoopGroup();
17         try{
18             //创建客户端辅助启动类Bootstrap
19             Bootstrap b = new Bootstrap();
20             b.group(group).channel(NioSocketChannel.class)
21             .option(ChannelOption.TCP_NODELAY, true)
22             .handler(new ChannelInitializer<SocketChannel>(){
23                 //将ChannelHandler设置到ChannelPipleline中,用于处理网络I/O事件
24                 @Override
25                 protected void initChannel(SocketChannel ch) throws Exception {
26                     ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
27                     ch.pipeline().addLast(new StringDecoder());
28                     ch.pipeline().addLast(new TimeClientHandler());
29                 }
30             });
31             //发起异步连接操作,然后调用同步方法等待连接成功。
32             ChannelFuture f = b.connect(host,port).sync();
33
34             //等待客户端链路关闭
35             f.channel().closeFuture().sync();
36         }finally{
37             //优雅退出,释放NIO线程组
38             group.shutdownGracefully();
39         }
40     }
41
42     public static void main(String[] args) throws Exception{
43         int port = 10001;
44         if(args != null && args.length > 0){
45             try{
46                 port = Integer.valueOf(args[0]);
47             }catch(NumberFormatException e){
48                 //采用默认值
49             }
50         }
51         new TimeClient().connect(port, "0.0.0.0");
52     }
53
54 }

TimeClientHandler,拿到的msg已经是解码成字符串之后的应答消息:

 1 import io.netty.buffer.ByteBuf;
 2 import io.netty.buffer.Unpooled;
 3 import io.netty.channel.ChannelHandlerAdapter;
 4 import io.netty.channel.ChannelHandlerContext;
 5
 6 import java.util.logging.Logger;
 7
 8 public class TimeClientHandler extends ChannelHandlerAdapter{
 9
10     private static final Logger logger = Logger.getLogger(TimeClientHandler.class.getName());
11
12     private int counter;
13
14     private byte[]req;
15
16     public TimeClientHandler(){
17          req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
18     }
19
20     //当客户端与服务端TCP链路简历成功后,Netty的NIO线程会调用该方法,发送查询时间的指令给服务器
21     public void channelActive(ChannelHandlerContext ctx){
22         //将请求消息发送给服务端
23         ByteBuf message = null;
24         for(int i = 0;i<100;i++){
25             message = Unpooled.buffer(req.length);
26             message.writeBytes(req);
27             ctx.writeAndFlush(message);
28         }
29     }
30
31     //当服务器返回应答消息时,该方法被调用
32     public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{
33         String body = (String)msg;
34         System.out.println("Now is:" + body + "; the counter is :" + ++counter);
35     }
36
37     public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
38
39         //释放资源
40         logger.warning("Unexpected exception from downstream :" + cause.getMessage());
41         ctx.close();
42     }
43 }

运行结果:

发现。。就木有粘包或拆包的问题啦~~~~

机缘巧合,同事也一起实现了Scala版~

clientHandler:

 1 package main.nettyscala
 2
 3 import io.netty.buffer.{ByteBuf, Unpooled}
 4 import io.netty.channel.{ChannelInboundHandlerAdapter, ChannelHandlerContext, ChannelHandlerAdapter}
 5
 6 /**
 7  * Created by root on 2016/11/18.
 8  */
 9 class ClientHandler extends ChannelInboundHandlerAdapter {
10   override def channelActive(ctx: ChannelHandlerContext): Unit = {
11     println("channelActive")
12     //val content = "hello server"
13     val content = Console.readLine()
14     ctx.writeAndFlush(Unpooled.copiedBuffer(content.getBytes("UTF-8")))
15     //发送case class 不在发送字符串了,封装一个字符串
16     //    ctx.writeAndFlush(RegisterMsg("hello server"))
17   }
18
19   override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = {
20     println("channelRead")
21     val byteBuf = msg.asInstanceOf[ByteBuf]
22     val bytes = new Array[Byte](byteBuf.readableBytes())
23     byteBuf.readBytes(bytes)
24     val message = new String(bytes, "UTF-8")
25     println(message)
26   }
27
28   override def channelReadComplete(ctx: ChannelHandlerContext): Unit = {
29     println("channeReadComplete")
30     ctx.flush()
31   }
32   //发送异常时关闭
33   override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
34     println("exceptionCaught")
35     ctx.close()
36   }
37
38 }

NettyClient:

 1 package main.nettyscala
 2
 3 import io.netty.bootstrap.Bootstrap
 4 import io.netty.channel.ChannelInitializer
 5 import io.netty.channel.nio.NioEventLoopGroup
 6 import io.netty.channel.socket.SocketChannel
 7 import io.netty.channel.socket.nio.{NioSocketChannel, NioServerSocketChannel}
 8 import io.netty.handler.codec.serialization.{ClassResolvers, ObjectDecoder, ObjectEncoder}
 9
10
11 object NettyClient {
12   def main(args: Array[String]) {
13     val host = args(0)
14     val port = args(1).toInt
15     val client = new NettyClient
16     client.connect(host, port)
17   }
18 }
19
20 class NettyClient {
21   def connect(host: String, port: Int): Unit = {
22     //创建客户端NIO线程组
23     val eventGroup = new NioEventLoopGroup
24     //创建客户端辅助启动类
25     val bootstrap = new Bootstrap
26     try {
27       //将NIO线程组传入到Bootstrap
28       bootstrap.group(eventGroup)
29         //创建NioSocketChannel
30         .channel(classOf[NioSocketChannel])
31         //绑定I/O事件处理类
32         .handler(new ChannelInitializer[SocketChannel] {
33         override def initChannel(ch: SocketChannel): Unit = {
34           ch.pipeline().addLast(
35             //            new ObjectEncoder,
36             //            new ObjectDecoder(ClassResolvers.cacheDisabled(getClass.getClassLoader)),
37             new ClientHandler
38           )
39         }
40       })
41       //发起异步连接操作
42       val channelFuture = bootstrap.connect(host, port).sync()
43       //等待服务关闭
44       channelFuture.channel().closeFuture().sync()
45     } finally {
46       //优雅的退出,释放线程池资源
47       eventGroup.shutdownGracefully()
48     }
49   }
50 }

NettyServer:

 1 package main.nettyscala
 2
 3 /**
 4  * Created by root on 12/8/16.
 5  */
 6 import io.netty.bootstrap.ServerBootstrap
 7 import io.netty.channel.ChannelInitializer
 8 import io.netty.channel.nio.NioEventLoopGroup
 9 import io.netty.channel.socket.SocketChannel
10 import io.netty.channel.socket.nio.NioServerSocketChannel
11 import io.netty.handler.codec.serialization.{ClassResolvers, ClassResolver, ObjectDecoder, ObjectEncoder}
12
13
14 object NettyServer {
15   def main(args: Array[String]) {
16     val host = args(0)
17     val port = args(1).toInt
18     val server = new NettyServer
19     server.bind(host, port)
20   }
21 }
22 class NettyServer {
23   def bind(host: String, port: Int): Unit = {
24     //配置服务端线程池组
25     //用于服务器接收客户端连接
26     val bossGroup = new NioEventLoopGroup()
27     //用户进行SocketChannel的网络读写
28     val workerGroup = new NioEventLoopGroup()
29
30     try {
31       //是Netty用户启动NIO服务端的辅助启动类,降低服务端的开发复杂度
32       val bootstrap = new ServerBootstrap()
33       //将两个NIO线程组作为参数传入到ServerBootstrap
34       bootstrap.group(bossGroup, workerGroup)
35         //创建NioServerSocketChannel
36         .channel(classOf[NioServerSocketChannel])
37         //绑定I/O事件处理类
38         .childHandler(new ChannelInitializer[SocketChannel] {
39         override def initChannel(ch: SocketChannel): Unit = {
40           ch.pipeline().addLast(
41             //            new ObjectEncoder,
42             //            new ObjectDecoder(ClassResolvers.cacheDisabled(getClass.getClassLoader)),
43             new ServerHandler
44           )
45         }
46       })
47       //绑定端口,调用sync方法等待绑定操作完成
48       val channelFuture = bootstrap.bind(host, port).sync()
49       //等待服务关闭
50       channelFuture.channel().closeFuture().sync()
51     } finally {
52       //优雅的退出,释放线程池资源
53       bossGroup.shutdownGracefully()
54       workerGroup.shutdownGracefully()
55     }
56   }
57 }

ServerHandler:

 1 package main.nettyscala
 2
 3 /**
 4  * Created by root on 12/8/16.
 5  */
 6 import io.netty.buffer.{Unpooled, ByteBuf}
 7 import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}
 8
 9 /**
10  * Created by root on 2016/11/18.
11  */
12 class ServerHandler extends ChannelInboundHandlerAdapter {
13   /**
14    * 有客户端建立连接后调用
15    */
16   override def channelActive(ctx: ChannelHandlerContext): Unit = {
17     println("channelActive invoked")
18   }
19
20   /**
21    * 接受客户端发送来的消息
22    */
23   override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = {
24     println("channelRead invoked")
25     val byteBuf = msg.asInstanceOf[ByteBuf]
26     val bytes = new Array[Byte](byteBuf.readableBytes())
27     byteBuf.readBytes(bytes)
28     val message = new String(bytes, "UTF-8")
29     println(message)
30     val back = "received message: " + message
31     val resp = Unpooled.copiedBuffer(back.getBytes("UTF-8"))
32     println(msg)
33     ctx.write(resp)
34   }
35
36   /**
37    * 将消息对列中的数据写入到SocketChanne并发送给对方
38    */
39   override def channelReadComplete(ctx: ChannelHandlerContext): Unit = {
40     println("channekReadComplete invoked")
41     ctx.flush()
42   }
43
44
45 }

RegisterMsg:

1 package main.nettyscala
2
3 /**
4  * Created by root on 12/8/16.
5  */
6 case class RegisterMsg(content: String) extends Serializable

运行结果:

时间: 2024-12-28 00:46:14

Netty的TCP粘包/拆包(源码二)的相关文章

netty解决tcp粘包拆包问题

tcp粘包拆包解决方案 1.发送定长的消息 server端:                    EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup)  .channel(NioServerSocketChannel.cl

Netty解决TCP粘包/拆包问题 - 按行分隔字符串解码器

服务端 package org.zln.netty.five.timer; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; impo

Netty(三)TCP粘包拆包处理

tcp是一个“流”的协议,一个完整的包可能会被TCP拆分成多个包进行发送,也可能把小的封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题. 粘包.拆包问题说明 假设客户端分别发送数据包D1和D2给服务端,由于服务端一次性读取到的字节数是不确定的,所以可能存在以下4种情况. 1.服务端分2次读取到了两个独立的包,分别是D1,D2,没有粘包和拆包: 2.服务端一次性接收了两个包,D1和D2粘在一起了,被成为TCP粘包; 3.服务端分2次读取到了两个数据包,第一次读取到了完整的D1和D2包的部

Netty学习之TCP粘包/拆包

一.TCP粘包/拆包问题说明,如图 二.未考虑TCP粘包导致功能异常案例 按照设计初衷,服务端应该收到100条查询时间指令的请求查询,客户端应该打印100次服务端的系统时间 1.服务端类 package com.phei.netty.s2016042302; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitial

【游戏开发】Netty TCP粘包/拆包问题的解决办法(二)

上一篇:[Netty4.X]Unity客户端与Netty服务器的网络通信(一) 一.什么是TCP粘包/拆包 如图所示,假如客户端分别发送两个数据包D1和D2给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在以下4中情况: 第一种情况:Server端分别读取到D1和D2,没有产生粘包和拆包的情况. 第二种情况:Server端一次接收到两个数据包,D1和D2粘合在一起,被称为TCP粘包. 第三种情况:Server端分2次读取到2个数据包,第一次读取到D1包和D2包的部分内容D2_1,第二次

TCP粘包/拆包问题

无论是服务端还是客户端,当我们读取或者发送消息的时候,都需要考虑TCP底层的粘包/拆包机制. TCP粘包/拆包 TCP是个"流"协议,所谓流,就是没有界限的一串数据.大家可以想想河里的流水,是连成一片的,其间并没有分界线.TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题. TCP粘包/拆包问题说明 假设客户

TCP 粘包/拆包问题

简介 TCP 是一个’流’协议,所谓流,就是没有界限的一串数据. 大家可以想想河里的流水,是连成一片的.期间并没有分界线, TCP 底层并不了解上层业务数据的具体含义 ,它会根据 TCP 缓冲区的实际情况进行包得划分,所以在业务上认为,一个完整的包可能会被 TCP 拆分成多个包进行发送 . 也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的 TCP 拆包和粘包. TCP 粘包/拆包问题说明 我们可以通过图解对 TCP 粘包和拆包进行说明.粘包问题示例图: 假设客户端分别发送了两个数据包

netty 解决TCP粘包与拆包问题(二)

TCP以流的方式进行数据传输,上层应用协议为了对消息的区分,采用了以下几种方法. 1.消息固定长度 2.第一篇讲的回车换行符形式 3.以特殊字符作为消息结束符的形式 4.通过消息头中定义长度字段来标识消息的总长度 一.采用指定分割符解决粘包与拆包问题 服务端 1 package com.ming.netty.nio.stickpack; 2 3 4 5 import java.net.InetSocketAddress; 6 7 import io.netty.bootstrap.ServerB

netty权威指南--------第四章TCP粘包/拆包问题

第三章中的示例用于功能测试一般没有问题,但当压力上来或者发送大报文时,就会存在粘包/拆包问题. 这时就需要使用LineBasedFrameDecoder+StringDecoder client端请求改为连续的100次 package com.xiaobing.netty.fourth; import java.net.SocketAddress; import org.omg.CORBA.Request; import io.netty.buffer.ByteBuf; import io.ne