Netty实践(三):实际场景下的数据通信

数据通信的场景:长连接 OR 短连接

在实际场景中,我们如何使用Netty进行通信呢?大致有3种方式:

第一种,使用长连接通道不断开的形式进行通信,也就是服务器和客户端的通道一直处于开启的状态。如果服务器性能足够好,并且我们的客户端数量也比较少的情况下,是适合使用长连接的通道。

第二种,采用短连接方式,一次性批量提交数据,也就是我们会把数据保存在本地临时缓冲区或者临时表里。当达到数量时,就进行批量提交;或者通过定时任务轮询提交。这种情况是有弊端的,就是无法做到实时传输。如果应用程序对实时性要求不高,可以考虑使用。

第三种,采用一种特殊的长连接。特殊在哪里呢?在指定的某一时间之内,服务器与某台客户端没有任何通信,则断开连接,如果断开连接后,客户端又需要向服务器发送请求,那么再次建立连接。这里有点CachedThreadPool的味道。

本篇博客将采用Netty来实现第三种方式的数据通信,接下来我们一起来看看吧~

Netty数据通信代码实例

请求消息对象

package day3;

import java.io.Serializable;

public class Request implements Serializable{

   private static final long  SerialVersionUID = 1L;
   
   private String id ;
   private String name ;
   private String requestMessage ;
   
   public String getId() {
      return id;
   }
   public void setId(String id) {
      this.id = id;
   }
   public String getName() {
      return name;
   }
   public void setName(String name) {
      this.name = name;
   }
   public String getRequestMessage() {
      return requestMessage;
   }
   public void setRequestMessage(String requestMessage) {
      this.requestMessage = requestMessage;
   }

}

响应消息对象

package day3;

import java.io.Serializable;

public class Response implements Serializable{
   
   private static final long serialVersionUID = 1L;
   
   private String id;
   private String name;
   private String responseMessage;
   
   public String getId() {
      return id;
   }
   public void setId(String id) {
      this.id = id;
   }
   public String getName() {
      return name;
   }
   public void setName(String name) {
      this.name = name;
   }
   public String getResponseMessage() {
      return responseMessage;
   }
   public void setResponseMessage(String responseMessage) {
      this.responseMessage = responseMessage;
   }
   

}

编解码处理器

package day3;

import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;

import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;

/**
 * Marshalling工厂
 */
public final class MarshallingCodeCFactory {

    /**
     * 创建Jboss Marshalling解码器MarshallingDecoder
     * @return MarshallingDecoder
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
       //首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
      final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
      //创建了MarshallingConfiguration对象,配置了版本号为5 
      final MarshallingConfiguration configuration = new MarshallingConfiguration();
      configuration.setVersion(5);
      //根据marshallerFactory和configuration创建provider
      UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
      //构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
      MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
      return decoder;
    }

    /**
     * 创建Jboss Marshalling编码器MarshallingEncoder
     * @return MarshallingEncoder
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
      final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
      final MarshallingConfiguration configuration = new MarshallingConfiguration();
      configuration.setVersion(5);
      MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
      //构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
      MarshallingEncoder encoder = new MarshallingEncoder(provider);
      return encoder;
    }
}

注意,在上一篇博客《Netty实践(二):TCP拆包、粘包问题》中,我们是自己继承ByteToMessageDecoder、MessageToByteEncoder来实现ByteBuff与消息对象的转化的,其实这是有点麻烦的。在实际中,我们完全可以利用相关序列化框架(JBoss Marshlling/Protobuf/Kryo/MessagePack)来帮助我们快速完成编解码,这里我使用的是JBoss Marshalling(jboss-marshalling-1.3.0.CR9.jar+jboss-marshalling-serial-1.3.0.CR9.jar)。具体来说,客户端和服务端交互的消息对象只需要实现JDK默认的序列化接口,同时利用JBoss Marshalling 生成编码器和解码器,用于后续Client/Server端即可。

Client Handler

package day3;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

public class ClientHandler extends ChannelHandlerAdapter{
   
   @Override
   public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      try {
         Response resp = (Response)msg;
         System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());       
      } finally {
         ReferenceCountUtil.release(msg);
      }
   }

   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
      ctx.close();
   }
   
}

在这里可以清楚的看到,我们直接将Object转化成了自定义消息响应对象,可见JBoss Marshalling与Netty结合后,编解码是如此简单。

Client

package day3;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;

import java.util.concurrent.TimeUnit;

/**
 *
 */
public class Client {
   
   private static class SingletonHolder {
      static final Client instance = new Client();
   }
   
   public static Client getInstance(){
      return SingletonHolder.instance;
   }
   
   private EventLoopGroup group;
   private Bootstrap b;

   //通过ChannelFuture实现读写操作
   private ChannelFuture cf ;
   
   private Client(){
         group = new NioEventLoopGroup();
         b = new Bootstrap();
         b.group(group)
          .channel(NioSocketChannel.class)
          .handler(new LoggingHandler(LogLevel.INFO))
          .handler(new ChannelInitializer<SocketChannel>() {
               @Override
               protected void initChannel(SocketChannel sc) throws Exception {

                  sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                  sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                  //超时handler(当服务器端与客户端在指定时间以上没有任何进行通信,则会关闭相应的通道,主要为减小服务端资源占用)
                  sc.pipeline().addLast(new ReadTimeoutHandler(3)); 
                  sc.pipeline().addLast(new ClientHandler());
               }
          });
   }
   
   public void connect(){
      try {
         this.cf = b.connect("127.0.0.1", 8765).sync();
         System.out.println("远程服务器已经连接, 可以进行数据交换..");            
      } catch (Exception e) {
         e.printStackTrace();
      }
   }

   //这里是通道关闭,再次建立连接的核心代码
   public ChannelFuture getChannelFuture(){
      
      if(this.cf == null){
         this.connect();
      }
      if(!this.cf.channel().isActive()){
         this.connect();
      }
      
      return this.cf;
   }
   
   public static void main(String[] args) throws Exception{

      final Client c = Client.getInstance();

      //注意client好像没有调用connect()方法进行连接,但是实际上在下面的代码中做了
      ChannelFuture cf = c.getChannelFuture();

      for(int i = 1; i <= 3; i++ ){
         Request request = new Request();
         request.setId("" + i);
         request.setName("pro" + i);
         request.setRequestMessage("数据信息" + i);
         cf.channel().writeAndFlush(request);
         TimeUnit.SECONDS.sleep(4);
      }

      cf.channel().closeFuture().sync();
      
      //通道关闭后,通过另一个线程模拟客户端再次建立连接发送请求
      new Thread(new Runnable() {
         @Override
         public void run() {
            try {
               System.out.println("进入子线程...");
               ChannelFuture cf = c.getChannelFuture();
               System.out.println(cf.channel().isActive());
               System.out.println(cf.channel().isOpen());
               
               //再次发送数据
               Request request = new Request();
               request.setId("" + 4);
               request.setName("pro" + 4);
               request.setRequestMessage("数据信息" + 4);
               cf.channel().writeAndFlush(request);               
               cf.channel().closeFuture().sync();
               System.out.println("子线程结束...");

            } catch (InterruptedException e) {
               e.printStackTrace();
            }
         }
      }).start();
      
      System.out.println("断开连接,主线程结束..");
      
   }
   
   
   
}

这里对Client进行了初步的封装,采用静态内部类实现单例。

Client的Handler不仅仅有Marshalling的编解码器,还加入了Netty自带的ReadTimeoutHandler,这是客户端与服务端一段时间没有通信就断开连接的依据。从这里也看到Netty的强大之处了,通过提供一些预定义的Handler让你的代码变得简单,只需要专注于业务实现即可。客户端超时断开通道后,如何再次建立连接进行通信呢?通过getChannelFuture()你会知道。

客户端代码模拟了一个线程通信超时,关闭通道后,另一个线程与服务器端再次通信。

Server Handler

package day3;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class ServerHandler extends ChannelHandlerAdapter{

   @Override
   public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      Request request = (Request)msg;
      System.out.println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getRequestMessage());
      Response response = new Response();
      response.setId(request.getId());
      response.setName("response" + request.getId());
      response.setResponseMessage("响应内容" + request.getId());
      ctx.writeAndFlush(response);//.addListener(ChannelFutureListener.CLOSE);
   }

   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
      ctx.close();
   }

   
   
}

