3.Netty的粘包、拆包(二)

Netty提供的TCP数据拆包、粘包解决方案

1.前言

关于TCP的数据拆包、粘包的介绍,我在上一篇文章里面已经有过介绍。

想要了解一下的,请点击这里 Chick Here!

今天我们要讲解的是Netty提供的两种解决方案:

  1. DelimiterBasedFrameDecoder
  2. FixedLengthFrameDecoder

2.关于Decoder

  1. 先观察下两段代码的不同

    (1)使用StringDecoder之前

    @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
         try {
    
             ByteBuf in = (ByteBuf) msg;
             String str = in.toString(CharsetUtil.UTF_8);
             System.out.println("Client:"+str);
    
         } finally {
             ReferenceCountUtil.release(msg);
         }
     }

    (2)使用StringDecoder之后

    @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
         try {
             String str = (String) msg;
             System.out.println("Client:"+str);
    
         } finally {
             ReferenceCountUtil.release(msg);
         }
     }
  2. 关于Decoder

    decoder:n. 解码器

    在我看来,Netty数据的解析方式大概为:

    发送过程:Buffer------>数据报------>比特流

    接受过程:Buffer<------数据报<------比特流

    所以我们接受到的msg是一个ButeBuf

    使用了Decoder(这里使用StringDecoder举例)之后:

    发送过程:Buffer------>数据报------>比特流

    接受过程:String<------Buffer<------数据报<------比特流

    相当于ByteBuf按照StringDecoder的解码规则,把msg翻译成为了一个字符串。

  3. 如何使用Decoder

    (1)实际代码演示:

    package com.xm.netty.demo02;
    
    import java.net.InetSocketAddress;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    
    public class Server {
    
     private final int port;
    
     public Server(int port) {
         this.port = port;
     }
    
     public static void main(String[] args) {
    
         int port = 8989;
         try {
             new Server(port).start();
         } catch (InterruptedException e) {
             // TODO Auto-generated catch block
             e.printStackTrace();
         }
    
     }
    
     private void start() throws InterruptedException {
         EventLoopGroup g1 = new NioEventLoopGroup();
         EventLoopGroup g2 = new NioEventLoopGroup();
         try {
             ServerBootstrap bootstrap = new ServerBootstrap();
              bootstrap
                     .group(g1,g2)
                     .channel(NioServerSocketChannel.class)
                     .localAddress(new InetSocketAddress( port))
                     .childHandler(new ChannelInitializer() {
                         @Override
                         protected void initChannel(Channel ch) throws Exception {
                             ch.pipeline().addLast(new StringDecoder());
                             ch.pipeline().addLast(new ServerHandler());
                         }
                     });
              ChannelFuture future = bootstrap.bind().sync();
              future.channel().closeFuture().sync();
         } finally {
             g1.shutdownGracefully().sync();
             g2.shutdownGracefully().sync();
         }
     }
    
    }

    代码改动:

    ch.pipeline().addLast(new StringDecoder());

    ?

    ? ch.pipeline().addLast(new ServerHandler());

    (2)多个Decoder的使用顺序:

    从前往后,依次解码

    ?

    假设我们有个通过字符串变化为时间的TimeDecoder:

    ch.pipeline().addLast(new StringDecoder());

    ch.pipeline().addLast(new TimeDecoder());

    ? ch.pipeline().addLast(new ServerHandler());

    解析规则为:

3.DelimiterBasedFrameDecoder

  1. 关于DelimiterBasedFrameDecoder

    其实很简单,就是在一个缓冲区的末尾添加一个结束字符。

    在规定了最大长度的缓冲区里,遇到一个特殊字符,就截取一次。

    原理类似于String的split()方法。

  2. 代码实现

    (1)服务端Server

    package com.xm.netty.demo03;
    
    import java.net.InetSocketAddress;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    
    public class Server {
    
     private final int port;
    
     public Server(int port) {
         this.port = port;
     }
    
     public static void main(String[] args) {
    
         int port = 8989;
         try {
             new Server(port).start();
         } catch (InterruptedException e) {
             // TODO Auto-generated catch block
             e.printStackTrace();
         }
    
     }
    
     private void start() throws InterruptedException {
         EventLoopGroup g1 = new NioEventLoopGroup();
         EventLoopGroup g2 = new NioEventLoopGroup();
         try {
             ServerBootstrap bootstrap = new ServerBootstrap();
              bootstrap
                     .group(g1,g2)
                     .channel(NioServerSocketChannel.class)
                     .localAddress(new InetSocketAddress( port))
                     .childHandler(new ChannelInitializer() {
                         @Override
                         protected void initChannel(Channel ch) throws Exception {
                             ByteBuf buf = Unpooled.copiedBuffer("$".getBytes());
                             ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,buf));
                             ch.pipeline().addLast(new StringDecoder());
                             ch.pipeline().addLast(new ServerHandler());
                         }
                     });
              ChannelFuture future = bootstrap.bind().sync();
              future.channel().closeFuture().sync();
         } finally {
             g1.shutdownGracefully().sync();
             g2.shutdownGracefully().sync();
         }
     }
    
    }

    (2)服务端ServerHandler

    package com.xm.netty.demo03;
    
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.CharsetUtil;
    
    public class ServerHandler extends ChannelHandlerAdapter {
    
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
         String str = (String) msg;
         System.out.println("Server:"+str);
         str = "服务器返回--->"+ str+"$";
         ctx.writeAndFlush(Unpooled.copiedBuffer(str.getBytes()));
     }
    
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
         cause.printStackTrace();
         ctx.close();
     }
    
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
         System.out.println(DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(LocalDateTime.now())+"一个客户端连接上服务器!");
     }
    
    }
    

    (3)客户端Client

    package com.xm.netty.demo03;
    
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    
    public class Client {
    
     private final int port;
     private final String host;
    
     public Client(int port, String host) {
         this.port = port;
         this.host = host;
     }
    
     public static void main(String[] args) {
         String host = "127.0.0.1";
         int port = 8989;
         try {
             new Client(port, host).start();
         } catch (InterruptedException e) {
             // TODO Auto-generated catch block
             e.printStackTrace();
         }
     }
    
     private void start() throws InterruptedException {
    
         EventLoopGroup group = new NioEventLoopGroup();
         try {
             Bootstrap bootstrap = new Bootstrap();
             bootstrap
                     .group(group)
                     .channel(NioSocketChannel.class)
                     .remoteAddress(host, port)
                     .handler(new ChannelInitializer<SocketChannel>() {
    
                         @Override
                         protected void initChannel(SocketChannel ch) throws Exception {
                             ByteBuf buf = Unpooled.copiedBuffer("$".getBytes());
                             ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,buf));
                             ch.pipeline().addLast(new StringDecoder());
                             ch.pipeline().addLast(new ClientHandler());
                         }
    
                     });
    
             ChannelFuture future = bootstrap.connect().sync();
    
             for(int i=10;i<20;i++) {
                 String str = DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(LocalDateTime.now()) + "---- " +i+"<<<$";
                 future.channel().write(Unpooled.copiedBuffer(str.getBytes()));
             }
    
             future.channel().flush();
    
             //future.channel().writeAndFlush(Unpooled.copiedBuffer("Hello Netty!".getBytes()));
    
             future.channel().closeFuture().sync();
         } finally {
             group.shutdownGracefully().sync();
         }
    
     }
    
    }
    

    (4)客户端ClientHandler

    package com.xm.netty.demo03;
    
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.CharsetUtil;
    import io.netty.util.ReferenceCountUtil;
    
    public class ClientHandler extends ChannelHandlerAdapter {
    
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
         try {
             String str = (String) msg;
             System.out.println("Client:"+str);
    
         } finally {
             ReferenceCountUtil.release(msg);
         }
     }
    
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
         cause.printStackTrace();
         ctx.close();
     }
    
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
         System.out.println(DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(LocalDateTime.now())+"已连接服务器!");
     }
    
    }
    
  3. 运行结果截图

    (1)服务端运行结果:

    (2)客户端运行结果:

4.FixedLengthFrameDecoder

  1. 关于FixedLengthFrameDecoder

    其实很简单,就是对规定的发送的数据进行限制长度,

    当符合这个长度的情况下,就可以解析。

    假设你发送一个’123456‘,’654321‘

    那么解析的状况为’12345‘,’66543‘

  2. 代码实现

    (1)服务端Server

    package com.xm.netty.demo04;
    
    import java.net.InetSocketAddress;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.FixedLengthFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    
    public class Server {
    
     private final int port;
    
     public Server(int port) {
         this.port = port;
     }
    
     public static void main(String[] args) {
    
         int port = 8989;
         try {
             new Server(port).start();
         } catch (InterruptedException e) {
             // TODO Auto-generated catch block
             e.printStackTrace();
         }
    
     }
    
     private void start() throws InterruptedException {
         EventLoopGroup g1 = new NioEventLoopGroup();
         EventLoopGroup g2 = new NioEventLoopGroup();
         try {
             ServerBootstrap bootstrap = new ServerBootstrap();
              bootstrap
                     .group(g1,g2)
                     .channel(NioServerSocketChannel.class)
                     .localAddress(new InetSocketAddress( port))
                     .childHandler(new ChannelInitializer() {
                         @Override
                         protected void initChannel(Channel ch) throws Exception {
                             ch.pipeline().addLast(new FixedLengthFrameDecoder(5));
                             ch.pipeline().addLast(new StringDecoder());
                             ch.pipeline().addLast(new ServerHandler());
                         }
                     });
              ChannelFuture future = bootstrap.bind().sync();
              future.channel().closeFuture().sync();
         } finally {
             g1.shutdownGracefully().sync();
             g2.shutdownGracefully().sync();
         }
     }
    
    }

    (2)服务端ServerHandler

    package com.xm.netty.demo04;
    
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.CharsetUtil;
    
    public class ServerHandler extends ChannelHandlerAdapter {
    
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
         String str = (String) msg;
         System.out.println("Server:"+str);
         ctx.writeAndFlush(Unpooled.copiedBuffer(str.getBytes()));
     }
    
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
         cause.printStackTrace();
         ctx.close();
     }
    
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
         System.out.println(DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(LocalDateTime.now())+"一个客户端连接上服务器!");
     }
    
    }
    

    (3)客户端Client

    package com.xm.netty.demo04;
    
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.FixedLengthFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    
    public class Client {
    
     private final int port;
     private final String host;
    
     public Client(int port, String host) {
         this.port = port;
         this.host = host;
     }
    
     public static void main(String[] args) {
         String host = "127.0.0.1";
         int port = 8989;
         try {
             new Client(port, host).start();
         } catch (InterruptedException e) {
             // TODO Auto-generated catch block
             e.printStackTrace();
         }
     }
    
     private void start() throws InterruptedException {
    
         EventLoopGroup group = new NioEventLoopGroup();
         try {
             Bootstrap bootstrap = new Bootstrap();
             bootstrap
                     .group(group)
                     .channel(NioSocketChannel.class)
                     .remoteAddress(host, port)
                     .handler(new ChannelInitializer<SocketChannel>() {
    
                         @Override
                         protected void initChannel(SocketChannel ch) throws Exception {
                             ch.pipeline().addLast(new FixedLengthFrameDecoder(5));
                             ch.pipeline().addLast(new StringDecoder());
                             ch.pipeline().addLast(new ClientHandler());
                         }
    
                     });
    
             ChannelFuture future = bootstrap.connect().sync();
    
             for(int i=123450;i<123460;i++) {
                 String str = ""+i;
                 future.channel().write(Unpooled.copiedBuffer(str.getBytes()));
             }
             future.channel().flush();
    
             //future.channel().writeAndFlush(Unpooled.copiedBuffer("Hello Netty!".getBytes()));
    
             future.channel().closeFuture().sync();
         } finally {
             group.shutdownGracefully().sync();
         }
    
     }
    
    }
    

    (4)客户端ClientHandler

    package com.xm.netty.demo04;
    
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.CharsetUtil;
    import io.netty.util.ReferenceCountUtil;
    
    public class ClientHandler extends ChannelHandlerAdapter {
    
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
         try {
             String str = (String) msg;
             System.out.println("Client:"+str);
    
         } finally {
             ReferenceCountUtil.release(msg);
         }
     }
    
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
         cause.printStackTrace();
         ctx.close();
     }
    
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
         System.out.println(DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(LocalDateTime.now())+"已连接服务器!");
     }
    
    }
    
  3. 运行结果截图

    (1)服务端运行结果:

    (2)客户端运行结果:

原文地址:https://www.cnblogs.com/TimerHotel/p/netty03.html

时间: 2024-07-31 07:03:50

3.Netty的粘包、拆包(二)的相关文章

【游戏开发】Netty TCP粘包/拆包问题的解决办法(二)

上一篇:[Netty4.X]Unity客户端与Netty服务器的网络通信(一) 一.什么是TCP粘包/拆包 如图所示,假如客户端分别发送两个数据包D1和D2给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在以下4中情况: 第一种情况:Server端分别读取到D1和D2,没有产生粘包和拆包的情况. 第二种情况:Server端一次接收到两个数据包,D1和D2粘合在一起,被称为TCP粘包. 第三种情况:Server端分2次读取到2个数据包,第一次读取到D1包和D2包的部分内容D2_1,第二次

Netty学习之TCP粘包/拆包

