Netty整合SpringBoot并使用Protobuf进行数据传输

前言

本篇文章主要介绍的是SpringBoot整合Netty以及使用Protobuf进行数据传输的相关内容。Protobuf会介绍下用法,至于Netty在netty 之 telnet HelloWorld 详解中已经介绍过了,这里就不再过多细说了。

Protobuf

介绍

Protocol Buffer是Google的语言中立的,平台中立的,可扩展机制的,用于序列化结构化数据 - 对比XML,但更小,更快,更简单。您可以定义数据的结构化,然后可以使用特殊生成的源代码轻松地在各种数据流中使用各种语言编写和读取结构化数据。

官网地址: https://developers.google.com/protocol-buffers/

使用

这里的使用就只介绍Java相关的使用。具体protobuf3的使用可以看Protobuf 语言指南(proto3)。 首先我们需要在src/main文件夹下建立一个proto文件夹,然后在该文件夹新建一个user.proto文件,此文件定义我们需要传输的文件。

:使用grpc方式编译.proto时,会默认扫描src/main/proto文件夹下的protobuf文件。

例如我们需要定义一个用户的信息,包含的字段主要有编号、名称、年龄。 那么该protobuf文件的格式如下: :这里使用的是proto3,相关的注释我已写了,这里便不再过多讲述了。需要注意一点的是proto文件和生成的Java文件名称不能一致!

 1   //proto3语法注解:如果您不这样做,protobuf编译器将假定您正在使用proto2,这必须是文件的第一个非空的非注释行。
 2   syntax = "proto3";
 3   //生成的包名
 4   option java_package = "com.sanshengshui.netty.protobuf";
 5   //生成的java名
 6   option java_outer_classname = "UserMsg";
 7   ?
 8   message User{
 9       //ID
10       int32 id = 1;
11       //姓名
12       string name = 2;
13       //年龄
14       int32 age = 3;
15       //状态
16       int32 state = 4;
17   }

创建好该文件之后,我们cd到该工程的根目录下,执行mvn clean compile,输入完之后,回车即可在target文件夹中看到已经生成好的Java文件,然后直接在工程中使用此protobuf文件就可以了。因为能自动扫描到此类。详情请看下图:

注:生成protobuf的文件软件和测试的protobuf文件我也整合到该项目中了,可以直接获取的。

Java文件生成好之后,我们再来看怎么使用。 这里我就直接贴代码了,并且将注释写在代码中,应该更容易理解些吧。。。 代码示例:

  @RunWith(JUnit4.class)
  @Slf4j
  public class NettySpringbootProtostuffApplicationTests {
      @Test
      public void ProtobufTest() throws IOException {
          UserMsg.User.Builder userInfo = UserMsg.User.newBuilder();
          userInfo.setId(1);
          userInfo.setName("mushuwei");
          userInfo.setName("24");
          UserMsg.User user = userInfo.build();
          // 将数据写到输出流
          ByteArrayOutputStream output = new ByteArrayOutputStream();
          user.writeTo(output);
          // 将数据序列化后发送
          byte[] byteArray = output.toByteArray();
          // 接收到流并读取
          ByteArrayInputStream input = new ByteArrayInputStream(byteArray);
          // 反序列化
          UserMsg.User userInfo2 = UserMsg.User.parseFrom(input);
          log.info("id:" + userInfo2.getId());
          log.info("name:" + userInfo2.getName());
          log.info("age:" + userInfo2.getAge());
  ?
      }
  }

注:这里说明一点,因为protobuf是通过二进制进行传输,所以需要注意下相应的编码。还有使用protobuf也需要注意一下一次传输的最大字节长度。

输出结果:

  17:28:07.914 [main] INFO com.sanshengshui.nettyspringbootprotostuff.NettySpringbootProtostuffApplicationTests - id:1
  17:28:07.919 [main] INFO com.sanshengshui.nettyspringbootprotostuff.NettySpringbootProtostuffApplicationTests - name:24
  17:28:07.919 [main] INFO com.sanshengshui.nettyspringbootprotostuff.NettySpringbootProtostuffApplicationTests - age:0

Netty整合springboot并使用protobuf进行数据传输

说明:如果想直接获取工程那么可以直接跳到底部,通过链接下载工程代码。

开发准备

环境要求

JDK::1.8

Netty::4.0或以上(不包括5)

Protobuf:3.0或以上

如果对Netty不熟的话,可以看看我之前写的netty 之 telnet HelloWorld 详解。大神请无视~。~ 地址:https://www.cnblogs.com/sanshengshui/p/9726306.html

