Google Protobuf在Netty中的使用

[toc]


Google Protobuf在Netty中的使用

程序代码来自于《Netty权威指南》第8章,已经加了注释,不过需要注意的是,使用的proto源代码是在Google Protobuf入门与使用中生成的,关于protobuf代码自动生成工具的使用可以参考这篇文章。

例子中,通过解码器ProtobufVarint32FrameDecoder和编码器ProtobufVarint32LengthFieldPrepender的使用已经解决了半包问题,测试时可以把其注释掉,这样就可以演示Netty中使用Protobuf出现的TCP粘包问题。

同时,通过protobuf的使用,也可以深刻感受到,其在Netty中的使用确实非常简单,编解码、半包问题,只需要添加相关的处理器即可,而且它可以方便地实现跨语言的远程服务调用。(protobuf本身提供了对不同语言的支持)

但其实在使用时会发现有一个问题,就是编解码的对象是需要使用其生成的特定的proto对象来进行操作的,也就是说,需要编写.proto文件,再通过protoc来生成相应语言的代码文件,显然这样做还是会有些麻烦(虽然其实也还好,不算麻烦),有没有方便点的方法呢?后面通过protostuff的使用即可解决这个问题。

服务端

SubReqServer.java

package cn.xpleaf.subscribe;

import cn.xpleaf.protobuf.SubscribeReqProto;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class SubReqServer {
    public void bind(int port) throws Exception {
        // 配置服务端的NIO线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                // 添加日志处理器
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {

                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // 添加ProtobufVarint32FrameDecoder,主要用于Protobuf的半包处理
                        ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                        // 添加ProtobufDecoder解码器,它的参数是com.google.protobuf.MessageLite
                        // 实际上就是要告诉ProtobufDecoder需要解码的目标类是什么,否则仅仅从字节数组中是
                        // 无法判断出要解码的目标类型信息的(服务端需要解析的是客户端请求,所以是Req)
                        ch.pipeline().addLast(new ProtobufDecoder(SubscribeReqProto.SubscribeReq.getDefaultInstance()));
                        /**
                         * 来自源码的代码注释,用于Protobuf的半包处理
                         * * An encoder that prepends the the Google Protocol Buffers
                         * <a href="https://developers.google.com/protocol-buffers/docs/encoding?csw=1#varints">Base
                         * 128 Varints</a> integer length field. For example:
                         * <pre>
                         * BEFORE ENCODE (300 bytes)       AFTER ENCODE (302 bytes)
                         * +---------------+               +--------+---------------+
                         * | Protobuf Data |-------------->| Length | Protobuf Data |
                         * |  (300 bytes)  |               | 0xAC02 |  (300 bytes)  |
                         * +---------------+               +--------+---------------+
                         * </pre> *
                         */
                        ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                        // 添加ProtobufEncoder编码器,这样就不需要对SubscribeResp进行手工编码
                        ch.pipeline().addLast(new ProtobufEncoder());
                        // 添加业务处理handler
                        ch.pipeline().addLast(new SubReqServerHandler());
                    }
                });

            // 绑定端口,同步等待成功
            ChannelFuture f = b.bind(port).sync();

            // 等待服务端监听端口关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅退出,释放线程池资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if(args != null && args.length > 0) {
            try {
                port = Integer.valueOf(port);
            } catch (NumberFormatException e) {
                // TODO: handle exception
            }
        }
        new SubReqServer().bind(port);
    }
}

SubReqServerHandler.java

package cn.xpleaf.subscribe;

import cn.xpleaf.protobuf.SubscribeReqProto;
import cn.xpleaf.protobuf.SubscribeRespProto;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class SubReqServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 由于ProtobufDecoder已经对消息进行了自动解码,因此接收到的订购请求消息可以直接使用
     * 对用户名进行校验,校验通过后构造应答消息返回给客户端,由于使用了ProtobufEncoder,
     * 所以不需要对SubscribeRespProto.SubscribeResp进行手工编码
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        SubscribeReqProto.SubscribeReq req = (SubscribeReqProto.SubscribeReq)msg;
        String username = req.getUserName();
        if("xpleaf".equalsIgnoreCase(username)) {
            System.out.println("Service accept client subscribe req : [" + req.toString() + "]");
            ctx.writeAndFlush(resp(req.getSubReqID()));
        }
    }

    /**
     * 构建SubscribeRespProto.SubscribeResp对象
     * @param subReqID
     * @return
     */
    private SubscribeRespProto.SubscribeResp resp(int subReqID) {
        SubscribeRespProto.SubscribeResp.Builder builder = SubscribeRespProto.SubscribeResp.newBuilder();
        builder.setSubReqID(subReqID);
        builder.setRespCode(0);
        builder.setDesc("Netty book order succeed, 3 days later, sent to the designated address");
        return builder.build();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // 发生异常,关闭链路
        ctx.close();
    }
}

客户端

SubReqClient.java

package cn.xpleaf.subscribe;

import cn.xpleaf.protobuf.SubscribeRespProto;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class SubReqClient {
    public void connect(String host, int port) throws Exception {
        // 配置客户端NIO线程组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                // 设置TCP连接超时时间
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                .handler(new ChannelInitializer<SocketChannel>() {

                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // 添加ProtobufVarint32FrameDecoder,主要用于Protobuf的半包处理
                        ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                        // 添加ProtobufDecoder解码器,它的参数是com.google.protobuf.MessageLite
                        // 实际上就是要告诉ProtobufDecoder需要解码的目标类是什么,否则仅仅从字节数组中是
                        // 无法判断出要解码的目标类型信息的(客户端需要解析的是服务端请求,所以是Resp)
                        ch.pipeline().addLast(new ProtobufDecoder(SubscribeRespProto.SubscribeResp.getDefaultInstance()));
                        /**
                         * 来自源码的代码注释,用于Protobuf的半包处理
                         * * An encoder that prepends the the Google Protocol Buffers
                         * <a href="https://developers.google.com/protocol-buffers/docs/encoding?csw=1#varints">Base
                         * 128 Varints</a> integer length field. For example:
                         * <pre>
                         * BEFORE ENCODE (300 bytes)       AFTER ENCODE (302 bytes)
                         * +---------------+               +--------+---------------+
                         * | Protobuf Data |-------------->| Length | Protobuf Data |
                         * |  (300 bytes)  |               | 0xAC02 |  (300 bytes)  |
                         * +---------------+               +--------+---------------+
                         * </pre> *
                         */
                        ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                        // 添加ProtobufEncoder编码器,这样就不需要对SubscribeResp进行手工编码
                        ch.pipeline().addLast(new ProtobufEncoder());
                        // 添加业务处理handler
                        ch.pipeline().addLast(new SubReqClientHandler());
                    }
                });
            // 发起异步连接操作
            ChannelFuture f = b.connect(host, port).sync();

            // 等待客户端链路关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅退出,释放NIO线程组
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if(args != null && args.length > 0) {
            try {
                port = Integer.valueOf(port);
            } catch (NumberFormatException e) {
                // 采用默认值
            }
        }
        new SubReqClient().connect("localhost", port);
    }
}

SubReqClientHandler.java

package cn.xpleaf.subscribe;

import java.util.ArrayList;
import java.util.List;

import cn.xpleaf.protobuf.SubscribeReqProto;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class SubReqClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        for(int i = 0; i < 10; i++) {
            ctx.write(subReq(i));
        }
        ctx.flush();
    }

    /**
     * 构建SubscribeReqProto.SubscribeReq对象
     * @param i
     * @return
     */
    private SubscribeReqProto.SubscribeReq subReq(int i) {
        SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder();
        builder.setSubReqID(i);
        builder.setUserName("xpleaf");
        builder.setProductName("Netty Book For Protobuf");
        List<String> address = new ArrayList<>();
        address.add("NanJing YuHuaTai");
        address.add("BeiJing LiuLiChange");
        address.add("ShenZhen HongShuLin");
        builder.addAllAddress(address);
        return builder.build();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Service accept server subscribe response : [" + msg + "]");
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

测试

服务端输出如下:

Service accept client subscribe req : [subReqID: 0
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]
Service accept client subscribe req : [subReqID: 1
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]
Service accept client subscribe req : [subReqID: 2
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]
Service accept client subscribe req : [subReqID: 3
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]
Service accept client subscribe req : [subReqID: 4
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]
Service accept client subscribe req : [subReqID: 5
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]
Service accept client subscribe req : [subReqID: 6
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]
Service accept client subscribe req : [subReqID: 7
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]
Service accept client subscribe req : [subReqID: 8
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]
Service accept client subscribe req : [subReqID: 9
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]

客户端输出如下:

Service accept server subscribe response : [subReqID: 0
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
Service accept server subscribe response : [subReqID: 1
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
Service accept server subscribe response : [subReqID: 2
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
Service accept server subscribe response : [subReqID: 3
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
Service accept server subscribe response : [subReqID: 4
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
Service accept server subscribe response : [subReqID: 5
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
Service accept server subscribe response : [subReqID: 6
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
Service accept server subscribe response : [subReqID: 7
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
Service accept server subscribe response : [subReqID: 8
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
Service accept server subscribe response : [subReqID: 9
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]

原文地址:http://blog.51cto.com/xpleaf/2071715

时间: 2024-11-08 20:53:23

Google Protobuf在Netty中的使用的相关文章

Netty(五)序列化protobuf在netty中的使用

protobuf是google序列化的工具,主要是把数据序列化成二进制的数据来传输用的.它主要优点如下: 1.性能好,效率高: 2.跨语言(java自带的序列化,不能跨语言) protobuf参考文档:Protobuf详解 其实,在netty中使用Protobuf需要注意的是: protobufDecoder仅仅负责编码,并不支持读半包,所以在之前,一定要有读半包的处理器. 有三种方式可以选择: 使用netty提供ProtobufVarint32FrameDecoder 继承netty提供的通用

《Dotnet9》系列-Google ProtoBuf在C#中的简单应用

时间如流水,只能流去不流回! 点赞再看,养成习惯,这是您给我创作的动力! 本文 Dotnet9 https://dotnet9.com 已收录,站长乐于分享dotnet相关技术,比如Winform.WPF.ASP.NET Core等,亦有C++桌面相关的Qt Quick和Qt Widgets等,只分享自己熟悉的.自己会的. 简介 什么是 Google Protocol Buffer? 假如您在网上搜索,应该会得到类似这样的文字介绍: Google Protocol Buffer( 简称 Prot

Google Protobuf 使用 Java 版

一 . Protobuf 的入门 Protobuf 是一个灵活,高效,结构化的数据序列化框架, 相比于 XML 等传统的序列化工具,它更小,更快,更灵活,更简单. Protobuf 支持数据结构化一次可以到处使用.甚至跨语言使用.同通过代码生成工具可以自动生成不同语言版本的源代码,甚至可以在使用不同版本的数据结构中进行数据传递,实现数据结构的向前兼容. Google 的 protobuf 在业界非常流行,很多商业项目选择 protobuf 作为编码解码框架,这里我们一起回顾一下 Protobuf

vs2012 编译 并使用 google protobuf

参考网址: http://my.oschina.net/chenleijava/blog/261263 http://www.ibm.com/developerworks/cn/linux/l-cn-gpb/ http://blog.csdn.net/yi_ya/article/details/40404231 http://blog.csdn.net/yi_ya/article/details/40404059 http://blog.csdn.net/chenkjiang/article/d

GOOGLE PROTOBUF开发者指南

ProtoBuf开发者指南 译者: gashero 目录 1   概览 1.1   什么是protocol buffer 1.2   他们如何工作 1.3   为什么不用XML? 1.4   听起来像是为我的解决方案,如何开始? 1.5   一点历史 2   语言指导 2.1   定义一个消息类型 2.2   值类型 2.3   可选字段与缺省值 2.4   枚举 2.5   使用其他消息类型 2.6   嵌套类型 2.7   更新一个数据类型 2.8   扩展 2.9   包 2.10   定

VS下使用Google Protobuf完成SOCKET通信

如何在Windows环境下的VS中安装使用Google Protobuf完成SOCKET通信 出处:如何在Windows环境下的VS中安装使用Google Protobuf完成SOCKET通信 最近一段时间,由于项目的需要,接触到了Protobuf这个东东,在Linux环境下,体验了一把,感觉挺不错,很方便,且高效.是一个非常值得学习掌握和应用的数据抽象.平台无关.功能强大.…(此处省略1000字)的开源工具. Google虽然把Protobuf做成了跨平台.跨语言,但作为微软的死对头,它在re

google protobuf 简单实例

1.定义proto文件: User.proto package netty; option java_package="myprotobuf"; option java_outer_classname="UserProto"; message User{ required int32 ID=1; required string userName=2; required string Password=3; repeated string address=4; } 2

在Android Studio配置google protobuf

1.在project的build.gradle中配置 buildscript { repositories { jcenter() mavenCentral() } dependencies { classpath 'com.android.tools.build:gradle:2.2.3' classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.0' } } 注意 Gradle版本至少是 2.12 并且Java 7,本例子使用的是2.

Google protobuf proto文件编写规则

转载自: http://blog.csdn.net/yi_ya/article/details/40404231 1. 简单介绍 protobuf文件:就是定义你要的消息(类似java中的类)和消息中的各个字段及其数据类型(类似java类中的成员变量和他的数据类型) 2. Protobuf消息定义 消息由至少一个字段组合而成,类似于C语言中的结构.每个字段都有一定的格式. 字段格式:限定修饰符① | 数据类型② | 字段名称③ | = | 字段编码值④ | [字段默认值⑤] 1)限定修饰符包含