Netty解决TCP粘包/拆包问题 - 按行分隔字符串解码器

服务端

package org.zln.netty.five.timer;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 时间服务器服务端
 * Created by sherry on 16/11/5.
 */
public class TimerServer {
    /**
     * 服务端绑定端口号
     */
    private int PORT;

    public TimerServer(int PORT) {
        this.PORT = PORT;
    }

    /**
     * 日志
     */
    private static Logger logger = LoggerFactory.getLogger(TimerServer.class);

    public void bind() {
        /*
        NioEventLoopGroup是线程池组
        包含了一组NIO线程,专门用于网络事件的处理
        bossGroup:服务端,接收客户端连接
        workGroup:进行SocketChannel的网络读写
         */
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            /*
            ServerBootstrap:用于启动NIO服务的辅助类,目的是降低服务端的开发复杂度
             */
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)//配置TCP参数,能够设置很多,这里就只设置了backlog=1024,
                    .childHandler(new TimerServerInitializer());//绑定I/O事件处理类
            logger.debug("绑定端口号:" + PORT + ",等待同步成功");
            /*
            bind:绑定端口
            sync:同步阻塞方法,等待绑定完成,完成后返回 ChannelFuture ,主要用于通知回调
             */
            ChannelFuture channelFuture = serverBootstrap.bind(PORT).sync();
            logger.debug("等待服务端监听窗口关闭");
            /*
             closeFuture().sync():为了阻塞,服务端链路关闭后才退出.也是一个同步阻塞方法
             */
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            logger.error(e.getMessage(), e);
        } finally {
            logger.debug("优雅退出,释放线程池资源");
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}

TimerServer

 1 package org.zln.netty.five.timer;
 2
 3 import io.netty.channel.ChannelInitializer;
 4 import io.netty.channel.ChannelPipeline;
 5 import io.netty.channel.socket.SocketChannel;
 6 import io.netty.handler.codec.LineBasedFrameDecoder;
 7 import io.netty.handler.codec.string.StringDecoder;
 8
 9 /**
10  * Created by sherry on 16/11/5.
11  */
12 public class TimerServerInitializer extends ChannelInitializer<SocketChannel> {
13     @Override
14     protected void initChannel(SocketChannel socketChannel) throws Exception {
15
16         ChannelPipeline pipeline = socketChannel.pipeline();
17
18         pipeline.addLast(new LineBasedFrameDecoder(1024));
19         pipeline.addLast(new StringDecoder());
20         pipeline.addLast(new TimerServerHandler());
21
22
23
24     }
25 }

TimerServerInitializer

 1 package org.zln.netty.five.timer;
 2
 3 import io.netty.buffer.ByteBuf;
 4 import io.netty.buffer.Unpooled;
 5 import io.netty.channel.ChannelHandlerAdapter;
 6 import io.netty.channel.ChannelHandlerContext;
 7 import org.slf4j.Logger;
 8 import org.slf4j.LoggerFactory;
 9
10 import java.text.SimpleDateFormat;
11 import java.util.Date;
12
13 /**
14  * Handler主要用于对网络事件进行读写操作,是真正的业务类
15  * 通常只需要关注 channelRead 和 exceptionCaught 方法
16  * Created by sherry on 16/11/5.
17  */
18 public class TimerServerHandler extends ChannelHandlerAdapter {
19
20     /**
21      * 日志
22      */
23     private Logger logger = LoggerFactory.getLogger(TimerServerHandler.class);
24
25     private static int count = 0;
26
27     @Override
28     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
29
30         String body = (String) msg;
31         logger.debug("第  "+(++count)+"  次收到请求  -  "+body);
32
33         String timeNow = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS").format(new Date())+System.lineSeparator();
34
35         //获取发送给客户端的数据
36         ByteBuf resBuf = Unpooled.copiedBuffer(timeNow.getBytes("UTF-8"));
37
38         ctx.writeAndFlush(resBuf);
39     }
40
41
42     @Override
43     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
44         //将消息发送队列中的消息写入到SocketChannel中发送给对方
45         logger.debug("channelReadComplete");
46         ctx.flush();
47     }
48
49     @Override
50     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
51         //发生异常时,关闭 ChannelHandlerContext,释放ChannelHandlerContext 相关的句柄等资源
52         logger.error(cause.getMessage(),cause);
53         ctx.close();
54     }
55 }

TimerServerHandler

客户端

 1 package org.zln.netty.five.timer;
 2
 3 import io.netty.bootstrap.Bootstrap;
 4 import io.netty.channel.ChannelFuture;
 5 import io.netty.channel.ChannelOption;
 6 import io.netty.channel.EventLoopGroup;
 7 import io.netty.channel.nio.NioEventLoopGroup;
 8 import io.netty.channel.socket.nio.NioSocketChannel;
 9 import org.slf4j.Logger;
