Netty学习——Netty和Protobuf的整合(一)

Netty学习——Netty和Protobuf的整合



Protobuf作为序列化的工具,将序列化后的数据,通过Netty来进行在网络上的传输

1.将proto文件里的java包的位置修改一下,然后再执行一下protoc

异常捕获:启动服务器端正常,在启动客户端的时候,发送消息,报错

警告: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.handler.codec.DecoderException: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:98)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:326)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:313)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:427)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:281)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
    at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
    at com.google.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:102)
    at com.google.protobuf.CodedInputStream$ArrayDecoder.readTag(CodedInputStream.java:627)
    at com.dawa.netty.sixthexample.MyDataInfo$Person.<init>(MyDataInfo.java:109)
    at com.dawa.netty.sixthexample.MyDataInfo$Person.<init>(MyDataInfo.java:69)
    at com.dawa.netty.sixthexample.MyDataInfo$Person$1.parsePartialFrom(MyDataInfo.java:881)
    at com.dawa.netty.sixthexample.MyDataInfo$Person$1.parsePartialFrom(MyDataInfo.java:875)
    at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:158)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:191)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:197)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48)
    at io.netty.handler.codec.protobuf.ProtobufDecoder.decode(ProtobufDecoder.java:121)
    at io.netty.handler.codec.protobuf.ProtobufDecoder.decode(ProtobufDecoder.java:65)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88)
    ... 23 more

十二月 03, 2019 11:01:51 上午 io.netty.channel.DefaultChannelPipeline onUnhandledInboundException
警告: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.handler.codec.DecoderException: com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field.  This could mean either that the input has been truncated or that an embedded message misreported its own length.
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:98)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:326)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:300)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
    at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field.  This could mean either that the input has been truncated or that an embedded message misreported its own length.
    at com.google.protobuf.InvalidProtocolBufferException.truncatedMessage(InvalidProtocolBufferException.java:84)
    at com.google.protobuf.CodedInputStream$ArrayDecoder.readRawByte(CodedInputStream.java:1238)
    at com.google.protobuf.CodedInputStream$ArrayDecoder.readRawVarint64SlowPath(CodedInputStream.java:1126)
    at com.google.protobuf.CodedInputStream$ArrayDecoder.readRawVarint32(CodedInputStream.java:1020)
    at com.google.protobuf.CodedInputStream$ArrayDecoder.readTag(CodedInputStream.java:623)
    at com.dawa.netty.sixthexample.MyDataInfo$Person.<init>(MyDataInfo.java:109)
    at com.dawa.netty.sixthexample.MyDataInfo$Person.<init>(MyDataInfo.java:69)
    at com.dawa.netty.sixthexample.MyDataInfo$Person$1.parsePartialFrom(MyDataInfo.java:881)
    at com.dawa.netty.sixthexample.MyDataInfo$Person$1.parsePartialFrom(MyDataInfo.java:875)
    at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:158)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:191)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:197)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48)
    at io.netty.handler.codec.protobuf.ProtobufDecoder.decode(ProtobufDecoder.java:121)
    at io.netty.handler.codec.protobuf.ProtobufDecoder.decode(ProtobufDecoder.java:65)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88)
    ... 21 more

开始找问题:
1.io.netty.handler.codec.DecoderException: 在解码过程中出错,可能是服务器端出的错
2.Caused by: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
3.While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either that the input has been truncated or that an embedded message misreported its own length.

检查了一下代码:猜测了一下:成了,效果图如下

可怕,原因竟然出现在编码器添加的顺序不同导致的: 正确的顺序是这样的:

@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    //这里和之前的不一样了,但也就是处理器的不一样
    //编解码处理器,protobuf提供的专门的编解码器.4个处理器
    pipeline.addLast(new ProtobufVarint32FrameDecoder());
    //Decoder是重点,解码器,将字节码转换成想要的数据类型
    //参数  messageLite,外层的要转换的类的实例
    pipeline.addLast(new ProtobufDecoder(MyDataInfo.Person.getDefaultInstance()));
    pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
    pipeline.addLast(new ProtobufEncoder());
    pipeline.addLast(new TestServerHandler());
}

接下来存放一下完整的客户端和服务器端的代码
proto文件

yntax ="proto2";

package com.dawa.protobuf;

option optimize_for = SPEED;
option java_package ="com.dawa.netty.sixthexample";
option java_outer_classname = "MyDataInfo";

message Person{
    required string name = 1;
    optional int32 age = 2;
    optional string address = 3;
}

message Person2{
    required string name = 1;
    optional int32 age = 2;
    optional string address = 3;
}

服务器端代码:

package com.dawa.netty.sixthexample;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * @Title: TestServer
 * @Author: 大娃
 * @Date: 2019/12/3 10:01
 * @Description:
 */
public class TestServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
            .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new TestServerInitializer());

            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
package com.dawa.netty.sixthexample;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * @Title: TestServerHandler
 * @Author: 大娃
 * @Date: 2019/12/3 10:09
 * @Description: handler本身是个泛型,这里的泛型就是取 要处理的类型
 */
public class TestServerHandler extends SimpleChannelInboundHandler<MyDataInfo.Person> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.Person msg) throws Exception {
        //对接受到的消息进行处理:
        System.out.println(msg.getName());
        System.out.println(msg.getAge());
        System.out.println(msg.getAddress());
    }
}
package com.dawa.netty.sixthexample;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

/**
 * @Title: TestServerInitializer
 * @Author: 大娃
 * @Date: 2019/12/3 10:05
 * @Description:
 */
public class TestServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        //这里和之前的不一样了,但也就是处理器的不一样
        //编解码处理器,protobuf提供的专门的编解码器.4个处理器
        pipeline.addLast(new ProtobufVarint32FrameDecoder());
        //Decoder是重点,解码器,将字节码转换成想要的数据类型
        //参数  messageLite,外层的要转换的类的实例
        pipeline.addLast(new ProtobufDecoder(MyDataInfo.Person.getDefaultInstance()));
        pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
        pipeline.addLast(new ProtobufEncoder());
        pipeline.addLast(new TestServerHandler());
    }
}

客户端代码

package com.dawa.netty.sixthexample;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * @Title: TestClient
 * @Author: 大娃
 * @Date: 2019/12/3 10:15
 * @Description: 客户端内容
 */
public class TestClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
            .handler(new TestClientInitializer());
            //注意此处,使用的是connect,不是使用的bind
            ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}
package com.dawa.netty.sixthexample;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

/**
 * @Title: TestClientInitializer
 * @Author: 大娃
 * @Date: 2019/12/3 10:43
 * @Description:
 */
public class TestClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        //这里和之前的不一样了,但也就是处理器的不一样
        //编解码处理器,protobuf提供的专门的编解码器.4个处理器
        pipeline.addLast(new ProtobufVarint32FrameDecoder());
        //Decoder是重点,解码器,将字节码转换成想要的数据类型
        //参数  messageLite,外层的要转换的类的实例
        pipeline.addLast(new ProtobufDecoder(MyDataInfo.Person.getDefaultInstance()));
        pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
        pipeline.addLast(new ProtobufEncoder());
        //自己的处理器
        pipeline.addLast(new TestClientHandler());
    }
}
package com.dawa.netty.sixthexample;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * @Title: TestClientHandler
 * @Author: 大娃
 * @Date: 2019/12/3 10:44
 * @Description:
 */
public class TestClientHandler extends SimpleChannelInboundHandler<MyDataInfo.Person> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.Person msg) throws Exception {

    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //处于活动状态
        MyDataInfo.Person person = MyDataInfo.Person.newBuilder().setName("大娃").setAge(22).setAddress("北京").build();
        ctx.channel().writeAndFlush(person);
    }
}

这程序是有瑕疵的,解码器那里不通用,耦合性太强,有两个很明显的问题,但是要怎么解决呢?

会专门写个帖子进行分析,Netty学习——Netty和Protobuf的整合(二)

原文地址:https://www.cnblogs.com/bigbaby/p/11978746.html

时间: 2024-10-10 12:15:59

Netty学习——Netty和Protobuf的整合(一)的相关文章

Netty学习篇--整合springboot

经过前面的netty学习,大概了解了netty各个组件的概念和作用,开始自己瞎鼓捣netty和我们常用的项目的整合(很简单的整合) 项目准备 工具:IDEA2017 jar包导入:maven 项目框架:springboot+netty 项目操作 右键创建一个maven项目,项目名称: hetangyuese-netty-03(项目已上传github) 项目完整结构 ? maven导包 <!-- netty start --> <dependency> <groupId>

Netty学习总结(1)--Netty入门介绍

1.Netty是什么? Netty是一个基于JAVA NIO类库的异步通信框架,它的架构特点是:异步非阻塞.基于事件驱动.高性能.高可靠性和高可定制性. 2.使用Netty能够做什么? 开发异步.非阻塞的TCP网络应用程序: 开发异步.非阻塞的UDP网络应用程序: 开发异步文件传输应用程序: 开发异步HTTP服务端和客户端应用程序: 提供对多种编解码框架的集成,包括谷歌的Protobuf.Jboss marshalling.Java序列化.压缩编解码.XML解码.字符串编解码等,这些编解码框架可

netty 学习记录一

最近在学习netty相关知识,觉得<netty 权威指南>这本书还是挺好的,适合我这种初学者.加上netty本身自带的许多例子,学起来还是挺有兴趣的.简单记录下, 一般服务器代码如下: public void run() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerB

netty 学习资料

最近在做一个网页遥控器的项目,用到了netty,但还未发现比较系统完整的netty博客教程,所以打算自己写一个netty学习教程,会每天更新一篇,欢迎交流. 先给大家提供一些资料: 1. 比较简短易懂的有实例的系列教程,涉及到了netty关键特性:但个人觉得比较速成,不系统,不深入 http://www.coderli.com/netty-course-hello-world http://www.coderli.com/netty-two-concepts http://www.coderli

一起学Netty(十)之 Netty使用Google的ProtoBuf

protobuf是由Google开发的一套对数据结构进行序列化的方法,可用做通信协议,数据存储格式,等等.其特点是不限语言.不限平台.扩展性强 Netty也提供了对Protobuf的天然支持,我们今天就写一个简单的示例,简单地了解一下Netty对Google的protoBuf的支持 我们的示例场景很简单的:客户端发送一个信息,这个信息用Protobuf来做序列化,然后服务器端接收这个信息,解码,读取信息 protobuf与xml,json这样的数据格式一样,都有自己的一套语法,且语法很简单,很容

netty学习资源收集

Netty学习笔记 Netty+In+Action中文版.pdf

Netty学习——基于netty实现简单的客户端聊天小程序

Netty学习——基于netty实现简单的客户端聊天小程序 效果图,聊天程序展示 (TCP编程实现) 后端代码: package com.dawa.netty.chatexample; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEven

Netty学习——通过websocket编程实现基于长连接的双攻的通信

Netty学习(一)基于长连接的双攻的通信,通过websocket编程实现 效果图,客户端和服务器端建立起长连接,客户端发送请求,服务器端响应 但是目前缺少心跳,如果两个建立起来的连接,一个断网之后,另外一个是感知不到对方已经断掉的.以后使用心跳技术来进行连接检测 须知: 状态码101,代表 协议转换,从HTTP协议升级为WebSocket协议 HTTP协议,一般访问的时候:是 Http://localhost:8080/ws WebSocket协议,访问的时候,需要是:ws://localho

Netty学习(1):IO模型之BIO

概述 Netty其实就是一个异步的.基于事件驱动的框架,其作用是用来开发高性能.高可靠的IO程序. 因此下面就让我们从Java的IO模型来逐步深入学习Netty. IO模型 IO模型简单来说,就是采用如何的方式来进行数据的接受和发送,因为存在着发送方和接收方两个角色,因此IO模型一般分为以下3类:BIO(同步阻塞).NIO(同步非阻塞).AIO(异步非阻塞).其3者的区别如下: BIO:是最传统的Java IO模型,采用的方式为服务器新接受到一个连接,就建立一个线程,但是如果这个连接不做任何事情