自定义协议可以解决粘包和拆包问题,客户端发送数据时携带数据包长度,服务端接收数据后解析消息体,获取数据包长度值,据此继续获取数据包内容。我们来看具体例子,自定义的协议如下:
+--------------------------------------------------+----------+
| 消息头 | 消息体 |
| Delimiter | Length | Type | Reserved | data |
+-------------------------------------------------+----------+
1) Delimiter:4bytes,消息头,用于分割消息。
2) Length:数据长度。
3) Type:1bytes,消息类型。
4) Reserved:1bytes,保留。
5) Data包数据
接下来看如何实现编解码:
1、先定义好javabean:
总体的:
package com.wlf.netty.nettyapi.javabean; import lombok.Getter; import lombok.Setter; @Setter @Getter public class NettyMessage { private Header header; private byte[] data; @Override public String toString() { return "NettyMessage{" + "header=" + header + ", data=" + data + ‘}‘; } }
头的:
package com.wlf.netty.nettyapi.javabean; import lombok.Getter; import lombok.Setter; @Getter @Setter public class Header { /** * 4bytes,消息头,用于分割消息。如0xABEF0101 */ private int delimiter; /** * 1byte,类型 */ private byte type; /** * 1byte,保留 */ private byte reserved; /** * 数据长度 */ private int length; @Override public String toString() { return "Header{" + "delimiter=" + delimiter + ", length=" + length + ", type=" + type + ", reserved=" + reserved + ‘}‘; } }
2、编码:
package com.wlf.netty.nettyapi.msgpack; import com.wlf.netty.nettyapi.javabean.NettyMessage; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; public class NettyMessageEncoder extends MessageToByteEncoder<NettyMessage> { @Override protected void encode(ChannelHandlerContext channelHandlerContext, NettyMessage nettyMessage, ByteBuf byteBuf) throws Exception { if (nettyMessage == null || nettyMessage.getHeader() == null) { throw new Exception("The nettyMessage is null."); } // 1、写入分割标志 byteBuf.writeInt(nettyMessage.getHeader().getDelimiter()); // 2、写入数据包长度 byteBuf.writeInt(nettyMessage.getData() != null ? nettyMessage.getData().length : 0); // 3、写入请求类型 byteBuf.writeByte(nettyMessage.getHeader().getType()); // 4、写入预留字段 byteBuf.writeByte(nettyMessage.getHeader().getReserved()); // 5、写入数据 byteBuf.writeBytes(nettyMessage.getData() != null ? nettyMessage.getData() : null); } }
3、解码:
package com.wlf.netty.nettyapi.msgpack; import com.wlf.netty.nettyapi.constant.Delimiter; import com.wlf.netty.nettyapi.javabean.Header; import com.wlf.netty.nettyapi.javabean.NettyMessage; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; import java.util.List; public class NettyMessageDecoder extends MessageToMessageDecoder<ByteBuf> { /** * 消息体字节大小:分割符字段4字节+长度字段4字节+请求类型字典1字节+预留字段1字节=10字节 */ private static final int HEAD_LENGTH = 10; @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { // 字节流开始位置 int packStartIndex; while (true) { // 获取字节流开始位置 packStartIndex = byteBuf.readerIndex(); // 若读取到分割标识,说明读取当前字节流开始位置了 if (byteBuf.readInt() == Delimiter.DELIMITER) { break; } // 重置读索引为0 byteBuf.resetReaderIndex(); // 长度校验,字节流长度至少10字节,小于10字节则等待下一次字节流过来 if (byteBuf.readableBytes() < HEAD_LENGTH) { return; } } // 2、获取data的字节流长度 int dataLength = byteBuf.readInt(); // 校验数据包是否全部发送过来,减去type和reserved两个字节后应该 int totalLength = byteBuf.readableBytes(); if ((totalLength - 2) < dataLength) { // 长度校验,字节流长度少于数据包长度,说明数据包拆包了,等待下一次字节流过来 byteBuf.readerIndex(packStartIndex); return; } // 3、请求类型 byte type = byteBuf.readByte(); // 4、预留字段 byte reserved = byteBuf.readByte(); // 5、数据包内容 byte[] data = null; if (dataLength > 0) { data = new byte[dataLength]; byteBuf.readBytes(data); } NettyMessage nettyMessage = new NettyMessage(); Header header = new Header(); header.setDelimiter(Delimiter.DELIMITER); header.setLength(dataLength); header.setType(type); header.setReserved(reserved); nettyMessage.setHeader(header); nettyMessage.setData(data); list.add(nettyMessage); // 回收已读字节 byteBuf.discardReadBytes(); } }
为了运行,我们需要写客户端和服务端的handler:
4、客户端handler:
package com.wlf.netty.nettyclient.handler; import com.wlf.netty.nettyapi.constant.Delimiter; import com.wlf.netty.nettyapi.constant.MessageType; import com.wlf.netty.nettyapi.javabean.Header; import com.wlf.netty.nettyapi.javabean.NettyMessage; import com.wlf.netty.nettyapi.util.CommonUtil; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.ArrayUtils; import java.io.RandomAccessFile; import java.util.Arrays; /** * 客户端处理类 */ @Slf4j public class NettyClientHandler extends ChannelHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(buildClientRequest()); } /** * 在处理过程中引发异常时被调用 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("[Client] netty client request error: {}", cause.getMessage()); ctx.close(); } /** * 创建请求消息体 * * @param audioData * @param time * @return */ private NettyMessage buildClientRequest() { NettyMessage nettyMessage = new NettyMessage(); Header header = new Header(); byte[] data = buildData(); header.setDelimiter(Delimiter.DELIMITER); header.setLength(data.length); header.setType(MessageType.PCM_TYPE.getType()); header.setReserved((byte) 0); nettyMessage.setHeader(header); // 设置数据包 nettyMessage.setData(data); return nettyMessage; } /** * 构造PCM请求消息体 * * @return */ private byte[] buildPcmData() { byte[] timeByte = longToBytes(System.currentTimeMillis()); return timeByte; }
/** * long转字节 * * @param values * @return */ private byte[] longToBytes(long values) { byte[] buffer = new byte[8]; for (int i = 0; i < 8; i++) { int offset = 64 - (i + 1) * 8; buffer[i] = (byte) ((values >> offset) & 0xff); } return buffer; }
}
5、服务端handler:
package com.wlf.netty.nettyserver.handler; import com.wlf.netty.nettyapi.constant.MessageType; import com.wlf.netty.nettyapi.javabean.NettyMessage; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Slf4j @Component public class NettyServerHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { NettyMessage nettyMessage = (NettyMessage) msg; if (nettyMessage.getHeader() != null && nettyMessage.getHeader().getType() == MessageType.PCM_TYPE.getType()) { log.info("[server] server receive client message : {}", nettyMessage); if (nettyMessage == null || nettyMessage.getData() == null) { log.error("nettyMessage is null."); } // 获取时间戳(8字节) byte[] data = nettyMessage.getData(); ByteBuf buf = Unpooled.buffer(data.length); buf.writeBytes(data); long startTime = buf.readLong(); log.info("startTime: {}", startTime); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { log.error("server received failed, error : {}", cause.getMessage()); cause.printStackTrace(); ctx.close(); } }
最后,为了启动,我们还得再补两个启动类:
6、客户端:
package com.wlf.netty.nettyclient.client; import com.wlf.netty.nettyapi.msgpack.NettyMessageDecoder; import com.wlf.netty.nettyapi.msgpack.NettyMessageEncoder; import com.wlf.netty.nettyclient.handler.NettyClientHandler; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import lombok.extern.slf4j.Slf4j; import java.net.InetSocketAddress; /** * 客户端 * 1.为初始化客户端,创建一个Bootstrap实例 * 2.为进行事件处理分配了一个NioEventLoopGroup实例,其中事件处理包括创建新的连接以及处理入站和出站数据; * 3.当连接被建立时,一个NettyClientHandler实例会被安装到(该Channel的一个ChannelPipeline中; * 4.在一切都设置完成后,调用Bootstrap.connect()方法连接到远程节点。 */ @Slf4j public class NettyClient { private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1); EventLoopGroup group = new NioEventLoopGroup(); public void connect(int port, String host) throws Exception { NioEventLoopGroup workGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workGroup).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel channel) throws Exception { channel.pipeline().addLast(new NettyMessageDecoder()); channel.pipeline().addLast(new NettyMessageEncoder()); channel.pipeline().addLast(new NettyClientHandler()); } }); ChannelFuture future = bootstrap.connect(host, port).sync(); future.channel().closeFuture().sync(); } finally { workGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 9911; new NettyClient().connect(port, "127.0.0.1"); } }
7、服务端启动类:
package com.wlf.netty.nettyserver.server; import com.wlf.netty.nettyapi.msgpack.NettyMessageDecoder; import com.wlf.netty.nettyapi.msgpack.NettyMessageEncoder; import com.wlf.netty.nettyserver.handler.NettyServerHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler;import lombok.extern.slf4j.Slf4j; @Slf4j public class NettyServer { private final EventLoopGroup bossGroup = new NioEventLoopGroup(); private final EventLoopGroup workGroup = new NioEventLoopGroup(); public void bind(int port) throws Exception{ try{ ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel channel) throws Exception { channel.pipeline().addLast(new NettyMessageDecoder()); channel.pipeline().addLast(new NettyMessageEncoder()); channel.pipeline().addLast(new NettyServerHandler()); } }); // 绑定端口 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); channelFuture.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 9911; new NettyServer().bind(port); } }
直接跑上面两个启动类,先跑服务端,再跑客户端:
客户端输出:
17:20:04.258 [nioEventLoopGroup-1-1] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] client send data : NettyMessage{header=Header{delimiter=-1410399999, length=8, type=1, reserved=0}, data=[[email protected]}
服务端输出:
17:20:04.295 [nioEventLoopGroup-1-1] INFO com.wlf.netty.nettyserver.handler.NettyServerHandler - [server] server receive client message : NettyMessage{header=Header{delimiter=-1410399999, length=8, type=1, reserved=0}, data=[[email protected]} 17:20:04.295 [nioEventLoopGroup-1-1] INFO com.wlf.netty.nettyserver.handler.NettyServerHandler - startTime: 1570785604258
原文地址:https://www.cnblogs.com/wuxun1997/p/11655537.html
时间: 2024-11-01 12:38:31