netty5自定义私有协议实例

  自定义协议可以解决粘包和拆包问题,客户端发送数据时携带数据包长度,服务端接收数据后解析消息体,获取数据包长度值,据此继续获取数据包内容。我们来看具体例子,自定义的协议如下:

  +--------------------------------------------------+----------+
   |                消息头                     | 消息体 |
   | Delimiter | Length | Type | Reserved  |   data   |
   +-------------------------------------------------+----------+

  

  1)      Delimiter:4bytes,消息头,用于分割消息。

  2)      Length:数据长度。

  3)      Type:1bytes,消息类型。

  4)      Reserved:1bytes,保留。

  5)      Data包数据

  接下来看如何实现编解码:

  1、先定义好javabean:

  总体的:

package com.wlf.netty.nettyapi.javabean;

import lombok.Getter;
import lombok.Setter;

@Setter
@Getter
public class NettyMessage {

    private Header header;

    private byte[] data;

    @Override
    public String toString() {
        return "NettyMessage{" +
                "header=" + header +
                ", data=" + data +
                ‘}‘;
    }
}

  头的:

package com.wlf.netty.nettyapi.javabean;

import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
public class Header {

    /**
     * 4bytes,消息头,用于分割消息。如0xABEF0101
     */
    private int delimiter;

    /**
     * 1byte,类型
     */
    private byte type;

    /**
     * 1byte,保留
     */
    private byte reserved;

    /**
     * 数据长度
     */
    private int length;

    @Override
    public String toString() {
        return "Header{" +
                "delimiter=" + delimiter +
                ", length=" + length +
                ", type=" + type +
                ", reserved=" + reserved +
                ‘}‘;
    }
}

  2、编码:

package com.wlf.netty.nettyapi.msgpack;

import com.wlf.netty.nettyapi.javabean.NettyMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class NettyMessageEncoder extends MessageToByteEncoder<NettyMessage> {

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, NettyMessage nettyMessage, ByteBuf byteBuf) throws Exception {

        if (nettyMessage == null || nettyMessage.getHeader() == null) {
            throw new Exception("The nettyMessage is null.");
        }

        // 1、写入分割标志
        byteBuf.writeInt(nettyMessage.getHeader().getDelimiter());

        // 2、写入数据包长度
        byteBuf.writeInt(nettyMessage.getData() != null ? nettyMessage.getData().length : 0);

        // 3、写入请求类型
        byteBuf.writeByte(nettyMessage.getHeader().getType());

        // 4、写入预留字段
        byteBuf.writeByte(nettyMessage.getHeader().getReserved());

        // 5、写入数据
        byteBuf.writeBytes(nettyMessage.getData() != null ? nettyMessage.getData() : null);

    }
}

  3、解码:

package com.wlf.netty.nettyapi.msgpack;

import com.wlf.netty.nettyapi.constant.Delimiter;
import com.wlf.netty.nettyapi.javabean.Header;
import com.wlf.netty.nettyapi.javabean.NettyMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;

import java.util.List;

public class NettyMessageDecoder extends MessageToMessageDecoder<ByteBuf> {

    /**
     * 消息体字节大小:分割符字段4字节+长度字段4字节+请求类型字典1字节+预留字段1字节=10字节
     */
    private static final int HEAD_LENGTH = 10;

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {

        // 字节流开始位置
        int packStartIndex;
        while (true) {

            // 获取字节流开始位置
            packStartIndex = byteBuf.readerIndex();

            // 若读取到分割标识,说明读取当前字节流开始位置了
            if (byteBuf.readInt() == Delimiter.DELIMITER) {
                break;
            }

            // 重置读索引为0
            byteBuf.resetReaderIndex();

            // 长度校验,字节流长度至少10字节,小于10字节则等待下一次字节流过来
            if (byteBuf.readableBytes() < HEAD_LENGTH) {
                return;
            }
        }

        // 2、获取data的字节流长度
        int dataLength = byteBuf.readInt();

        // 校验数据包是否全部发送过来,减去type和reserved两个字节后应该
        int totalLength = byteBuf.readableBytes();
        if ((totalLength - 2) < dataLength) {

            // 长度校验,字节流长度少于数据包长度,说明数据包拆包了,等待下一次字节流过来
            byteBuf.readerIndex(packStartIndex);
            return;
        }

        // 3、请求类型
        byte type = byteBuf.readByte();

        // 4、预留字段
        byte reserved = byteBuf.readByte();

        // 5、数据包内容
        byte[] data = null;
        if (dataLength > 0) {
            data = new byte[dataLength];
            byteBuf.readBytes(data);
        }

        NettyMessage nettyMessage = new NettyMessage();
        Header header = new Header();
        header.setDelimiter(Delimiter.DELIMITER);
        header.setLength(dataLength);
        header.setType(type);
        header.setReserved(reserved);
        nettyMessage.setHeader(header);
        nettyMessage.setData(data);

        list.add(nettyMessage);

        // 回收已读字节
        byteBuf.discardReadBytes();
    }
}

  为了运行,我们需要写客户端和服务端的handler:

  4、客户端handler:

package com.wlf.netty.nettyclient.handler;

import com.wlf.netty.nettyapi.constant.Delimiter;
import com.wlf.netty.nettyapi.constant.MessageType;
import com.wlf.netty.nettyapi.javabean.Header;
import com.wlf.netty.nettyapi.javabean.NettyMessage;
import com.wlf.netty.nettyapi.util.CommonUtil;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;

import java.io.RandomAccessFile;
import java.util.Arrays;

/**
 * 客户端处理类
 */
@Slf4j
public class NettyClientHandler extends ChannelHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {     ctx.writeAndFlush(buildClientRequest());
    }

    /**
     * 在处理过程中引发异常时被调用
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("[Client] netty client request error: {}", cause.getMessage());
        ctx.close();
    }

    /**
     * 创建请求消息体
     *
     * @param audioData
     * @param time
     * @return
     */
    private NettyMessage buildClientRequest() {
        NettyMessage nettyMessage = new NettyMessage();
        Header header = new Header();
        byte[] data = buildData();
        header.setDelimiter(Delimiter.DELIMITER);
        header.setLength(data.length);
        header.setType(MessageType.PCM_TYPE.getType());
        header.setReserved((byte) 0);
        nettyMessage.setHeader(header);

        // 设置数据包
        nettyMessage.setData(data);
        return nettyMessage;
    }

    /**
     * 构造PCM请求消息体
     *
     * @return
     */
    private byte[] buildPcmData() {
        byte[] timeByte = longToBytes(System.currentTimeMillis());

        return timeByte;
    }

  
  /**   * long转字节   *   * @param values   * @return   */  private byte[] longToBytes(long values) {      byte[] buffer = new byte[8];      for (int i = 0; i < 8; i++) {          int offset = 64 - (i + 1) * 8;          buffer[i] = (byte) ((values >> offset) & 0xff);      }      return buffer;  }
}

  5、服务端handler:

package com.wlf.netty.nettyserver.handler;

import com.wlf.netty.nettyapi.constant.MessageType;
import com.wlf.netty.nettyapi.javabean.NettyMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class NettyServerHandler extends ChannelHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyMessage nettyMessage = (NettyMessage) msg;

        if (nettyMessage.getHeader() != null && nettyMessage.getHeader().getType() == MessageType.PCM_TYPE.getType()) {
            log.info("[server] server receive client message : {}", nettyMessage);
            if (nettyMessage == null || nettyMessage.getData() == null) {
                log.error("nettyMessage is null.");
            }

            // 获取时间戳(8字节)
            byte[] data = nettyMessage.getData();
            ByteBuf buf = Unpooled.buffer(data.length);
            buf.writeBytes(data);
            long startTime = buf.readLong();
            log.info("startTime: {}", startTime);

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("server received failed, error : {}", cause.getMessage());
        cause.printStackTrace();
        ctx.close();
    }
}

  最后,为了启动,我们还得再补两个启动类:

  6、客户端:

package com.wlf.netty.nettyclient.client;

import com.wlf.netty.nettyapi.msgpack.NettyMessageDecoder;
import com.wlf.netty.nettyapi.msgpack.NettyMessageEncoder;
import com.wlf.netty.nettyclient.handler.NettyClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;

/**
 * 客户端
 * 1.为初始化客户端,创建一个Bootstrap实例
 * 2.为进行事件处理分配了一个NioEventLoopGroup实例,其中事件处理包括创建新的连接以及处理入站和出站数据;
 * 3.当连接被建立时,一个NettyClientHandler实例会被安装到(该Channel的一个ChannelPipeline中;
 * 4.在一切都设置完成后,调用Bootstrap.connect()方法连接到远程节点。
 */
@Slf4j
public class NettyClient {

    private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);

    EventLoopGroup group = new NioEventLoopGroup();

    public void connect(int port, String host) throws Exception {
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workGroup).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel channel) throws Exception {
                            channel.pipeline().addLast(new NettyMessageDecoder());
                            channel.pipeline().addLast(new NettyMessageEncoder());
                            channel.pipeline().addLast(new NettyClientHandler());
                        }
                    });
            ChannelFuture future = bootstrap.connect(host, port).sync();
            future.channel().closeFuture().sync();
        } finally {
            workGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 9911;
        new NettyClient().connect(port, "127.0.0.1");
    }
}

  7、服务端启动类:

package com.wlf.netty.nettyserver.server;

import com.wlf.netty.nettyapi.msgpack.NettyMessageDecoder;
import com.wlf.netty.nettyapi.msgpack.NettyMessageEncoder;
import com.wlf.netty.nettyserver.handler.NettyServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyServer {

    private final EventLoopGroup bossGroup = new NioEventLoopGroup();
    private final EventLoopGroup workGroup = new NioEventLoopGroup();

    public void bind(int port) throws Exception{
        try{
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel channel) throws Exception {
                            channel.pipeline().addLast(new NettyMessageDecoder());
                            channel.pipeline().addLast(new NettyMessageEncoder());
                            channel.pipeline().addLast(new NettyServerHandler());
                        }
                    });
            // 绑定端口
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            channelFuture.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 9911;
        new NettyServer().bind(port);
    }

}

  直接跑上面两个启动类,先跑服务端,再跑客户端:

  客户端输出:

