Netty5 序列化方式(Jboss Marshalling)

Netty作为很多高性能的底层通讯工具,被很多开发框架应用再底层,今天来说说常用的序列化工具,用Jboss的Marshalling。

直接上代码,Marshalling的工厂类

package com.netty.serialize.marshalling;

import io.netty.handler.codec.marshalling.*;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;

/**
 * Created by sdc on 2017/8/28.
 */
public class MarshallingCodeCFactory {

    /**
     * 解码
     * @return
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
        //首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        //创建了MarshallingConfiguration对象,配置了版本号为5
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        //根据marshallerFactory和configuration创建provider
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
        //构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
        MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
        return decoder;
    }

    /**
     * 编码
     * @return
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
        //构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
        MarshallingEncoder encoder = new MarshallingEncoder(provider);
        return encoder;
    }

}

这个是Marshalling的序列化方式,Marshalling自带编解码,所以不用担心中途编解码半包的问题。

服务端的Server实现:

package com.netty.serialize.server;

import com.netty.serialize.coder.MsgDecoder;
import com.netty.serialize.coder.MsgEncoder;
import com.netty.serialize.handler.ServerHandler;
import com.netty.serialize.marshalling.MarshallingCodeCFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Created by sdc on 2017/8/26.
 */
public class MsgServer {

    public void bind(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap sb = new ServerBootstrap();
            sb.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
//                    .childHandler(new ChildChannelHandler())
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                            channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                            channel.pipeline().addLast(new ServerHandler());
                        }
                    })
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture cf = sb.bind(port).sync();
            System.out.println("服务端已启动");

            cf.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static class ChildChannelHandler extends ChannelInitializer {

        protected void initChannel(Channel channel) throws Exception {
            channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
            channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
            channel.pipeline().addLast(new ServerHandler());
        }

    }

    public static void main(String[] args){
        try {
            new MsgServer().bind(8080);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
package com.netty.serialize.handler;

import com.netty.serialize.message.Message;
import com.netty.serialize.message.MsgHeader;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * 用于测试服务端实现的
 * Created by sdc on 2017/8/29.
 */
public class ServerHandler extends ChannelHandlerAdapter{

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
//        System.out.println("active");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
//        ctx.close();
        super.exceptionCaught(ctx, cause);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Message newMsg = (Message)msg;
//        String msgStrClient = (String)msg;
        System.out.println("获取客户端里的内容:" + newMsg);

        Message message = new Message();
        String msgStr = "客户端接受到通知";
        MsgHeader header = new MsgHeader();
        header.setStartTag(new Byte("0"));
        header.setCmdCode("1234".getBytes());
        header.setLength(msgStr.length());
        header.setVersion("11".getBytes());

        message.setBody(msgStr);
        message.setHeader(header);

        ctx.writeAndFlush(message);
    }

}

客户端的实现:

package com.netty.serialize.client;

import com.netty.serialize.coder.MsgDecoder;
import com.netty.serialize.coder.MsgEncoder;
import com.netty.serialize.handler.ClientHandler;
import com.netty.serialize.handler.ServerHandler;
import com.netty.serialize.marshalling.MarshallingCodeCFactory;
import com.netty.serialize.message.Message;
import com.netty.serialize.message.MsgHeader;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * Created by sdc on 2017/8/26.
 */
public class MsgClient {

    public void connect(String ip, int port) throws Exception {
        EventLoopGroup workerGroup = new NioEventLoopGroup();

//        Message message = new Message();
//        String msgStr = "我想发送一条消息";
//        MsgHeader header = new MsgHeader();
//        header.setStartTag(new Byte("0"));
//        header.setCmdCode("1234".getBytes());
//        header.setLength(msgStr.length());
//        header.setVersion("11".getBytes());
//
//        message.setBody(msgStr);
//        message.setHeader(header);
        try {
            Bootstrap bs = new Bootstrap();
            bs.group(workerGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)//
                    .handler(new ChildChannelHandler());

            ChannelFuture f = bs.connect(ip,port).sync();

            //写入消息
//            f.channel().writeAndFlush(message).sync();

            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

    public static class ChildChannelHandler extends ChannelInitializer {
        protected void initChannel(Channel channel) throws Exception {
            channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
            channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
            channel.pipeline().addLast(new ClientHandler());
        }
    }

    public static void main(String[] args){
        try {
            new MsgClient().connect("127.0.0.1", 8080);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
package com.netty.serialize.handler;

import com.netty.serialize.message.Message;
import com.netty.serialize.message.MsgHeader;
import io.netty.channel.*;
import io.netty.util.ReferenceCountUtil;

/**
 * Created by sdc on 2017/8/29.
 */
public class ClientHandler extends ChannelHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Message message = new Message();
        String msgStr = "我想发送一条消息";
        MsgHeader header = new MsgHeader();
        header.setStartTag(new Byte("0"));
        header.setCmdCode("1234".getBytes());
        header.setLength(msgStr.length());
        header.setVersion("11".getBytes());

        message.setBody(msgStr);
        message.setHeader(header);
        ctx.writeAndFlush(message).addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    // do sth
                    System.out.println("成功发送到服务端消息");
                } else {
                    // do sth
                    System.out.println("失败服务端消息失败");
                }
            }
        });
//        ctx.writeAndFlush(message);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//        ctx.close();
        super.exceptionCaught(ctx, cause);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            Message newMsg = (Message) msg;
            System.out.println("收到服务端的内容" + newMsg);
        }finally {
            ReferenceCountUtil.release(msg);
        }
    }

}

传输的POJO的类,是自定义的封装好的信息。

package com.netty.serialize.message;

import java.io.Serializable;

/**
 * Created by sdc on 2017/8/26.
 */
public class Message implements Serializable{

    /**
     *
     */
    private static final long serialVersionUID = 4923081103118853877L;

    private MsgHeader header;

    private Object body;

    //检验和
//    private byte crcCode;

//    public byte getCrcCode() {
//        return crcCode;
//    }
//
//    public void setCrcCode(byte crcCode) {
//        this.crcCode = crcCode;
//    }

    public MsgHeader getHeader() {
        return header;
    }

    public void setHeader(MsgHeader header) {
        this.header = header;
    }

    public Object getBody() {
        return body;
    }

    public void setBody(Object body) {
        this.body = body;
    }

    @Override
    public String toString() {
        return "Message{" +
                "header=" + header +
                ", body=" + body +
//                ", crcCode=" + crcCode +
                ‘}‘;
    }
}
package com.netty.serialize.message;

import java.io.Serializable;
import java.util.Arrays;

/**
 * Created by sdc on 2017/8/26.
 */
public class MsgHeader implements Serializable{

    /**
     *
     */
    private static final long serialVersionUID = 4923081103118853877L;

    //固定头
    private byte startTag;

    //命令码,4位
    private byte[] cmdCode;

    //版本 2位
    private byte[] version;

    private int length;

    public byte[] getVersion() {
        return version;
    }

    public void setVersion(byte[] version) {
        this.version = version;
    }

    public byte[] getCmdCode() {
        return cmdCode;
    }

    public void setCmdCode(byte[] cmdCode) {
        this.cmdCode = cmdCode;
    }

    public byte getStartTag() {
        return startTag;
    }

    public void setStartTag(byte startTag) {
        this.startTag = startTag;
    }

    public int getLength() {
        return length;
    }

    public void setLength(int length) {
        this.length = length;
    }

    @Override
    public String toString() {
        return "MsgHeader{" +
                "startTag=" + startTag +
                ", cmdCode=" + Arrays.toString(cmdCode) +
                ", version=" + Arrays.toString(version) +
                ", length=" + length +
                ‘}‘;
    }
}

到此就完事了,netty的版本,和marshalling的版本,其他的版本我不清楚会不会有什么错误。

<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>test</scope>
</dependency>

<!--netty -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>5.0.0.Alpha2</version>
</dependency>

<!--jboss-marshalling -->
<dependency>
    <groupId>org.jboss.marshalling</groupId>
    <artifactId>jboss-marshalling-serial</artifactId>
    <version>2.0.0.Beta2</version>
</dependency>
时间: 2024-10-10 12:36:28

Netty5 序列化方式(Jboss Marshalling)的相关文章

netty 的 JBoss Marshalling 编码解码

一. JBoss Marshalling 简介. JBoss Marshalling 是一个Java 对象序列化包,对 JDK 默认的序列化框架进行了优化,但又保持跟 Java.io.Serializable 接口的兼容,同时增加了一些可调的参数和附件的特性, 这些参数和附加的特性, 这些参数和特性可通过工厂类进行配置. 二. JBoss Marshalling 的使用. 1. 下载  org.jboss.marshalling <dependency> <groupId>org.

Android中两种序列化方式的比较Serializable和Parcelable

Serializable和Parcelable接口可以完成对象的序列化过程,当我们需要通过Intent和Binder传输数据时就需要使用者两种序列化方式.还有,我们需要对象持久化到存储设备或者通过网络传输给其他客户端,这个使用也需要使用Serializale来完成对象的序列化.在Android应用开发中,这两种方式都很常见,但两者方式并不相同. 1.Serializable接口 Serializable接口是Java提供的一个序列化接口,它是一个空接口,为对象提供标准的序列化和反序列化操作.使用

几种Java序列化方式的实现

0.前言 本文主要对几种常见Java序列化方式进行实现.包括Java原生以流的方法进行的序列化.Json序列化.FastJson序列化.Protobuff序列化. 1.Java原生序列化 Java原生序列化方法即通过Java原生流(InputStream和OutputStream之间的转化)的方式进行转化.需要注意的是JavaBean实体类必须实现Serializable接口,否则无法序列化.Java原生序列化代码示例如下所示: package serialize; import java.io

Android 进阶6:两种序列化方式 Serializable 和 Parcelable

什么是序列化 我们总是说着或者听说着"序列化",它的定义是什么呢? 序列化 (Serialization)将对象的状态信息转换为可以存储或传输的形式的过程.在序列化期间,对象将其当前状态写入到临时或持久性存储区.以后,可以通过从存储区中读取或反序列化对象的状态,重新创建该对象. 二进制序列化保持类型保真度,这对于在应用程序的不同调用之间保留对象的状态很有用.例如,通过将对象序列化到剪贴板,可在不同的应用程序之间共享对象.您可以将对象序列化到流.磁盘.内存和网络等等.远程处理使用序列化&

.net 各种序列化方式效率对比

在服务与服务之间传输的是二进制数据,而在此之前有多种方法将数据内容进行序列化来减小数据传递大小,现针对于目前主流的几种序列化方式做了简单数据统计对比. 先做下简单介绍↓↓↓ 1.protobuf-net protobuf-net is a contract based serializer for .NET code, that happens to write data in the "protocol buffers" serialization format engineered

移动App通讯协议及序列化方式的选择

简单列一下不同协议,序列化方式等的考虑. http还是私有协议? http协议优/缺点: 在服务器端只需要提供一份接口,浏览器和app共用.在app中嵌入web view也很容易. http协议的相关工具非常多.开发人员很方便 .比如负载均衡,直接nginx搞定. 比如统计一个接口的调用次数,相当的方便,有现在的分析工具. 压力测试也很方便. http协议可能http服务器有漏洞,但是这种漏洞真的是很少. 手机可以设置http proxy,这对于某些用户可能是很关键的.如果是私有协议,设置htt

Redis 序列化方式StringRedisSerializer、FastJsonRedisSerializer和KryoRedisSerializer

当我们的数据存储到Redis的时候,我们的键(key)和值(value)都是通过Spring提供的Serializer序列化到数据库的.RedisTemplate默认使用的是JdkSerializationRedisSerializer,StringRedisTemplate默认使用的是StringRedisSerializer. Spring Data JPA为我们提供了下面的Serializer:GenericToStringSerializer.Jackson2JsonRedisSeria

改变可识别redis序列化方式

原来系统所看到的是jdk的默认序列化方式,需要更改设置才可以变为可识别的字体 package com.redisSeri.data.redis; import com.fasterxml.jackson.annotation.JsonAutoDetect;import com.fasterxml.jackson.annotation.PropertyAccessor;import com.fasterxml.jackson.databind.ObjectMapper;import org.spr

Spring Cache Redis 修改序列化方式

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId&