Netty JDK序列化编解码传输对象

JDK序列化不需要额外的类库,只需要实现Serializable即可,但是序列化之后的码流只有Java才能反序列化,所以它不是跨语言的,另外由于Java序列化后码流比较大,效率也不高,所以在RPC中很少使用,本文只是做学习之用。

编解码器:

public class JdkDecoder extends MessageToMessageDecoder<ByteBuf> {
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        final int length = byteBuf.readableBytes();
        final byte[] b = new byte[length];
        byteBuf.getBytes(byteBuf.readerIndex(), b, 0, length);

        ByteArrayInputStream bis = new ByteArrayInputStream(b);
        ObjectInputStream ois = new ObjectInputStream(bis);
        list.add(ois.readObject());
        ois.close();
    }
}

public class JdkEncoder extends MessageToByteEncoder<Object> {
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);

        oos.writeObject(o);
        oos.flush();
        byteBuf.writeBytes(bos.toByteArray());
        bos.close();
        oos.close();
    }
}

---

传输对象:

public class Person implements Serializable{

    private int age;

    private String name;

    private boolean man;

    private List<String> list;

    private Date birth;

    private Person son;

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    。。。

    @Override
    public String toString() {
        return "Person{" +
                "age=" + age +
                ", name=‘" + name + ‘\‘‘ +
                ", man=" + man +
                ", list=" + list +
                ", birth=" + birth +
                ", son=" + son +
                ‘}‘;
    }
}

---

Server端:

public class EchoServer {
    public static void main(String[] args) {
        new EchoServer().bind(8080);
    }

    public void bind(int port) {
        //配置服务端的线程组,一个用于服务端接收客户端连接,另一个进行SocketChannel的网络读写
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //ServerBootstrap用于启动NIO服务端的辅助启动类
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    //.handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //
                            ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));

                            //ch.pipeline().addLast(new MsgpackDecoder());
                            ch.pipeline().addLast(new JdkDecoder());

                            //在报文前增加2个字节,写消息长度
                            ch.pipeline().addLast(new LengthFieldPrepender(2));

                            //ch.pipeline().addLast(new MsgpackEncoder());
                            ch.pipeline().addLast(new JdkEncoder());

                            ch.pipeline().addLast(new EchoServerHandler());
                        }
                    });
            //绑定端口,sync为同步阻塞方法,等待绑定成功,ChannelFuture用于异步操作的通知回调
            ChannelFuture future = bootstrap.bind(port).sync();
            System.out.println("server started");
            //等待服务端监听端口关闭
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println("server shuting down");
            //释放线程资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    int count = 0;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if(msg instanceof Person){
            Person p = (Person)msg;
            Q.p(p.toString());
        }else {
            System.out.println("The server received(" + count++ + "): " + msg);
        }

        ctx.writeAndFlush(msg);//异步发送
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}

---

Client端:

public class EchoClient {

    public static void main(String[] args) {
        new EchoClient().connect("127.0.0.1", 8080);
    }

    public void connect(String host, int port) {
        //配置客户端NIO线程组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));

                            //ch.pipeline().addLast(new MsgpackDecoder());
                            ch.pipeline().addLast(new JdkDecoder());

                            ch.pipeline().addLast(new LengthFieldPrepender(2));

                            //ch.pipeline().addLast(new MsgpackEncoder());
                            ch.pipeline().addLast(new JdkEncoder());

                            ch.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            //发起异步连接操作,同步等待连接成功
            ChannelFuture future = bootstrap.connect(host, port).sync();
            System.out.println("client started");
            //等待客户端链路关闭
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println("client shuting down");
            //释放NIO线程组
            group.shutdownGracefully();
        }
    }
}

public class EchoClientHandler extends ChannelInboundHandlerAdapter {

    private int count = 0;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        List l = new ArrayList<String>();
        l.add("abc");
        l.add("123");
        Person p = new Person();
        p.setName("luangeng");
        p.setMan(true);
        p.setBirth(new Date());
        p.setList(l);
        for (int i = 0; i < 10; i++) {
            p.setAge(i);
            ctx.write(p);
        }
        ctx.flush();
    }

    //服务端返回应答信息后调用
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if(msg instanceof Person){
            Person p = (Person)msg;
            Q.p(p.toString());
        }else {
            Q.p(count++ + " client get: " + msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

---

执行结果:

client started
Person{age=0, name=‘luangeng‘, man=true, list=[abc, 123], birth=Wed Nov 22 21:18:48 CST 2017, son=null}
Person{age=1, name=‘luangeng‘, man=true, list=[abc, 123], birth=Wed Nov 22 21:18:48 CST 2017, son=null}
Person{age=2, name=‘luangeng‘, man=true, list=[abc, 123], birth=Wed Nov 22 21:18:48 CST 2017, son=null}
Person{age=3, name=‘luangeng‘, man=true, list=[abc, 123], birth=Wed Nov 22 21:18:48 CST 2017, son=null}
Person{age=4, name=‘luangeng‘, man=true, list=[abc, 123], birth=Wed Nov 22 21:18:48 CST 2017, son=null}
Person{age=5, name=‘luangeng‘, man=true, list=[abc, 123], birth=Wed Nov 22 21:18:48 CST 2017, son=null}
Person{age=6, name=‘luangeng‘, man=true, list=[abc, 123], birth=Wed Nov 22 21:18:48 CST 2017, son=null}
Person{age=7, name=‘luangeng‘, man=true, list=[abc, 123], birth=Wed Nov 22 21:18:48 CST 2017, son=null}
Person{age=8, name=‘luangeng‘, man=true, list=[abc, 123], birth=Wed Nov 22 21:18:48 CST 2017, son=null}
Person{age=9, name=‘luangeng‘, man=true, list=[abc, 123], birth=Wed Nov 22 21:18:48 CST 2017, son=null}

MessagePack工具:

MessagePack是与JSON数据格式类似的二进制序列化格式,更快更小,并且是跨语言的,用于在多个语言之间交换数据。使用MessagePack实现的编解码器如下:

public class MsgpackEncoder extends MessageToByteEncoder<Object> {
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
        MessagePack mp = new MessagePack();
        byte[] raw = mp.write(o);
        byteBuf.writeBytes(raw);
    }
}

public class MsgpackDecoder extends MessageToMessageDecoder<ByteBuf> {
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        final int length = byteBuf.readableBytes();
        final byte[] b = new byte[length];
        byteBuf.getBytes(byteBuf.readerIndex(), b, 0, length);
        MessagePack mp = new MessagePack();
        list.add(mp.read(b));
    }
}

---

使用这种编解码后,服务端和客户端接收到的对象都不能转换为Person对象。

end

时间: 2024-10-29 03:54:11

Netty JDK序列化编解码传输对象的相关文章

Netty编解码框架分析

1. 背景 1.1. 编解码技术 通常我们也习惯将编码(Encode)称为序列化(serialization),它将对象序列化为字节数组,用于网络传输.数据持久化或者其它用途. 反之,解码(Decode)/反序列化(deserialization)把从网络.磁盘等读取的字节数组还原成原始对象(通常是原始对象的拷贝),以方便后续的业务逻辑操作. 进行远程跨进程服务调用时(例如RPC调用),需要使用特定的编解码技术,对需要进行网络传输的对象做编码或者解码,以便完成远程调用. 1.2. 常用的编解码框

【转】Netty系列之Netty编解码框架分析

http://www.infoq.com/cn/articles/netty-codec-framework-analyse/ 1. 背景 1.1. 编解码技术 通常我们也习惯将编码(Encode)称为序列化(serialization),它将对象序列化为字节数组,用于网络传输.数据持久化或者其它用途. 反之,解码(Decode)/反序列化(deserialization)把从网络.磁盘等读取的字节数组还原成原始对象(通常是原始对象的拷贝),以方便后续的业务逻辑操作. 进行远程跨进程服务调用时(

Netty实战-对象编解码,Netty对象网络传递

书籍推荐:        实例代码 :http://download.csdn.net/detail/jiangtao_st/7677503 Server端代码 <span style="font-size:12px;">/** * * <p> * Netty Server Simple * </p> * * LineBasedFrameDecoder + 消息中得换行符 * * @author 卓轩 * @创建时间:2014年7月7日 * @ver

java编解码技术,netty nio

对于java提供的对象输入输出流ObjectInputStream与ObjectOutputStream,可以直接把java对象作为可存储的字节数组写入文件,也可以传输到网络上去.对与java开放人员来说,默认的jdk序列化机制可以避免操作底层的字节数组,从而提升开发效率. 1.为什么需要序列化 网络传输与对象序列化 2.java编解码技术指的什么 netty nio是基于网络传输,当进行远程跨进程服务调用时,需要把被传输的对象编码为字节数组或者bytebuffer对象.而当远程服务读取到byt

Netty 编解码技术 数据通信和心跳监控案例

Netty 编解码技术 数据通信和心跳监控案例 多台服务器之间在进行跨进程服务调用时,需要使用特定的编解码技术,对需要进行网络传输的对象做编码和解码操作,以便完成远程调用.Netty提供了完善,易扩展,易使用的编解码技术.本章除了介绍Marshalling的使用,还会基于编解码技术实现数据通信和心跳检测案例.通过本章,你将学到Java序列化的优缺点,主流编解码框架的特点,模拟特殊长连接通信,心跳监控案例.还在等什么,丰满的知识等你来拿! 技术:编解码,数据通信,心跳监控 说明:github上有完

Netty入门系列(3) --使用Netty进行编解码的操作

前言 何为编解码,通俗的来说,我们需要将一串文本信息从A发送到B并且将这段文本进行加工处理,如:A将信息文本信息编码为2进制信息进行传输.B接受到的消息是一串2进制信息,需要将其解码为文本信息才能正常进行处理. 上章我们介绍的Netty如何解决拆包和粘包问题,就是运用了解码的这一功能. java默认的序列化机制 使用Netty大多是java程序猿,我们基于一切都是对象的原则,经常会将对象进行网络传输,那么对于序列化操作肯定大家都是非常熟悉的. 一个对象是不能直接进行网络I/O传输的,jdk默认是

Google 的Protobuf 的使用方式(序列化和反序列化工具-编解码)

1.google的protobuf是什么? 用于rpc的自定义协议,体积更小,序列化和反序列化的第三方库,和apache thrift是同一种技术. 2.rpc库的介绍? (1) RMI    remote  method  invocation   广泛用于EJB,实际上是一种跨机器的调用,通过网络传输,调用方A调用序列化字节码传输到B机器反序列化,调用B的方法,B回传结果后序列化网路传输,A反序列化成最终结果. 限制 : 只针对于Java语言. 特点 :网络传输代码自动生成   client

Java网络编程-对象编解码方案、优劣对比

书籍推荐:   实例代码 :http://download.csdn.net/detail/jiangtao_st/7677503  用户对象 /** * * <p> *用户实体对象定义 * </p> * * @author 卓轩 * @创建时间:2014年6月20日 * @产品: UIC * @version: V1.0 */ public class UserDO implements Serializable { private static final long seria

Netty 4 传输对象

对于Netty在这里就不做过多介绍了,详情咨询http://netty.io/wiki/user-guide-for-4.x.html 我们在使用netty的过程中肯定会遇到传输对象的情况,Netty4通过ObjectEncoder和ObjectDecoder来支持. 首先我们定义一个User对象,一定要实现Serializable接口: import java.io.Serializable; /** * User: hupeng * Date: 14-6-3 * Time: 上午1:31 *