17:20:04.258 [nioEventLoopGroup-1-1] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] client send data : NettyMessage{header=Header{delimiter=-1410399999, length=8, type=1, reserved=0}, data=[[email protected]}

  服务端输出:

17:20:04.295 [nioEventLoopGroup-1-1] INFO com.wlf.netty.nettyserver.handler.NettyServerHandler - [server] server receive client message : NettyMessage{header=Header{delimiter=-1410399999, length=8, type=1, reserved=0}, data=[[email protected]}
17:20:04.295 [nioEventLoopGroup-1-1] INFO com.wlf.netty.nettyserver.handler.NettyServerHandler - startTime: 1570785604258

  

原文地址:https://www.cnblogs.com/wuxun1997/p/11655537.html

时间: 2024-11-01 12:38:31

netty5自定义私有协议实例的相关文章

Netty5快速入门及实例视频教程(整合Spring)

Netty5快速入门及实例视频教程+源码(整合Spring) https://pan.baidu.com/s/1pL8qF0J 01.传统的Socket分析02.NIO的代码分析03.对于NIO的一些疑惑04.Netty服务端HelloWorld入门05.Netty服务端入门补充06.Netty客户端入门07.如何构建一个多线程NIO系统08.Netty源码分析一09.Netty源码分析二10.Netty5服务端入门案例11.Netty5客户端入门案例12.单客户端多连接程序13.Netty学习

[转]Windows 注册自定义的协议

[转自] http://blog.sina.com.cn/s/blog_86e4a51c01010nik.html 1.注册应用程序来处理自定义协议 你必须添加一个新的key以及相关的value到HKEY_CLASSES_ROOT中,来使应用程序可以处理特殊的URL协议. 新注册的key必须与协议scheme相匹配才可以被添加.例如,增加一个“alert:”协议,被增加到HKEY_CLASSES_ROOT的key必须是alert.在这个新的key之下,默认的字符串value将显示新协议的名字,并

(转)通过自定义URL协议在Web中启动本地应用程序

通过自定义URL协议在Web中启动本地应用程序 1.注册应用程序来处理自定义协议 你必须添加一个新的key以及相关的value到HKEY_CLASSES_ROOT中,来使应用程序可以处理特殊的URL协议. 新注册的key必须与协议scheme相匹配才可以被添加.例如,增加一个“alert:”协议,被增加到HKEY_CLASSES_ROOT的key必须是alert.在这个新的key之下,默认的字符串value将显示新协议的名字,并且URL协议字符串value将包含协议特有的信息或者空字符串.Key

私有协议的实现和文档的整理

在工作中,我遇到了实现私有协议的诸多麻烦,其中更多的是文档的残缺和错漏,以及对于某些领域采用的专业的表示.很多的私有的协议,在刚开始的时候,没有过多的考虑到可扩展性,所以使用的都是结构体,并且结构体在字节的控制上,并没有采用严格的规定,导致在实现的过程中,需要不断的尝试.我自己在实现的过中,遇到的一个问题就是如何正确的分析报文中的有效字符.究竟这些斧子的确切含义,对于实现功能是至关重要的.我刚开始没有老大的意见:详细的分析每一个字节代表的含义,实在是惭愧.可能是100多个字节,完全是无法了解的.

通过反射创建自定义泛型的实例。

比如有这样一个泛型:Demo.GenericsSimple<T,TT> 我想要通过反射创建一个Demo.GenericsSimple<string,int>的实例可以通过下面的格式进行创建: System.Reflection.Assembly.GetExecutingAssembly().CreateInstance("命名空间.User`形参数量N[[1形参类型全名,形参类型所在的程 序集名称],[2形参类型全名,形参类型所在的程序集名称],[3形参类型全名,形参类型

通过私有协议在Chrome浏览器网页中打开本地程序

最近甲方有这样一个需求:两套系统,一套基于Chrome开发,一套基于IE开发,想要在Chrome中增加一个链接,然后进入IE开发的系统.也就是说,想要在Chrome中创建链接跳转到IE浏览器指定页面,还要实现跳转动画效果.这个需求我们先来解决从Chrome跳转到IE的问题. 问题分析:从Chrome中跳转到IE,直接以http链接形式是不可能跳转到IE的,只能通过单独开发的本地程序打开IE,问题是如何让Chrome打开该程序.有一种方法可以实现:通过注册私有协议,用户点击链接的时候直接使用私有协

jquery mobile自定义webapp开发实例(一)——前言篇

用jquery mobile做了一段时间的webapp开发,准备用自己的一个小demo做一个模块化的分享 点击demo演示 手机演示二维码: 此demo已经是比较老的版本,用户体验流畅度确实还存在很大的问题,但在我写这一系列的webapp开发实例后,将会解决大部分问题 下面是以后准备写到的一些点: 1:谈谈关于jquery mobile 一些常见问题的解决.(常见问题的解决查看) 2:jquery mobile实现多页面跳转(不是官网上那样把所以的page在一个.html文件内,而是每个page

通过私有协议Chrome浏览器页面打开本地程序

近期方有这样的要求:这两个系统,根据一组Chrome开展,根据一组IE开展,需要Chrome添加一个链接,然后进入IE该系统的开发.这,需要Chrome跳转到创建一个链接IE浏览器指定的页面.同时也实现了跳跃的动画.第一解决的需要我们的Chrome转到IE的问题. 问题分析:从Chrome中跳转到IE,直接以http链接形式是不可能跳转到IE的,仅仅能通过单独开发的本地程序打开IE,问题是怎样让Chrome打开该程序.有一种方法能够实现:通过注冊私有协议,用户点击链接的时候直接使用私有协议调用本

呕心沥血的java复杂项目(包括自定义应用层协议、CS多线程、多客户端登录、上下线提醒等等)

建议大家先下源代码,导入到Eclipse,然后运行服务器和多个客户端,这样有个不错的体会.下载地址:http://download.csdn.net/detail/woshiwanghao_hi/7320927. 首先来看下整个系统的文件架构图: 系统是个基于UDP的聊天室,因为不能保持所有用户和聊天室的持续连接.同时为了保持数据传输的可靠性,就需要自定义应用层协议了. 程序大概的一个流程如下: 1.启动服务器,点击"start service",之后服务器及开始监听指定端口. 2.启