Server

package day3;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;

public class Server {

   public static void main(String[] args) throws Exception{
      
      EventLoopGroup pGroup = new NioEventLoopGroup();
      EventLoopGroup cGroup = new NioEventLoopGroup();
      
      ServerBootstrap b = new ServerBootstrap();
      b.group(pGroup, cGroup)
       .channel(NioServerSocketChannel.class)
       .option(ChannelOption.SO_BACKLOG, 1024)
       //设置日志
       .handler(new LoggingHandler(LogLevel.INFO))
       .childHandler(new ChannelInitializer<SocketChannel>() {
         protected void initChannel(SocketChannel sc) throws Exception {
            sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
            sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
            sc.pipeline().addLast(new ReadTimeoutHandler(3));
            sc.pipeline().addLast(new ServerHandler());
         }
      });
      
      ChannelFuture cf = b.bind(8765).sync();
      
      cf.channel().closeFuture().sync();
      pGroup.shutdownGracefully();
      cGroup.shutdownGracefully();
      
   }
}

运行结果

说明:由于客户端一开始是发送3条消息给服务端,但是每条消息发送间隔4S,由于超时设置为3S,于是发送第一条消息后,通道便断开连接。接下来,客户端又启动了一个线程再次与服务端通信。

到这里,这篇博客就结束了,对你有用吗?

下周我们再来看Netty在心跳检测方面的应用,^_^

时间: 2024-10-13 19:01:24

Netty实践(三):实际场景下的数据通信的相关文章

混合云场景下容器技术在新能源功率预测产品中的最佳实践

能源互联网是物联网和"互联网+"在能源行业深度融合的产物,是中国制造2025的重要组成部分,我们现在还处于能源互联网的早期阶段.绝大部分能源行业的应用都部署在私有局域网内,并且网络结构异常复杂,这是阻碍互联网技术在能源行业落地的最大挑战. 6月28日,金风科技数据平台架构师张利出席了Rancher Labs举办的Container Day 2018容器技术大会,并做了题为<混合云场景下容器技术在新能源功率预测产品中的最佳实践>的演讲. 金风科技是中国成立最早.自主研发能力最

自己总结的C#编码规范--3.特定场景下的命名最佳实践

特定场景下的命名最佳实践 命名空间 要使用PascalCasing,并用点号来分隔名字空间中的各个部分. 如Microsof.Office.PowerPoint 要用公司名作为命名空间的前缀,这样就可以避免与另外一家公司使用相同的名字. 要用稳定的,与版本无关的产品名称作为命名空间的第二层 不要使用公司的组织架构来决定命名空间的层次结构,因为内部组织结构经常改变. 不要用相同的名字来命名命名空间和该空间内的类型. 例如,不要先将命名空间命名为Debug,然后又在该空间中提供Debug类.大部分编

【转】MySQL乐观锁在分布式场景下的实践

背景 在电商购物的场景下,当我们点击购物时,后端服务就会对相应的商品进行减库存操作.在单实例部署的情况,我们可以简单地使用JVM提供的锁机制对减库存操作进行加锁,防止多个用户同时点击购买后导致的库存不一致问题. 但在实践中,为了提高系统的可用性,我们一般都会进行多实例部署.而不同实例有各自的JVM,被负载均衡到不同实例上的用户请求不能通过JVM的锁机制实现互斥. 因此,为了保证在分布式场景下的数据一致性,我们一般有两种实践方式:一.使用MySQL乐观锁:二.使用分布式锁. 本文主要介绍MySQL

高并发场景下请求合并的实践

前言 项目中一般会请求第三方的接口,也会对外提供接口,可能是RPC,也可能是HTTP等方式.在对外提供接口时,有必要提供相应的批量接口,好的批量实现能够提升性能. 高并发场景中,调用批量接口相比调用非批量接口有更大的性能优势.但有时候,请求更多的是单个接口,不能够直接调用批量接口,如果这个接口是高频接口,对其做请求合并就很有必要了.比如电影网站的获取电影详情接口,APP的一次请求是单个接口调用,用户量少的时候请求也不多,完全没问题:但同一时刻往往有大量用户访问电影详情,是个高并发的高频接口,如果

51信用卡金融风控场景下实时计算引擎的设计与实践

https://mp.weixin.qq.com/s/Rx43XfhgdwerQWLn1eI3Ww 51信用卡金融风控场景下实时计算引擎的设计与实践 原创: 周来 51NB技术 5月7日 原文地址:https://www.cnblogs.com/yuanjiangw/p/10961583.html

秒杀场景下MySQL的低效(转)

秒杀场景下MySQL的低效 2016-01-14 17:12 178人阅读 评论(0) 收藏 举报 最近业务试水电商,接了一个秒杀的活.之前经常看到淘宝的同行们讨论秒杀,讨论电商,这次终于轮到我们自己理论结合实际一次了. ps:进入正文前先说一点个人感受,之前看淘宝的ppt感觉都懂了,等到自己出解决方案的时候发现还是有很多想不到的地方其实都没懂,再次验证了"细节是魔鬼"的理论.并且一个人的能力有限,只有大家一起讨论才能想的更周全,更细致.好了,闲话少说,下面进入正文. 一.秒杀带来了什

还没弄懂分布式场景下数据一致性问题?一文教你轻松解决!

文章纲要 此次分享的缘由 目前分布式事务问题是怎么解决的 行业中有什么解决方案 这些解决方案分别有什么优缺点 别人是怎么做的 我们可以怎么来做 此次分享的缘由 支付重构 考虑支付重构的时候,自然想到原本属于一个本地事务中的处理,现在要跨应用了要怎么处理.拿充值订单举个栗子吧,假设:原本订单模块和账户模块是放在一起的,现在需要做服务拆分,拆分成订单服务,账户服务.原本收到充值回调后,可以将修改订单状态和增加金币放在一个mysql事务中完成的,但是呢,因为服务拆分了,就面临着需要协调2个服务才能完成

多线程场景下延迟初始化的策略

1.什么是延迟初始化 延迟初始化(lazy initialization,即懒加载)是延迟到需要域的值时才将它初始化的行为.如果永远不需要这个值,这个域就永远不会被初始化.这种方法既静态域,也适用于实例域. 最好建议“除非绝对必要,否则就不要这么做”. 2.延迟初始化线程安全的一个策略:同步 延迟初始化的一个好处,是当域只在类的实例部分被访问,并且初始化这个域的开销很高,那就可能值得进行延迟初始化. 但是在大多数情况下,正常的初始化要优先于延迟初始化.因为在多线程的场景下,采用某种形式的同步是很

MVC项目实践,在三层架构下实现SportsStore-02,DbSession层、BLL层

SportsStore是<精通ASP.NET MVC3框架(第三版)>中演示的MVC项目,在该项目中涵盖了MVC的众多方面,包括:使用DI容器.URL优化.导航.分页.购物车.订单.产品管理.图像上传......是不错的MVC实践项目,但该项目不是放在多层框架下开发的,离真实项目还有一段距离.本系列将尝试在多层框架下实现SportsStore项目,并用自己的方式实现一些功能. 本篇为系列第二篇,包括: ■ 4.三层架构设计    □ 4.2 创建DbSession层 数据访问层的统一入口