首先还是Maven的相关依赖:

      <properties>
          <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
          <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
          <java.version>1.8</java.version>
          <netty-all.version>4.1.29.Final</netty-all.version>
          <protobuf.version>3.6.1</protobuf.version>
          <grpc.version>1.15.0</grpc.version>
      </properties>
  ?
      <dependencies>
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter</artifactId>
          </dependency>
  ?
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-test</artifactId>
              <scope>test</scope>
          </dependency>
          <!--netty jar包导入-->
          <dependency>
              <groupId>io.netty</groupId>
              <artifactId>netty-all</artifactId>
              <version>${netty-all.version}</version>
          </dependency>
  ?
          <!--使用grpc优雅的编译protobuf-->
          <dependency>
              <groupId>com.google.protobuf</groupId>
              <artifactId>protobuf-java</artifactId>
              <version>${protobuf.version}</version>
          </dependency>
  ?
          <dependency>
              <groupId>io.grpc</groupId>
              <artifactId>grpc-netty</artifactId>
              <version>${grpc.version}</version>
          </dependency>
          <dependency>
              <groupId>io.grpc</groupId>
              <artifactId>grpc-protobuf</artifactId>
              <version>${grpc.version}</version>
          </dependency>
          <dependency>
              <groupId>io.grpc</groupId>
              <artifactId>grpc-stub</artifactId>
              <version>${grpc.version}</version>
          </dependency>
  ?
          <!--lombok用于日志,实体类的重复代码书写-->
          <dependency>
              <groupId>org.projectlombok</groupId>
              <artifactId>lombok</artifactId>
              <scope>provided</scope>
          </dependency>
      </dependencies>

添加了相应的maven依赖之后!我们还需要添加grpc优雅的编译protobuf的插件:

         <build>
          <extensions>
              <extension>
                  <groupId>kr.motd.maven</groupId>
                  <artifactId>os-maven-plugin</artifactId>
                  <version>1.5.0.Final</version>
              </extension>
          </extensions>
          <plugins>
              <plugin>
                  <groupId>org.apache.maven.plugins</groupId>
                  <artifactId>maven-compiler-plugin</artifactId>
                  <version>2.5.1</version>
                  <configuration>
                      <source>1.8</source>
                      <target>1.8</target>
                  </configuration>
              </plugin>
              <plugin>
                  <groupId>org.apache.maven.plugins</groupId>
                  <artifactId>maven-resources-plugin</artifactId>
                  <version>2.7</version>
              </plugin>
              <plugin>
                  <groupId>org.apache.maven.plugins</groupId>
                  <artifactId>maven-source-plugin</artifactId>
                  <version>2.2.1</version>
              </plugin>
              <plugin>
                  <groupId>org.apache.maven.plugins</groupId>
                  <artifactId>maven-jar-plugin</artifactId>
                  <version>3.0.2</version>
              </plugin>
              <plugin>
                  <groupId>org.apache.maven.plugins</groupId>
                  <artifactId>maven-assembly-plugin</artifactId>
                  <version>3.0.0</version>
              </plugin>
              <plugin>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-maven-plugin</artifactId>
              </plugin>
              <plugin>
                  <groupId>org.apache.maven.plugins</groupId>
                  <artifactId>maven-dependency-plugin</artifactId>
                  <executions>
                      <execution>
                          <id>copy-protoc</id>
                          <phase>generate-sources</phase>
                          <goals>
                              <goal>copy</goal>
                          </goals>
                          <configuration>
                              <artifactItems>
                                  <artifactItem>
                                      <groupId>com.google.protobuf</groupId>
                                      <artifactId>protoc</artifactId>
                                      <version>${protobuf.version}</version>
                                      <classifier>${os.detected.classifier}</classifier>
                                      <type>exe</type>
                                      <overWrite>true</overWrite>
                                      <outputDirectory>${project.build.directory}</outputDirectory>
                                  </artifactItem>
                              </artifactItems>
                          </configuration>
                      </execution>
                  </executions>
              </plugin>
              <plugin>
                  <groupId>org.xolstice.maven.plugins</groupId>
                  <artifactId>protobuf-maven-plugin</artifactId>
                  <version>0.5.0</version>
                  <configuration>
                      <!--
                        The version of protoc must match protobuf-java. If you don‘t depend on
                        protobuf-java directly, you will be transitively depending on the
                        protobuf-java version that grpc depends on.
                      -->
                      <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
                      </protocArtifact>
                      <pluginId>grpc-java</pluginId>
                      <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.0.0:exe:${os.detected.classifier}
                      </pluginArtifact>
                  </configuration>
                  <executions>
                      <execution>
                          <goals>
                              <goal>compile</goal>
                              <goal>compile-custom</goal>
                              <goal>test-compile</goal>
                          </goals>
                      </execution>
                  </executions>
              </plugin>
          </plugins>
      </build>

