Netty 是一款高性能的底层通讯框架,被很多开发框架用作底层通信组件。今天来说说 Netty 中常用的一种序列化工具:JBoss 的 Marshalling。
直接上代码,先看 Marshalling 的工厂类:
package com.netty.serialize.marshalling; import io.netty.handler.codec.marshalling.*; import org.jboss.marshalling.MarshallerFactory; import org.jboss.marshalling.Marshalling; import org.jboss.marshalling.MarshallingConfiguration; /** * Created by sdc on 2017/8/28. */ public class MarshallingCodeCFactory { /** * 解码 * @return */ public static MarshallingDecoder buildMarshallingDecoder() { //首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。 final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); //创建了MarshallingConfiguration对象,配置了版本号为5 final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); //根据marshallerFactory和configuration创建provider UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration); //构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度 MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024); return decoder; } /** * 编码 * @return */ public static MarshallingEncoder buildMarshallingEncoder() { final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration); //构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组 MarshallingEncoder encoder = new MarshallingEncoder(provider); return encoder; } }
以上是 Marshalling 的序列化方式。Netty 提供的 MarshallingDecoder 和 MarshallingEncoder 自带编解码功能,并且已经处理了半包问题,所以不用担心传输过程中出现半包。
服务端的Server实现:
package com.netty.serialize.server; import com.netty.serialize.coder.MsgDecoder; import com.netty.serialize.coder.MsgEncoder; import com.netty.serialize.handler.ServerHandler; import com.netty.serialize.marshalling.MarshallingCodeCFactory; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * Created by sdc on 2017/8/26. */ public class MsgServer { public void bind(int port) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap sb = new ServerBootstrap(); sb.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // .childHandler(new ChildChannelHandler()) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); channel.pipeline().addLast(new ServerHandler()); } }) .childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture cf = sb.bind(port).sync(); System.out.println("服务端已启动"); cf.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static class ChildChannelHandler extends ChannelInitializer { protected void initChannel(Channel channel) throws Exception { channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); channel.pipeline().addLast(new ServerHandler()); } } public static void main(String[] args){ try { new MsgServer().bind(8080); } catch (Exception e) { e.printStackTrace(); } } }
package com.netty.serialize.handler; import com.netty.serialize.message.Message; import com.netty.serialize.message.MsgHeader; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; /** * 用于测试服务端实现的 * Created by sdc on 2017/8/29. */ public class ServerHandler extends ChannelHandlerAdapter{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); // System.out.println("active"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); // ctx.close(); super.exceptionCaught(ctx, cause); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Message newMsg = (Message)msg; // String msgStrClient = (String)msg; System.out.println("获取客户端里的内容:" + newMsg); Message message = new Message(); String msgStr = "客户端接受到通知"; MsgHeader header = new MsgHeader(); header.setStartTag(new Byte("0")); header.setCmdCode("1234".getBytes()); header.setLength(msgStr.length()); header.setVersion("11".getBytes()); message.setBody(msgStr); message.setHeader(header); ctx.writeAndFlush(message); } }
客户端的实现:
package com.netty.serialize.client; import com.netty.serialize.coder.MsgDecoder; import com.netty.serialize.coder.MsgEncoder; import com.netty.serialize.handler.ClientHandler; import com.netty.serialize.handler.ServerHandler; import com.netty.serialize.marshalling.MarshallingCodeCFactory; import com.netty.serialize.message.Message; import com.netty.serialize.message.MsgHeader; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; /** * Created by sdc on 2017/8/26. */ public class MsgClient { public void connect(String ip, int port) throws Exception { EventLoopGroup workerGroup = new NioEventLoopGroup(); // Message message = new Message(); // String msgStr = "我想发送一条消息"; // MsgHeader header = new MsgHeader(); // header.setStartTag(new Byte("0")); // header.setCmdCode("1234".getBytes()); // header.setLength(msgStr.length()); // header.setVersion("11".getBytes()); // // message.setBody(msgStr); // message.setHeader(header); try { Bootstrap bs = new Bootstrap(); bs.group(workerGroup) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true)// .handler(new ChildChannelHandler()); ChannelFuture f = bs.connect(ip,port).sync(); //写入消息 // f.channel().writeAndFlush(message).sync(); f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } } public static class ChildChannelHandler extends ChannelInitializer { protected void initChannel(Channel channel) throws Exception { channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); channel.pipeline().addLast(new ClientHandler()); } } public static void main(String[] args){ try { new MsgClient().connect("127.0.0.1", 8080); } catch (Exception e) { e.printStackTrace(); } } }
package com.netty.serialize.handler; import com.netty.serialize.message.Message; import com.netty.serialize.message.MsgHeader; import io.netty.channel.*; import io.netty.util.ReferenceCountUtil; /** * Created by sdc on 2017/8/29. */ public class ClientHandler extends ChannelHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Message message = new Message(); String msgStr = "我想发送一条消息"; MsgHeader header = new MsgHeader(); header.setStartTag(new Byte("0")); header.setCmdCode("1234".getBytes()); header.setLength(msgStr.length()); header.setVersion("11".getBytes()); message.setBody(msgStr); message.setHeader(header); ctx.writeAndFlush(message).addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture channelFuture) throws Exception { if (channelFuture.isSuccess()) { // do sth System.out.println("成功发送到服务端消息"); } else { // do sth System.out.println("失败服务端消息失败"); } } }); // ctx.writeAndFlush(message); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // ctx.close(); super.exceptionCaught(ctx, cause); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { Message newMsg = (Message) msg; System.out.println("收到服务端的内容" + newMsg); }finally { ReferenceCountUtil.release(msg); } } }
下面是用于传输的 POJO 类,即自定义封装的消息对象:
package com.netty.serialize.message; import java.io.Serializable; /** * Created by sdc on 2017/8/26. */ public class Message implements Serializable{ /** * */ private static final long serialVersionUID = 4923081103118853877L; private MsgHeader header; private Object body; //检验和 // private byte crcCode; // public byte getCrcCode() { // return crcCode; // } // // public void setCrcCode(byte crcCode) { // this.crcCode = crcCode; // } public MsgHeader getHeader() { return header; } public void setHeader(MsgHeader header) { this.header = header; } public Object getBody() { return body; } public void setBody(Object body) { this.body = body; } @Override public String toString() { return "Message{" + "header=" + header + ", body=" + body + // ", crcCode=" + crcCode + ‘}‘; } }
package com.netty.serialize.message; import java.io.Serializable; import java.util.Arrays; /** * Created by sdc on 2017/8/26. */ public class MsgHeader implements Serializable{ /** * */ private static final long serialVersionUID = 4923081103118853877L; //固定头 private byte startTag; //命令码,4位 private byte[] cmdCode; //版本 2位 private byte[] version; private int length; public byte[] getVersion() { return version; } public void setVersion(byte[] version) { this.version = version; } public byte[] getCmdCode() { return cmdCode; } public void setCmdCode(byte[] cmdCode) { this.cmdCode = cmdCode; } public byte getStartTag() { return startTag; } public void setStartTag(byte startTag) { this.startTag = startTag; } public int getLength() { return length; } public void setLength(int length) { this.length = length; } @Override public String toString() { return "MsgHeader{" + "startTag=" + startTag + ", cmdCode=" + Arrays.toString(cmdCode) + ", version=" + Arrays.toString(version) + ", length=" + length + ‘}‘; } }
到此就完成了。下面是本文使用的 Netty 和 Marshalling 的版本,其他版本我不清楚会不会有什么问题。
<dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <!--netty --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>5.0.0.Alpha2</version> </dependency> <!--jboss-marshalling --> <dependency> <groupId>org.jboss.marshalling</groupId> <artifactId>jboss-marshalling-serial</artifactId> <version>2.0.0.Beta2</version> </dependency>
时间: 2024-10-10 12:36:28