10 import org.slf4j.LoggerFactory;
11
12 /**
13  * 时间服务器客户端
14  * Created by sherry on 16/11/5.
15  */
16 public class TimerClient {
17     /**
18      * 日志
19      */
20     private Logger logger = LoggerFactory.getLogger(TimerServer.class);
21
22     private String HOST;
23     private int PORT;
24
25     public TimerClient(String HOST, int PORT) {
26         this.HOST = HOST;
27         this.PORT = PORT;
28     }
29
30     public void connect(){
31         //配置客户端NIO线程组
32         EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
33         try {
34             Bootstrap bootstrap = new Bootstrap();
35             bootstrap.group(eventLoopGroup)
36                     .channel(NioSocketChannel.class)
37                     .option(ChannelOption.TCP_NODELAY,true)
38                     .handler(new TimerClientInitializer());
39             //发起异步连接操作
40             logger.debug("发起异步连接操作 - start");
41             ChannelFuture channelFuture = bootstrap.connect(HOST,PORT).sync();
42             logger.debug("发起异步连接操作 - end");
43             //等待客户端链路关闭
44             logger.debug("等待客户端链路关闭 - start");
45             channelFuture.channel().closeFuture().sync();
46             logger.debug("等待客户端链路关闭 - end");
47         } catch (InterruptedException e) {
48             logger.error(e.getMessage(),e);
49         }finally {
50             //优雅的关闭
51             eventLoopGroup.shutdownGracefully();
52         }
53     }
54 }

TimerClient

 1 package org.zln.netty.five.timer;
 2
 3 import io.netty.channel.ChannelInitializer;
 4 import io.netty.channel.ChannelPipeline;
 5 import io.netty.channel.socket.SocketChannel;
 6 import io.netty.handler.codec.LineBasedFrameDecoder;
 7 import io.netty.handler.codec.string.StringDecoder;
 8
 9 /**
10  * Created by sherry on 16/11/5.
11  */
12 public class TimerClientInitializer extends ChannelInitializer<SocketChannel> {
13     @Override
14     protected void initChannel(SocketChannel socketChannel) throws Exception {
15         ChannelPipeline pipeline = socketChannel.pipeline();
16         pipeline.addLast(new LineBasedFrameDecoder(1024));
17         pipeline.addLast(new StringDecoder());
18         pipeline.addLast(new TimerClientHandler());
19     }
20 }

TimerClientInitializer

 1 package org.zln.netty.five.timer;
 2
 3 import io.netty.buffer.ByteBuf;
 4 import io.netty.buffer.Unpooled;
 5 import io.netty.channel.ChannelHandlerAdapter;
 6 import io.netty.channel.ChannelHandlerContext;
 7 import org.slf4j.Logger;
 8 import org.slf4j.LoggerFactory;
 9
10 import java.io.UnsupportedEncodingException;
11
12 /**
13  * Created by sherry on 16/11/5.
14  */
15 public class TimerClientHandler extends ChannelHandlerAdapter {
16
17     /**
18      * 日志
19      */
20     private Logger logger = LoggerFactory.getLogger(TimerClientHandler.class);
21
22     private static int count = 0;
23
24     @Override
25     public void channelActive(ChannelHandlerContext ctx) throws Exception {
26         logger.debug("客户端连接上了服务端");
27
28         //发送请求
29         ByteBuf reqBuf = null;
30         for (int i = 0; i < 100; i++) {
31             reqBuf = getReq("GET TIME"+System.lineSeparator());
32             ctx.writeAndFlush(reqBuf);
33         }
34
35
36     }
37
38     /**
39      * 将字符串包装成ByteBuf
40      * @param s
41      * @return
42      */
43     private ByteBuf getReq(String s) throws UnsupportedEncodingException {
44         byte[] data = s.getBytes("UTF-8");
45         ByteBuf reqBuf = Unpooled.buffer(data.length);
46         reqBuf.writeBytes(data);
47         return reqBuf;
48     }
49
50     @Override
51     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
52         String body = (String) msg;
53         logger.debug("这是收到的第 "+(++count)+" 笔响应 -- "+body);
54     }
55
56     @Override
57     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
58         ctx.close();
59     }
60 }

TimerClientHandler

这里主要使用 LineBasedFrameDecoder 和 StringDecoder 来实现解决粘包问题

原理如下:

  LineBasedFrameDecoder 依次遍历 ByteBuf 中的可读字节,判断是否有 \n 或 \r\n,如果有,就作为结束位置。从可读索引到结束位置区间的字节组成一行。它是以换行符为结束标志的解码器。支持携带结束符或者不懈怠结束符两种解码方式。同时支持配置单行的最大长度。如果读取到了最大长度仍旧没有发现换行符,就会抛出异常,同时忽略掉之前读到的数据。

  StringDecoder 的作用就是讲接收到的对象转化成字符串,然后继续调用handler。这样就不需要再handler中手动将对象转化成字符串了,直接强制转化就行。

  LineBasedFrameDecoder+StringDecoder组合就是按行切割的文本解码器,用来解决TCP的粘包和拆包问题。

  

时间: 2024-12-08 09:35:41

Netty解决TCP粘包/拆包问题 - 按行分隔字符串解码器的相关文章

netty解决tcp粘包拆包问题

tcp粘包拆包解决方案 1.发送定长的消息 server端:                    EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup)  .channel(NioServerSocketChannel.cl

Netty的TCP粘包/拆包(源码二)

假设客户端分别发送了两个数据包D1和D2给服务器,由于服务器端一次读取到的字节数是不确定的,所以可能发生四种情况: 1.服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包. 2.服务端一次接收到了两个数据包,D1和D2粘合在一起,被称为TCP粘包. 3.服务端分两次读取到了两个数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这被称为TCP拆包. 4.服务端分两次读取到了两个数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1

netty 解决TCP粘包与拆包问题(二)

TCP以流的方式进行数据传输,上层应用协议为了对消息的区分,采用了以下几种方法. 1.消息固定长度 2.第一篇讲的回车换行符形式 3.以特殊字符作为消息结束符的形式 4.通过消息头中定义长度字段来标识消息的总长度 一.采用指定分割符解决粘包与拆包问题 服务端 1 package com.ming.netty.nio.stickpack; 2 3 4 5 import java.net.InetSocketAddress; 6 7 import io.netty.bootstrap.ServerB

netty 解决TCP粘包与拆包问题(三)

今天使用netty的固定长度进行解码 固定长度解码的原理就是按照指定消息的长度对消息自动解码. 在netty实现中,只需要采用FiexedLengthFrameDecoder解码器即可... 以下是服务端代码 1 package com.ming.netty.nio.stickpack; 2 3 4 5 import java.net.InetSocketAddress; 6 7 import io.netty.bootstrap.ServerBootstrap; 8 import io.net

Netty(三)TCP粘包拆包处理

tcp是一个“流”的协议,一个完整的包可能会被TCP拆分成多个包进行发送,也可能把小的封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题. 粘包.拆包问题说明 假设客户端分别发送数据包D1和D2给服务端,由于服务端一次性读取到的字节数是不确定的,所以可能存在以下4种情况. 1.服务端分2次读取到了两个独立的包,分别是D1,D2,没有粘包和拆包: 2.服务端一次性接收了两个包,D1和D2粘在一起了,被成为TCP粘包; 3.服务端分2次读取到了两个数据包,第一次读取到了完整的D1和D2包的部

TCP粘包/拆包问题

无论是服务端还是客户端,当我们读取或者发送消息的时候,都需要考虑TCP底层的粘包/拆包机制. TCP粘包/拆包 TCP是个"流"协议,所谓流,就是没有界限的一串数据.大家可以想想河里的流水,是连成一片的,其间并没有分界线.TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题. TCP粘包/拆包问题说明 假设客户

【游戏开发】Netty TCP粘包/拆包问题的解决办法(二)

上一篇:[Netty4.X]Unity客户端与Netty服务器的网络通信(一) 一.什么是TCP粘包/拆包 如图所示,假如客户端分别发送两个数据包D1和D2给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在以下4中情况: 第一种情况:Server端分别读取到D1和D2,没有产生粘包和拆包的情况. 第二种情况:Server端一次接收到两个数据包,D1和D2粘合在一起,被称为TCP粘包. 第三种情况:Server端分2次读取到2个数据包,第一次读取到D1包和D2包的部分内容D2_1,第二次

Netty学习之TCP粘包/拆包

一.TCP粘包/拆包问题说明,如图 二.未考虑TCP粘包导致功能异常案例 按照设计初衷,服务端应该收到100条查询时间指令的请求查询,客户端应该打印100次服务端的系统时间 1.服务端类 package com.phei.netty.s2016042302; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitial

Netty中LineBasedFrameDecoder解码器使用与分析:解决TCP粘包问题

[toc] Netty中LineBasedFrameDecoder解码器使用与分析:解决TCP粘包问题 上一篇文章<Netty中TCP粘包问题代码示例与分析>演示了使用了时间服务器的例子演示了TCP的粘包问题,这里使用LineBasedFrameDecoder就是用来解决这个问题的. 不过需要注意的是,LineBasedFrameDecoder见名知其义,可见其是与行相关的,而在前面演示TCP粘包问题时,作者是有意在发送的消息中都加入了换行符,目的也是为了在后面去讲解LineBasedFram