一.TCP粘包/拆包问题说明,如图 二.未考虑TCP粘包导致功能异常案例 按照设计初衷,服务端应该收到100条查询时间指令的请求查询,客户端应该打印100次服务端的系统时间 1.服务端类 package com.phei.netty.s2016042302; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitial

Netty(三)TCP粘包拆包处理

tcp是一个“流”的协议,一个完整的包可能会被TCP拆分成多个包进行发送,也可能把小的封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题. 粘包.拆包问题说明 假设客户端分别发送数据包D1和D2给服务端,由于服务端一次性读取到的字节数是不确定的,所以可能存在以下4种情况. 1.服务端分2次读取到了两个独立的包,分别是D1,D2,没有粘包和拆包: 2.服务端一次性接收了两个包,D1和D2粘在一起了,被成为TCP粘包; 3.服务端分2次读取到了两个数据包,第一次读取到了完整的D1和D2包的部

netty权威指南--------第四章TCP粘包/拆包问题

第三章中的示例用于功能测试一般没有问题,但当压力上来或者发送大报文时,就会存在粘包/拆包问题. 这时就需要使用LineBasedFrameDecoder+StringDecoder client端请求改为连续的100次 package com.xiaobing.netty.fourth; import java.net.SocketAddress; import org.omg.CORBA.Request; import io.netty.buffer.ByteBuf; import io.ne

Netty 粘包 &amp; 拆包 &amp; 编码 &amp; 解码 &amp; 序列化 介绍

目录: 粘包 & 拆包及解决方案 ByteToMessageDecoder 基于长度编解码器 基于分割符的编解码器 google 的 Protobuf 序列化介绍 其他的 前言 Netty 作为一个网络框架,对 TCP 连接中的问题都做了全面的考虑,比如粘包拆包导致的半包问题,如何编解码,如何实现私有协议,序列化等等.本文主要针对这些问题做一个简单介绍,目的是想对整个 Netty 的编解码框架做一个全盘的审视,以确保在后面的源码学习中不会一叶障目不见泰山. 1. 粘包 & 拆包及解决方案

Netty解决粘包和拆包问题的四种方案

在RPC框架中,粘包和拆包问题是必须解决一个问题,因为RPC框架中,各个微服务相互之间都是维系了一个TCP长连接,比如dubbo就是一个全双工的长连接.由于微服务往对方发送信息的时候,所有的请求都是使用的同一个连接,这样就会产生粘包和拆包的问题.本文首先会对粘包和拆包问题进行描述,然后介绍其常用的解决方案,最后会对Netty提供的几种解决方案进行讲解.这里说明一下,由于oschina将"jie ma qi"认定为敏感文字,因而本文统一使用"解码一器"表示该含义 粘包

Netty中粘包和拆包的解决方案

粘包和拆包是TCP网络编程中不可避免的,无论是服务端还是客户端,当我们读取或者发送消息的时候,都需要考虑TCP底层的粘包/拆包机制. TCP粘包和拆包 TCP是个“流”协议,所谓流,就是没有界限的一串数据.TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题. 如图所示,假设客户端分别发送了两个数据包D1和D2给服务端,

服务端NETTY 客户端非NETTY处理粘包和拆包的问题

之前为了调式和方便一直没有处理粘包的问题,今天专门花了时间来搞NETTY的粘包处理,要知道在高并发下,不处理粘包是不可能的,数据流的混乱会造成业务的崩溃什么的我就不说了.所以这个问题 在我心里一直是个结. 使用NETTY真的很幸福,以前用C写服务端 还的自己处理粘包的问题 各种痛苦 不过那也是基本功 没办法的事情.在NETTY里面 有几个拆个包器 我使用的是 LengthFileldBasedFrameDecoder,这个用来解析带有长度属性的包,只要我们在传输协议中加入包的总长度就行 arg0

netty解决tcp粘包拆包问题

tcp粘包拆包解决方案 1.发送定长的消息 server端:                    EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup)  .channel(NioServerSocketChannel.cl

Netty5入门学习笔记003-TCP粘包/拆包问题的解决之道(下)

TCP网络通信时候会发生粘包/拆包的问题,上节使用定长解码器解码,本次使用Netty提供的特殊分隔符解码器 还是用上节中的代码例子,但是只需要修改一下发送的消息和配置一下解码器就可以了 客户端发送消息中添加分隔符做为指令的结束符,模拟多条指令粘包发出 服务器配置分隔符解码器使用&符号拆包 运行结果: 服务器使用分隔符解码器成功拆包. 当然还有更复杂的自定义协议处理TCP粘包/拆包问题,后续深入学习后在进行讨论. 史上最高性价比PS教程-敬伟Photoshop经典教程