此外我们还需要对application.yml配置文件作一点修改:

  server:
    enabled: true
    bind_address: 0.0.0.0
    bind_port: 9876
    netty:
      #不进行内存泄露的检测
      leak_detector_level: DISABLED
      boss_group_thread_count: 1
      worker_group_thread_count: 12
      #最大负载大小
      max_payload_size: 65536

项目结构

    netty-springboot-protobuf
      ├── client
        ├── NettyClient.class -- 客户端启动类
        ├── NettyClientHandler.class -- 客户端逻辑处理类
        ├── NettyClientHandler.class -- 客户端初始化类
      ├── server
        ├── NettyServer.class -- 服务端启动类
        ├── NettyServerHandler -- 服务端逻辑处理类
        ├── NettyServerInitializer -- 服务端初始化类
      ├── proto
        ├── user.proto -- protobuf文件

代码编写

代码模块主要分为服务端和客户端。 主要实现的业务逻辑: 服务端启动成功之后,客户端也启动成功,这时服务端会发送一条protobuf格式的信息给客户端,然后客户端给予相应的应答。客户端与服务端连接成功之后,客户端每个一段时间会发送心跳指令给服务端,告诉服务端该客户端还存过中,如果客户端没有在指定的时间发送信息,服务端会关闭与该客户端的连接。当客户端无法连接到服务端之后,会每隔一段时间去尝试重连,只到重连成功!

服务端

首先是编写服务端的启动类,相应的注释在代码中写得很详细了,这里也不再过多讲述了。不过需要注意的是,在之前的我写的Netty文章中,是通过main方法直接启动服务端,因此是直接new一个对象的。而在和SpringBoot整合之后,我们需要将Netty交给springBoot去管理,所以这里就用了相应的注解。 代码如下:

 1  @Service("nettyServer")
 2   @Slf4j
 3   public class NettyServer {
 4       /**
 5        * 通过springboot读取静态资源,实现netty配置文件的读写
 6        */
 7   ?
 8       @Value("${server.bind_port}")
 9       private Integer port;
10   ?
11       @Value("${server.netty.boss_group_thread_count}")
12       private Integer bossGroupThreadCount;
13   ?
14       @Value("${server.netty.worker_group_thread_count}")
15       private Integer workerGroupThreadCount;
16   ?
17       @Value("${server.netty.leak_detector_level}")
18       private String leakDetectorLevel;
19   ?
20       @Value("${server.netty.max_payload_size}")
21       private Integer maxPayloadSize;
22   ?
23       private  ChannelFuture channelFuture;
24       private  EventLoopGroup bossGroup;
25       private  EventLoopGroup workerGroup;
26   ?
27   ?
28       @PostConstruct
29       public void init() throws Exception {
30               log.info("Setting resource leak detector level to {}",leakDetectorLevel);
31               ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));
32   ?
33               log.info("Starting Server");
34               //创建boss线程组 用于服务端接受客户端的连接
35               bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
36               // 创建 worker 线程组 用于进行 SocketChannel 的数据读写
37               workerGroup = new NioEventLoopGroup(workerGroupThreadCount);
38               // 创建 ServerBootstrap 对象
39               ServerBootstrap b = new ServerBootstrap();
40               //设置使用的EventLoopGroup
41               b.group(bossGroup, workerGroup)
42                       //设置要被实例化的为 NioServerSocketChannel 类
43                       .channel(NioServerSocketChannel.class)
44                       // 设置 NioServerSocketChannel 的处理器
45                       .handler(new LoggingHandler(LogLevel.INFO))
46                       // 设置连入服务端的 Client 的 SocketChannel 的处理器
47                       .childHandler(new NettyServerInitializer());
48               // 绑定端口,并同步等待成功,即启动服务端
49               channelFuture = b.bind(port).sync();
50   ?
51               log.info("Server started!");
52   ?
53       }
54   ?
55       @PreDestroy
56       public void shutdown() throws InterruptedException {
57           log.info("Stopping Server");
58           try {
59               // 监听服务端关闭,并阻塞等待
60               channelFuture.channel().closeFuture().sync();
61           } finally {
62               // 优雅关闭两个 EventLoopGroup 对象
63               workerGroup.shutdownGracefully();
64               bossGroup.shutdownGracefully();
65           }
66           log.info("server stopped!");
67   ?
68       }
69   ?
70   }
  ?

