一起学Netty(九)之LengthFieldBasedFrameDecoder

之前介绍了Netty天然的几种解析器,也稍微介绍了一下ByteToMessageDecoder类,我们对Netty的解码器还是有了一定的了解~

今天要介绍的是Netty中一个很重要的解码器,因为相比于其他的普通的解码器,这个解码器用的场景更多,并不是说其他解码器不重要,只是因为我们业务场景所致

在当今比较流行的水平拆分的架构之下,RPC协议很是流行,这样可以使各个项目解耦,使得更加灵活,每个项目之间通过远程调用交互,相互之间定义一个通讯私有协议,然后解析,这样就可以进行数据接口交互

例如我们定义一个这样的协议类:

package com.lyncc.netty.codec.lengthFieldBasedFrame;

public class CustomMsg {

    //类型  系统编号 0xAB 表示A系统,0xBC 表示B系统
    private byte type;

    //信息标志  0xAB 表示心跳包    0xBC 表示超时包  0xCD 业务信息包
    private byte flag;

    //主题信息的长度
    private int length;

    //主题信息
    private String body;

    public CustomMsg() {

    }

    public CustomMsg(byte type, byte flag, int length, String body) {
        this.type = type;
        this.flag = flag;
        this.length = length;
        this.body = body;
    }

    public byte getType() {
        return type;
    }

    public void setType(byte type) {
        this.type = type;
    }

    public byte getFlag() {
        return flag;
    }

    public void setFlag(byte flag) {
        this.flag = flag;
    }

    public int getLength() {
        return length;
    }

    public void setLength(int length) {
        this.length = length;
    }

    public String getBody() {
        return body;
    }

    public void setBody(String body) {
        this.body = body;
    }

}

我们规定两个系统通过Netty去发送这样的一个格式的信息,CustomMsg中包含这样的几类信息:

1)type表示发送端的系统类型

2)flag表示发送信息的类型,是业务数据,还是心跳包数据

3)length表示主题body的长度

4)body表示主题信息

有了这样的相互规定,发送端与接收端按照这种格式去编码和解码数据,这样就很容易的进行数据交互,当然如果netty不提供任何的类,我们也能进行编码解码,但是Netty还是提供了一个现有的类,这样可以避免我们重复造车,并且即使我们愿意重复造车,我们造的车也不一定比Netty好,所以我们还是直接使用吧

Netty提供的类叫做LengthFieldBasedFrameDecoder,与其他的解码器不一致的地方是它需要几个参数作为它的构造函数参数:

这几个参数的详细解析可以见如下文档:

http://blog.163.com/[email protected]/blog/static/127857195201210821145721/

我们也仔细说明一下这些参数,加入我们需要解析,加入我们需要解析我们刚才定义的CustomMsg,我们需要自定义一个decoder,这个类继承Netty提供的LengthFieldBasedFrameDecoder:

package com.lyncc.netty.codec.lengthFieldBasedFrame;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

public class CustomDecoder extends LengthFieldBasedFrameDecoder {

    //判断传送客户端传送过来的数据是否按照协议传输,头部信息的大小应该是 byte+byte+int = 1+1+4 = 6
    private static final int HEADER_SIZE = 6;

    private byte type;

    private byte flag;

    private int length;

    private String body;

    /**
     *
     * @param maxFrameLength 解码时,处理每个帧数据的最大长度
     * @param lengthFieldOffset 该帧数据中,存放该帧数据的长度的数据的起始位置
     * @param lengthFieldLength 记录该帧数据长度的字段本身的长度
     * @param lengthAdjustment 修改帧数据长度字段中定义的值,可以为负数
     * @param initialBytesToStrip 解析的时候需要跳过的字节数
     * @param failFast 为true,当frame长度超过maxFrameLength时立即报TooLongFrameException异常,为false,读取完整个帧再报异常
     */
    public CustomDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,
            int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength,
                lengthAdjustment, initialBytesToStrip, failFast);
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        if (in == null) {
            return null;
        }
        if (in.readableBytes() < HEADER_SIZE) {
            throw new Exception("可读信息段比头部信息都小,你在逗我?");
        }

        //注意在读的过程中,readIndex的指针也在移动
        type = in.readByte();

        flag = in.readByte();

        length = in.readInt();

        if (in.readableBytes() < length) {
            throw new Exception("body字段你告诉我长度是"+length+",但是真实情况是没有这么多,你又逗我?");
        }
        ByteBuf buf = in.readBytes(length);
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        body = new String(req, "UTF-8");

        CustomMsg customMsg = new CustomMsg(type,flag,length,body);
        return customMsg;
    }

}

头部信息的大小我们这边写的是6,原因在代码里面也解释了,byte是一个字节,int是四个字节,那么头部大小就是6个字节,接下来就是要定义构造函数了,构造函数的入参的解释代码里已经标注了,我们真实的入参是:

稍微解释一下:

1)LENGTH_FIELD_LENGTH指的就是我们这边CustomMsg中length这个属性的大小,我们这边是int型,所以是4

2)LENGTH_FIELD_OFFSET指的就是我们这边length字段的起始位置,因为前面有type和flag两个属性,且这两个属性都是byte,两个就是2字节,所以偏移量是2

3)LENGTH_ADJUSTMENT指的是length这个属性的值,假如我们的body长度是40,有时候,有些人喜欢将length写成44,因为length本身还占有4个字节,这样就需要调整一下,那么就需要-4,我们这边没有这样做,所以写0就可以了

好了,以下给出完整的代码:

CustomServer.java

package com.lyncc.netty.codec.lengthFieldBasedFrame;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.net.InetSocketAddress;

public class CustomServer {

    private static final int MAX_FRAME_LENGTH = 1024 * 1024;
    private static final int LENGTH_FIELD_LENGTH = 4;
    private static final int LENGTH_FIELD_OFFSET = 2;
    private static final int LENGTH_ADJUSTMENT = 0;
    private static final int INITIAL_BYTES_TO_STRIP = 0;

    private int port;

    public CustomServer(int port) {
        this.port = port;
    }

    public void start(){
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap sbs = new ServerBootstrap().group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        protected void initChannel(SocketChannel ch) throws Exception {
                             ch.pipeline().addLast(new CustomDecoder(MAX_FRAME_LENGTH,LENGTH_FIELD_LENGTH,LENGTH_FIELD_OFFSET,LENGTH_ADJUSTMENT,INITIAL_BYTES_TO_STRIP,false));
                             ch.pipeline().addLast(new CustomServerHandler());
                        };

                    }).option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
             // 绑定端口,开始接收进来的连接
             ChannelFuture future = sbs.bind(port).sync();  

             System.out.println("Server start listen at " + port );
             future.channel().closeFuture().sync();
        } catch (Exception e) {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new CustomServer(port).start();
    }
}

CustomServerHandler.java

package com.lyncc.netty.codec.lengthFieldBasedFrame;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class CustomServerHandler extends SimpleChannelInboundHandler<Object> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        if(msg instanceof CustomMsg) {
            CustomMsg customMsg = (CustomMsg)msg;
            System.out.println("Client->Server:"+ctx.channel().remoteAddress()+" send "+customMsg.getBody());
        }

    }

}

CustomClient.java

package com.lyncc.netty.codec.lengthFieldBasedFrame;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class CustomClient {

    static final String HOST = System.getProperty("host", "127.0.0.1");
    static final int PORT = Integer.parseInt(System.getProperty("port", "8080"));
    static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));

    public static void main(String[] args) throws Exception {

        // Configure the client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new CustomEncoder());
                     ch.pipeline().addLast(new CustomClientHandler());
                 }
             });

            ChannelFuture future = b.connect(HOST, PORT).sync();
            future.channel().writeAndFlush("Hello Netty Server ,I am a common client");
            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

}

CustomClientHandler.java

package com.lyncc.netty.codec.lengthFieldBasedFrame;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class CustomClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        CustomMsg customMsg = new CustomMsg((byte)0xAB, (byte)0xCD, "Hello,Netty".length(), "Hello,Netty");
        ctx.writeAndFlush(customMsg);
    }

}

最最重要的就是两个译码器:

CustomDecoder.java

package com.lyncc.netty.codec.lengthFieldBasedFrame;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

public class CustomDecoder extends LengthFieldBasedFrameDecoder {

    //判断传送客户端传送过来的数据是否按照协议传输,头部信息的大小应该是 byte+byte+int = 1+1+4 = 6
    private static final int HEADER_SIZE = 6;

    private byte type;

    private byte flag;

    private int length;

    private String body;

    /**
     *
     * @param maxFrameLength 解码时,处理每个帧数据的最大长度
     * @param lengthFieldOffset 该帧数据中,存放该帧数据的长度的数据的起始位置
     * @param lengthFieldLength 记录该帧数据长度的字段本身的长度
     * @param lengthAdjustment 修改帧数据长度字段中定义的值,可以为负数
     * @param initialBytesToStrip 解析的时候需要跳过的字节数
     * @param failFast 为true,当frame长度超过maxFrameLength时立即报TooLongFrameException异常,为false,读取完整个帧再报异常
     */
    public CustomDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,
            int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength,
                lengthAdjustment, initialBytesToStrip, failFast);
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        if (in == null) {
            return null;
        }
        if (in.readableBytes() < HEADER_SIZE) {
            throw new Exception("可读信息段比头部信息都小,你在逗我?");
        }

        //注意在读的过程中,readIndex的指针也在移动
        type = in.readByte();

        flag = in.readByte();

        length = in.readInt();

        if (in.readableBytes() < length) {
            throw new Exception("body字段你告诉我长度是"+length+",但是真实情况是没有这么多,你又逗我?");
        }
        ByteBuf buf = in.readBytes(length);
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        body = new String(req, "UTF-8");

        CustomMsg customMsg = new CustomMsg(type,flag,length,body);
        return customMsg;
    }

}

CustomEncoder.java

package com.lyncc.netty.codec.lengthFieldBasedFrame;

import java.nio.charset.Charset;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class CustomEncoder extends MessageToByteEncoder<CustomMsg> {

    @Override
    protected void encode(ChannelHandlerContext ctx, CustomMsg msg, ByteBuf out) throws Exception {
        if(null == msg){
            throw new Exception("msg is null");
        }

        String body = msg.getBody();
        byte[] bodyBytes = body.getBytes(Charset.forName("utf-8"));
        out.writeByte(msg.getType());
        out.writeByte(msg.getFlag());
        out.writeInt(bodyBytes.length);
        out.writeBytes(bodyBytes);

    }

}

好了,到此为止代码就全部写完了,运行测试一下:

启动服务器端:

运行客户端后,再回到服务器端的控制台:

时间: 2024-08-28 08:15:07

一起学Netty(九)之LengthFieldBasedFrameDecoder的相关文章

从头开始学JavaScript (九)——执行环境和作用域

原文:从头开始学JavaScript (九)--执行环境和作用域 一.执行环境:定义了变量或者函数有权访问的其他数据,决定了它们各自的行为.每个执行环境都有与之关联的变量对象. 变量对象:保存着环境中定义的变量和函数. 作用域链:保证对执行环境有权访问的所有变量和函数的有序访问. 标识符解析:沿着作用域链一级一级地搜索标识符的过程. 通过例子来说明执行环境.变量对象以及作用域链: 1 <script type="text/javascript"> 2 var color =

Mina、Netty、Twisted一起学(九):异步IO和回调函数

用过JavaScript或者jQuery的同学都知道,JavaScript特别是jQuery中存在大量的回调函数,比如Ajax.jQuery的动画等. $.get(url, function() { doSomething1(); // (3) }); // (1) doSomething2(); // (2) 上面的代码是jQuery的Ajax,因为Ajax是异步的,所以在请求URL的过程中并不会堵塞程序,也就是程序运行到(1)并不用等待Ajax请求的结果,就继续往下运行(2).而$.get的

一起学Netty(十四)之 Netty生产级的心跳和重连机制

sigh,写这篇博客的时候老脸还是红了一下,心里还是有些唏嘘的,应该算是剽窃吧,每个人的代码功力的确是有差距的,好在文章的标题是"一起学",而不是开涛大神的"跟我学"系列的文章,我们还是多花点时间学习吧,感叹无用~ 最近工作比较忙,但闲暇之余还是看了阿里的冯家春(fengjiachun)的github上的开源代码Jupiter,写的RPC框架让我感叹人外有人,废话不多说,下面的代码全部截取自Jupiter,写了一个比较完整的例子,供大家一起学习分享,再次对@Luca

一起学Netty(十)之 Netty使用Google的ProtoBuf

protobuf是由Google开发的一套对数据结构进行序列化的方法,可用做通信协议,数据存储格式,等等.其特点是不限语言.不限平台.扩展性强 Netty也提供了对Protobuf的天然支持,我们今天就写一个简单的示例,简单地了解一下Netty对Google的protoBuf的支持 我们的示例场景很简单的:客户端发送一个信息,这个信息用Protobuf来做序列化,然后服务器端接收这个信息,解码,读取信息 protobuf与xml,json这样的数据格式一样,都有自己的一套语法,且语法很简单,很容

一起学Netty(五)之 初识ByteBuf和ByteBuf的常用API

网络传输的载体是byte,这是任何框架谁也逃脱不了的一种规定,JAVA的NIO提供了ByteBuffer,用来完成这项任务,当然ByteBuffer也很好的完成了这个任务,Netty也提供了一个名字很相似的载体叫做ByteBuf,相比于ByteBuf而言,它有着更加更多友善的API,也更加易于维护,并且它可以扩容 一般来说,ByteBuf都是维护一个byte数组的,它的内部格式是长成这个样子的 * +-------------------+------------------+---------

一起学Netty(七)之 TCP粘包拆包基本解决方案

上个小节我们浅析了在Netty的使用的时候TCP的粘包和拆包的现象,Netty对此问题提供了相对比较丰富的解决方案 Netty提供了几个常用的解码器,帮助我们解决这些问题,其实上述的粘包和拆包的问题,归根结底的解决方案就是发送端给远程端一个标记,告诉远程端,每个信息的结束标志是什么,这样,远程端获取到数据后,根据跟发送端约束的标志,将接收的信息分切或者合并成我们需要的信息,这样我们就可以获取到正确的信息了 例如,我们刚才的例子中,我们可以在发送的信息中,加一个结束标志,例如两个远程端规定以行来切

一起学Netty(六)之 TCP粘包拆包场景

TCP编程底层都有粘包和拆包机制,因为我们在C/S这种传输模型下,以TCP协议传输的时候,在网络中的byte其实就像是河水,TCP就像一个搬运工,将这流水从一端转送到另一端,这时又分两种情况: 1)如果客户端的每次制造的水比较多,也就是我们常说的客户端给的包比较大,TCP这个搬运工就会分多次去搬运. 2)如果客户端每次制造的水比较少的话,TCP可能会等客户端多次生产之后,把所有的水一起再运输到另一端 上述第一种情况,就是需要我们进行粘包,在另一端接收的时候,需要把多次获取的结果粘在一起,变成我们

一起学Netty(三)之 SimpleChannelInboundHandler

其实Netty的知识点还是很零碎的,比如这个SimpleChannelInboundHandler这个类,在<Netty in Action>该书中的原版的Hello world的demo的客户端就是使用的SimpleChannelInboundHandler来作为处理器的,我本来也是使用这个类作为我处理类的,但是做一个新手,这个类还是让我走了一点弯路,我们可以看到SimpleChannelInboundHandler中有一个channelRead0的方法需要我们实现: 尼玛,我记得当时就是使

重学STM32---(九) ——CAN通信(一)

一.CAN简介 1.CAN是什么? CAN 是 Controller Area Network的缩写(以下称为 CAN),是 ISO 国际标准化的串行通信协议. 2.CAN特点 (1)  多主控制 (2)  消息的发送 在 CAN 协议中,所有的消息都以固定的格式发送.总线空闲时,所有与总线相连的单元都可以开始发送新 消息.两个以上的单元同时开始发送消息时,根据标识符(Identifier  以下称为 ID)决定优先级.ID 并不 是表示发送的目的地址,而是表示访问总线的消息的优先级.两个以上的