服务端主类编写完毕之后,我们再来设置下相应的过滤条件。 这里需要继承Netty中ChannelInitializer类,然后重写initChannel该方法,进行添加相应的设置,如心跳超时设置,传输协议设置,以及相应的业务实现类。 代码如下:

  public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
  ?
  ?
      @Override
      protected void initChannel(SocketChannel ch) throws Exception {
          ChannelPipeline ph = ch.pipeline();
  ?
          //入参说明: 读超时时间、写超时时间、所有类型的超时时间、时间格式
          ph.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
          // 解码和编码,应和客户端一致
          //传输的协议 Protobuf
          ph.addLast(new ProtobufVarint32FrameDecoder());
          ph.addLast(new ProtobufDecoder(UserMsg.User.getDefaultInstance()));
          ph.addLast(new ProtobufVarint32LengthFieldPrepender());
          ph.addLast(new ProtobufEncoder());
  ?
          //业务逻辑实现类
          ph.addLast("nettyServerHandler", new NettyServerHandler());
      }
  }
  ?

服务相关的设置的代码写完之后,我们再来编写主要的业务代码。 使用Netty编写业务层的代码,我们需要继承ChannelInboundHandlerAdapterSimpleChannelInboundHandler类,在这里顺便说下它们两的区别吧。 继承SimpleChannelInboundHandler类之后,会在接收到数据后会自动release掉数据占用的Bytebuffer资源。并且继承该类需要指定数据格式。 而继承ChannelInboundHandlerAdapter则不会自动释放,需要手动调用ReferenceCountUtil.release()等方法进行释放。继承该类不需要指定数据格式。 所以在这里,个人推荐服务端继承ChannelInboundHandlerAdapter,手动进行释放,防止数据未处理完就自动释放了。而且服务端可能有多个客户端进行连接,并且每一个客户端请求的数据格式都不一致,这时便可以进行相应的处理。 客户端根据情况可以继承SimpleChannelInboundHandler类。好处是直接指定好传输的数据格式,就不需要再进行格式的转换了。

代码如下:

 @Slf4j
  public class NettyServerHandler extends ChannelInboundHandlerAdapter {
      /** 空闲次数 */
      private AtomicInteger idle_count = new AtomicInteger(1);
      /** 发送次数 */
      private AtomicInteger count = new AtomicInteger(1);
  ?
  ?
      /**
       * 建立连接时,发送一条消息
       */
      @Override
      public void channelActive(ChannelHandlerContext ctx) throws Exception {
          log.info("连接的客户端地址:" + ctx.channel().remoteAddress());
          UserMsg.User user = UserMsg.User.newBuilder().setId(1).setAge(24).setName("穆书伟").setState(0).build();
          ctx.writeAndFlush(user);
          super.channelActive(ctx);
      }
  ?
      /**
       * 超时处理 如果5秒没有接受客户端的心跳,就触发; 如果超过两次,则直接关闭;
       */
      @Override
      public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
          if (obj instanceof IdleStateEvent) {
              IdleStateEvent event = (IdleStateEvent) obj;
              // 如果读通道处于空闲状态,说明没有接收到心跳命令
              if (IdleState.READER_IDLE.equals(event.state())) {
                  log.info("已经5秒没有接收到客户端的信息了");
                  if (idle_count.get() > 1) {
                      log.info("关闭这个不活跃的channel");
                      ctx.channel().close();
                  }
                  idle_count.getAndIncrement();
              }
          } else {
              super.userEventTriggered(ctx, obj);
          }
      }
  ?
      /**
       * 业务逻辑处理
       */
      @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
          log.info("第" + count.get() + "次" + ",服务端接受的消息:" + msg);
          try {
              // 如果是protobuf类型的数据
              if (msg instanceof UserMsg.User) {
                  UserMsg.User user = (UserMsg.User) msg;
                  if (user.getState() == 1) {
                      log.info("客户端业务处理成功!");
                  } else if(user.getState() == 2){
                      log.info("接受到客户端发送的心跳!");
                  }else{
                      log.info("未知命令!");
                  }
              } else {
                  log.info("未知数据!" + msg);
                  return;
              }
          } catch (Exception e) {
              e.printStackTrace();
          } finally {
              ReferenceCountUtil.release(msg);
          }
          count.getAndIncrement();
      }
  ?
      /**
       * 异常处理
       */
      @Override
      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
          cause.printStackTrace();
          ctx.close();
      }
  }
还有个服务端的启动类,之前是通过main方法直接启动, 不过这里改成了通过springBoot进行启动,差别不大。 代码如下:

  @SpringBootApplication
  @ComponentScan({"com.sanshengshui.netty.server"})
  public class NettyServerApp {
      /**
       * @param args
       */
      public static void main(String[] args) {
         SpringApplication.run(NettyServerApp.class);
      }
  }
  ?

到这里服务端相应的代码就编写完毕了??。

客户端

客户端这边的代码和服务端的很多地方都类似,我就不再过多细说了,主要将一些不同的代码拿出来简单的讲述下。 首先是客户端的主类,基本和服务端的差不多,也就是多了监听的端口和一个监听器(用来监听是否和服务端断开连接,用于重连)。 主要实现的代码逻辑如下:

      
/**
       * 重连
       */
      public void doConnect(Bootstrap bootstrap, EventLoopGroup eventLoopGroup) {
          try {
              if (bootstrap != null) {
                  bootstrap.group(eventLoopGroup);
                  bootstrap.channel(NioSocketChannel.class);
                  bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
                  bootstrap.handler(new NettyClientInitializer());
                  bootstrap.remoteAddress(host, port);
                  f = bootstrap.connect().addListener((ChannelFuture futureListener) -> {
                      final EventLoop eventLoop = futureListener.channel().eventLoop();
                      if (!futureListener.isSuccess()) {
                          log.info("与服务端断开连接!在10s之后准备尝试重连!");
                          eventLoop.schedule(() -> doConnect(new Bootstrap(), eventLoop), 10, TimeUnit.SECONDS);
                      }
                  });
                  if(initFalg){
                      log.info("Netty客户端启动成功!");
                      initFalg=false;
                  }
              }
          } catch (Exception e) {
              log.info("客户端连接失败!"+e.getMessage());
          }
  ?
      }

注:监听器这块的实现用的是JDK1.8的写法。

客户端过滤其这块基本和服务端一直。不过需要注意的是,传输协议、编码和解码应该一致,还有心跳的读写时间应该小于服务端所设置的时间。 改动的代码如下:

1       ChannelPipeline ph = ch.pipeline();
2           /*
3            * 解码和编码,应和服务端一致
4            * */
5           //入参说明: 读超时时间、写超时时间、所有类型的超时时间、时间格式
6           ph.addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS));

客户端的业务代码逻辑。 主要实现的几点逻辑是心跳按时发送以及解析服务发送的protobuf格式的数据。 这里比服务端多个个注解, 该注解Sharable主要是为了多个handler可以被多个channel安全地共享,也就是保证线程安全。 废话就不多说了,代码如下:

 1       @ChannelHandler.Sharable
 2   @Slf4j
 3   public class NettyClientHandler extends ChannelInboundHandlerAdapter {
 4       @Autowired
 5       private NettyClient nettyClient;
 6   ?
 7       /** 循环次数 */
 8       private AtomicInteger fcount = new AtomicInteger(1);
 9   ?
10       /**
11        * 建立连接时
12        */
13       @Override
14       public void channelActive(ChannelHandlerContext ctx) throws Exception {
15           log.info("建立连接时:" + new Date());
16           ctx.fireChannelActive();
17       }
18   ?
19       /**
20        * 关闭连接时
21        */
22       @Override
23       public void channelInactive(ChannelHandlerContext ctx) throws Exception {
24           log.info("关闭连接时:" + new Date());
25           final EventLoop eventLoop = ctx.channel().eventLoop();
26           nettyClient.doConnect(new Bootstrap(), eventLoop);
27           super.channelInactive(ctx);
28       }
29   ?
30       /**
31        * 心跳请求处理 每4秒发送一次心跳请求;
32        *
33        */
34       @Override
35       public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
36           log.info("循环请求的时间:" + new Date() + ",次数" + fcount.get());
37           if (obj instanceof IdleStateEvent) {
38               IdleStateEvent event = (IdleStateEvent) obj;
39               // 如果写通道处于空闲状态,就发送心跳命令
40               if (IdleState.WRITER_IDLE.equals(event.state())) {
41                   UserMsg.User.Builder userState = UserMsg.User.newBuilder().setState(2);
42                   ctx.channel().writeAndFlush(userState);
43                   fcount.getAndIncrement();
44               }
45           }
46       }
47   ?
48       /**
49        * 业务逻辑处理
50        */
51       @Override
52       public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
53           // 如果不是protobuf类型的数据
54           if (!(msg instanceof UserMsg.User)) {
55               log.info("未知数据!" + msg);
56               return;
57           }
58           try {
59   ?
60               // 得到protobuf的数据
61               UserMsg.User userMsg = (UserMsg.User) msg;
62               // 进行相应的业务处理。。。
63               // 这里就从简了,只是打印而已
64               log.info(
65                       "客户端接受到的用户信息。编号:" + userMsg.getId() + ",姓名:" + userMsg.getName() + ",年龄:" + userMsg.getAge());
66   ?
67               // 这里返回一个已经接受到数据的状态
68               UserMsg.User.Builder userState = UserMsg.User.newBuilder().setState(1);
69               ctx.writeAndFlush(userState);
70               log.info("成功发送给服务端!");
71           } catch (Exception e) {
72               e.printStackTrace();
73           } finally {
74               ReferenceCountUtil.release(msg);
75           }
76       }
77   ?
78   }

那么到这里客户端的代码也编写完毕了??。

功能测试

protobuf传输

首先启动服务端,然后再启动客户端。 我们来看看结果是否如上述所说。

服务端输出结果:

2018-10-03 19:58:41.098  INFO 23644 --- [ntLoopGroup-3-1] c.s.netty.server.NettyServerHandler      : 第1次,服务端接受的消息:state: 1
  ?
  2018-10-03 19:58:41.098  INFO 23644 --- [ntLoopGroup-3-1] c.s.netty.server.NettyServerHandler      : 客户端业务处理成功!
  2018-10-03 19:58:45.058  INFO 23644 --- [ntLoopGroup-3-1] c.s.netty.server.NettyServerHandler      : 第2次,服务端接受的消息:state: 2
  ?
  2018-10-03 19:58:45.059  INFO 23644 --- [ntLoopGroup-3-1] c.s.netty.server.NettyServerHandler      : 接受到客户端发送的心跳!
  2018-10-03 19:58:49.060  INFO 23644 --- [ntLoopGroup-3-1] c.s.netty.server.NettyServerHandler      : 第3次,服务端接受的消息:state: 2
  ?
  2018-10-03 19:58:49.061  INFO 23644 --- [ntLoopGroup-3-1] c.s.netty.server.NettyServerHandler      : 接受到客户端发送的心跳!
  2018-10-03 19:58:53.063  INFO 23644 --- [ntLoopGroup-3-1] c.s.netty.server.NettyServerHandler      : 第4次,服务端接受的消息:state: 2
  ?
  2018-10-03 19:58:53.064  INFO 23644 --- [ntLoopGroup-3-1] c.s.netty.server.NettyServerHandler      : 接受到客户端发送的心跳!
  2018-10-03 19:58:57.066  INFO 23644 --- [ntLoopGroup-3-1] c.s.netty.server.NettyServerHandler      : 第5次,服务端接受的消息:state: 2

客户端输入结果:

 2018-10-03 19:58:40.733  INFO 23737 --- [           main] c.sanshengshui.netty.client.NettyClient  : Netty客户端启动成功!
  2018-10-03 19:58:40.897  INFO 23737 --- [ntLoopGroup-2-1] c.s.netty.client.NettyClientHandler      : 建立连接时:Wed Oct 03 19:58:40 CST 2018
  2018-10-03 19:58:41.033  INFO 23737 --- [ntLoopGroup-2-1] c.s.netty.client.NettyClientHandler      : 客户端接受到的用户信息。编号:1,姓名:穆书伟,年龄:24
  2018-10-03 19:58:41.044  INFO 23737 --- [ntLoopGroup-2-1] c.s.netty.client.NettyClientHandler      : 成功发送给服务端!
  2018-10-03 19:58:41.053  INFO 23737 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Registering beans for JMX exposure on startup
  2018-10-03 19:58:41.067  INFO 23737 --- [           main] com.sanshengshui.netty.NettyClientApp    : Started NettyClientApp in 1.73 seconds (JVM running for 2.632)
  2018-10-03 19:58:45.054  INFO 23737 --- [ntLoopGroup-2-1] c.s.netty.client.NettyClientHandler      : 循环请求的时间:Wed Oct 03 19:58:45 CST 2018,次数1
  2018-10-03 19:58:49.057  INFO 23737 --- [ntLoopGroup-2-1] c.s.netty.client.NettyClientHandler      : 循环请求的时间:Wed Oct 03 19:58:49 CST 2018,次数2
  2018-10-03 19:58:53.060  INFO 23737 --- [ntLoopGroup-2-1] c.s.netty.client.NettyClientHandler      : 循环请求的时间:Wed Oct 03 19:58:53 CST 2018,次数3
  2018-10-03 19:58:57.063  INFO 23737 --- [ntLoopGroup-2-1] c.s.netty.client.NettyClientHandler      : 循环请求的时间:Wed Oct 03 19:58:57 CST 2018,次数4
  2018-10-03 19:59:01.066  INFO 23737 --- [ntLoopGroup-2-1] c.s.netty.client.NettyClientHandler      : 循环请求的时间:Wed Oct 03 19:59:01 CST 2018,次数5
  ?

通过打印信息可以看出如上述所说。

断线重连

接下来我们再来看看客户端是否能够实现重连。 先启动客户端,再启动服务端。

客户端输入结果:

  2018-10-03 20:02:33.549  INFO 23990 --- [ntLoopGroup-2-1] c.sanshengshui.netty.client.NettyClient  : 与服务端断开连接!在10s之后准备尝试重连!
  2018-10-03 20:02:43.571  INFO 23990 --- [ntLoopGroup-2-1] c.s.netty.client.NettyClientHandler      : 建立连接时:Wed Oct 03 20:02:43 CST 2018
  2018-10-03 20:02:43.718  INFO 23990 --- [ntLoopGroup-2-1] c.s.netty.client.NettyClientHandler      : 客户端接受到的用户信息。编号:1,姓名:穆书伟,年龄:24
  2018-10-03 20:02:43.727  INFO 23990 --- [ntLoopGroup-2-1] c.s.netty.client.NettyClientHandler      : 成功发送给服务端!
  2018-10-03 20:02:47.733  INFO 23990 --- [ntLoopGroup-2-1] c.s.netty.client.NettyClientHandler      : 循环请求的时间:Wed Oct 03 20:02:47 CST 2018,次数1
  2018-10-03 20:02:51.735  INFO 23990 --- [ntLoopGroup-2-1] c.s.netty.client.NettyClientHandler      : 循环请求的时间:Wed Oct 03 20:02:51 CST 2018,次数2
  ?

服务端输出结果:

  2018-10-03 20:02:43.661  INFO 24067 --- [ntLoopGroup-3-1] c.s.netty.server.NettyServerHandler      : 连接的客户端地址:/127.0.0.1:55690
  2018-10-03 20:02:43.760  INFO 24067 --- [ntLoopGroup-3-1] c.s.netty.server.NettyServerHandler      : 第1次,服务端接受的消息:state: 1
  ?
  2018-10-03 20:02:43.760  INFO 24067 --- [ntLoopGroup-3-1] c.s.netty.server.NettyServerHandler      : 客户端业务处理成功!
  2018-10-03 20:02:47.736  INFO 24067 --- [ntLoopGroup-3-1] c.s.netty.server.NettyServerHandler      : 第2次,服务端接受的消息:state: 2
  ?
  2018-10-03 20:02:47.737  INFO 24067 --- [ntLoopGroup-3-1] c.s.netty.server.NettyServerHandler      : 接受到客户端发送的心跳!
  2018-10-03 20:02:51.736  INFO 24067 --- [ntLoopGroup-3-1] c.s.netty.server.NettyServerHandler      : 第3次,服务端接受的消息:state: 2

结果也如上述所说!

读写超时

服务端输出结果:

  2018-10-03 20:12:19.193  INFO 24507 --- [ntLoopGroup-3-1] c.s.netty.server.NettyServerHandler      : 连接的客户端地址:/127.0.0.1:56132
  2018-10-03 20:12:24.173  INFO 24507 --- [ntLoopGroup-3-1] c.s.netty.server.NettyServerHandler      : 已经5秒没有接收到客户端的信息了
  2018-10-03 20:12:29.171  INFO 24507 --- [ntLoopGroup-3-1] c.s.netty.server.NettyServerHandler      : 已经5秒没有接收到客户端的信息了
  2018-10-03 20:12:29.172  INFO 24507 --- [ntLoopGroup-3-1] c.s.netty.server.NettyServerHandler      : 关闭这个不活跃的channel

telnet输出结果:

如下图:

其它

关于netty整合springboot并使用protobuf进行数据传输到这里就结束了。

netty整合springboot并使用protobuf进行数据传输 项目工程地址: https://github.com/sanshengshui/netty-learning-example/tree/master/netty-springboot-protobuf

对了,也有Netty整合的其他中间件项目工程地址: https://github.com/sanshengshui/netty-learning-example

原创不易,如果感觉不错,希望给个推荐!您的支持是我写作的最大动力!

版权声明: 作者:穆书伟

博客园出处:https://www.cnblogs.com/sanshengshui

github出处:https://github.com/sanshengshui    

个人博客出处:https://sanshengshui.github.io/

原文地址:https://www.cnblogs.com/sanshengshui/p/9740787.html

时间: 2024-10-05 04:19:53

Netty整合SpringBoot并使用Protobuf进行数据传输的相关文章

Netty学习篇--整合springboot

经过前面的netty学习,大概了解了netty各个组件的概念和作用,开始自己瞎鼓捣netty和我们常用的项目的整合(很简单的整合) 项目准备 工具:IDEA2017 jar包导入:maven 项目框架:springboot+netty 项目操作 右键创建一个maven项目,项目名称: hetangyuese-netty-03(项目已上传github) 项目完整结构 ? maven导包 <!-- netty start --> <dependency> <groupId>

dubbo入门学习(三)-----dubbo整合springboot

springboot节省了大量的精力去配置各种bean,因此通过一个简单的demo来整合springboot与dubbo 一.创建boot-user-service-provider 本篇博文基于上篇中的dubbo项目,整体工程如下: 1.pom.xml <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"

消息中间件——RabbitMQ(十)RabbitMQ整合SpringBoot实战!(全)

前言 1. SpringBoot整合配置详解 publisher-confirms,实现一个监听器用于监听Broker端给我们返回的确认请求:RabbitTemplate.ConfirmCallback publisher-returns,保证消息对Broker端是可达的,如果出现路由键不可达的情况,则使用监听器对不可达的消息进行后续的处理,保证消息的路由成功:RabbitTemplate.ReturnCallback 注意一点,在发送消息的时候对template进行配置mandatory=tr

Windows平台整合SpringBoot+KAFKA__环境配置部分

项目需要,需要整合 SpringBoot+KAFKA 我调查了一下,发现Linux中,要先装zoomkeeper,再装KAFKA,如  https://blog.csdn.net/zhangcongyi420/article/details/88674491 我CA,我的机器搞不动,而且要搞的话,也要搞好几个虚机一起弄个大数据平台环境,太麻烦,正考虑着呢,看到了下面 https://www.jianshu.com/p/5da86afed228 不错哦,试试 下载了zoopkeeper apach

基于Netty和SpringBoot实现一个轻量级RPC框架-Client篇

前提 前置文章: <基于Netty和SpringBoot实现一个轻量级RPC框架-协议篇> <基于Netty和SpringBoot实现一个轻量级RPC框架-Server篇> 前一篇文章相对简略地介绍了RPC服务端的编写,而这篇博文最要介绍服务端(Client)的实现.RPC调用一般是面向契约编程的,而Client的核心功能就是:把契约接口方法的调用抽象为使用Netty向RPC服务端通过私有协议发送一个请求.这里最底层的实现依赖于动态代理,因此动态代理是动态实现接口的最简单方式(如果

Netty(五)序列化protobuf在netty中的使用

protobuf是google序列化的工具,主要是把数据序列化成二进制的数据来传输用的.它主要优点如下: 1.性能好,效率高: 2.跨语言(java自带的序列化,不能跨语言) protobuf参考文档:Protobuf详解 其实,在netty中使用Protobuf需要注意的是: protobufDecoder仅仅负责编码,并不支持读半包,所以在之前,一定要有读半包的处理器. 有三种方式可以选择: 使用netty提供ProtobufVarint32FrameDecoder 继承netty提供的通用

初识elasticsearch_2(查询和整合springboot)

初始化 首先将官网所下载的json文件,放入到es中,采用如下命令: curl -H "Content-Type: application/json" -XPOST 'localhost:9200/bank/account/_bulk?pretty&refresh' --data-binary "@accounts.json" curl 'localhost:9200/_cat/indices?v' search API 接下来可以开始查询啦.可以通过2种方

教你 Shiro 整合 SpringBoot,避开各种坑

最近搞了下 Shiro 安全框架,找了一些网上的博客文章,但是一到自己实现的时候就遇到了各种坑,需要各种查资料看源码以及各种测试. 那么这篇文章就教大家如何将 Shiro 整合到 SpringBoot 中,并避开一些小坑,这次实现了基本的登陆以及角色权限,往后的文章也讲解了其他的功能,如 <教你 Shiro + SpringBoot 整合 JWT> 附上源码:https://github.com/HowieYuan/shiro 依赖包 <dependency> <groupI

使用RESTful风格整合springboot+mybatis

说明: 本文是springboot和mybatis的整合,Controller层使用的是RESTful风格,数据连接池使用的是c3p0,通过postman进行测试 项目结构如下: 1.引入pom.xml依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency