数据通信的场景:长连接 OR 短连接
在实际场景中,我们如何使用Netty进行通信呢?大致有3种方式:
第一种,使用长连接通道不断开的形式进行通信,也就是服务器和客户端的通道一直处于开启的状态。如果服务器性能足够好,并且我们的客户端数量也比较少的情况下,是适合使用长连接的通道。
第二种,采用短连接方式,一次性批量提交数据,也就是我们会把数据保存在本地临时缓冲区或者临时表里。当达到数量时,就进行批量提交;或者通过定时任务轮询提交。这种情况是有弊端的,就是无法做到实时传输。如果应用程序对实时性要求不高,可以考虑使用。
第三种,采用一种特殊的长连接。特殊在哪里呢?在指定的某一时间之内,服务器与某台客户端没有任何通信,则断开连接,如果断开连接后,客户端又需要向服务器发送请求,那么再次建立连接。这里有点CachedThreadPool的味道。
本篇博客将采用Netty来实现第三种方式的数据通信,接下来我们一起来看看吧~
Netty数据通信代码实例
请求消息对象
package day3; import java.io.Serializable; public class Request implements Serializable{ private static final long SerialVersionUID = 1L; private String id ; private String name ; private String requestMessage ; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getRequestMessage() { return requestMessage; } public void setRequestMessage(String requestMessage) { this.requestMessage = requestMessage; } }
响应消息对象
package day3; import java.io.Serializable; public class Response implements Serializable{ private static final long serialVersionUID = 1L; private String id; private String name; private String responseMessage; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getResponseMessage() { return responseMessage; } public void setResponseMessage(String responseMessage) { this.responseMessage = responseMessage; } }
编解码处理器
package day3; import io.netty.handler.codec.marshalling.DefaultMarshallerProvider; import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider; import io.netty.handler.codec.marshalling.MarshallerProvider; import io.netty.handler.codec.marshalling.MarshallingDecoder; import io.netty.handler.codec.marshalling.MarshallingEncoder; import io.netty.handler.codec.marshalling.UnmarshallerProvider; import org.jboss.marshalling.MarshallerFactory; import org.jboss.marshalling.Marshalling; import org.jboss.marshalling.MarshallingConfiguration; /** * Marshalling工厂 */ public final class MarshallingCodeCFactory { /** * 创建Jboss Marshalling解码器MarshallingDecoder * @return MarshallingDecoder */ public static MarshallingDecoder buildMarshallingDecoder() { //首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。 final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); //创建了MarshallingConfiguration对象,配置了版本号为5 final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); //根据marshallerFactory和configuration创建provider UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration); //构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度 MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024); return decoder; } /** * 创建Jboss Marshalling编码器MarshallingEncoder * @return MarshallingEncoder */ public static MarshallingEncoder buildMarshallingEncoder() { final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration); //构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组 MarshallingEncoder encoder = new MarshallingEncoder(provider); return encoder; } }
注意,在上一篇博客《Netty实践(二):TCP拆包、粘包问题》中,我们是自己继承ByteToMessageDecoder、MessageToByteEncoder来实现ByteBuff与消息对象的转化的,其实这是有点麻烦的。在实际中,我们完全可以利用相关序列化框架(JBoss Marshlling/Protobuf/Kryo/MessagePack)来帮助我们快速完成编解码,这里我使用的是JBoss Marshalling(jboss-marshalling-1.3.0.CR9.jar+jboss-marshalling-serial-1.3.0.CR9.jar)。具体来说,客户端和服务端交互的消息对象只需要实现JDK默认的序列化接口,同时利用JBoss Marshalling 生成编码器和解码器,用于后续Client/Server端即可。
Client Handler
package day3; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.ReferenceCountUtil; public class ClientHandler extends ChannelHandlerAdapter{ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { Response resp = (Response)msg; System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage()); } finally { ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
在这里可以清楚的看到,我们直接将Object转化成了自定义消息响应对象,可见JBoss Marshalling与Netty结合后,编解码是如此简单。
Client
package day3; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.ReadTimeoutHandler; import java.util.concurrent.TimeUnit; /** * */ public class Client { private static class SingletonHolder { static final Client instance = new Client(); } public static Client getInstance(){ return SingletonHolder.instance; } private EventLoopGroup group; private Bootstrap b; //通过ChannelFuture实现读写操作 private ChannelFuture cf ; private Client(){ group = new NioEventLoopGroup(); b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); //超时handler(当服务器端与客户端在指定时间以上没有任何进行通信,则会关闭相应的通道,主要为减小服务端资源占用) sc.pipeline().addLast(new ReadTimeoutHandler(3)); sc.pipeline().addLast(new ClientHandler()); } }); } public void connect(){ try { this.cf = b.connect("127.0.0.1", 8765).sync(); System.out.println("远程服务器已经连接, 可以进行数据交换.."); } catch (Exception e) { e.printStackTrace(); } } //这里是通道关闭,再次建立连接的核心代码 public ChannelFuture getChannelFuture(){ if(this.cf == null){ this.connect(); } if(!this.cf.channel().isActive()){ this.connect(); } return this.cf; } public static void main(String[] args) throws Exception{ final Client c = Client.getInstance(); //注意client好像没有调用connect()方法进行连接,但是实际上在下面的代码中做了 ChannelFuture cf = c.getChannelFuture(); for(int i = 1; i <= 3; i++ ){ Request request = new Request(); request.setId("" + i); request.setName("pro" + i); request.setRequestMessage("数据信息" + i); cf.channel().writeAndFlush(request); TimeUnit.SECONDS.sleep(4); } cf.channel().closeFuture().sync(); //通道关闭后,通过另一个线程模拟客户端再次建立连接发送请求 new Thread(new Runnable() { @Override public void run() { try { System.out.println("进入子线程..."); ChannelFuture cf = c.getChannelFuture(); System.out.println(cf.channel().isActive()); System.out.println(cf.channel().isOpen()); //再次发送数据 Request request = new Request(); request.setId("" + 4); request.setName("pro" + 4); request.setRequestMessage("数据信息" + 4); cf.channel().writeAndFlush(request); cf.channel().closeFuture().sync(); System.out.println("子线程结束..."); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); System.out.println("断开连接,主线程结束.."); } }
这里对Client进行了初步的封装,采用静态内部类实现单例。
Client的Handler不仅仅有Marshalling的编解码器,还加入了Netty自带的ReadTimeoutHandler,这是客户端与服务端一段时间没有通信就断开连接的依据。从这里也看到Netty的强大之处了,通过提供一些预定义的Handler让你的代码变得简单,只需要专注于业务实现即可。客户端超时断开通道后,如何再次建立连接进行通信呢?通过getChannelFuture()你会知道。
客户端代码模拟了一个线程通信超时,关闭通道后,另一个线程与服务器端再次通信。
Server Handler
package day3; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class ServerHandler extends ChannelHandlerAdapter{ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Request request = (Request)msg; System.out.println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getRequestMessage()); Response response = new Response(); response.setId(request.getId()); response.setName("response" + request.getId()); response.setResponseMessage("响应内容" + request.getId()); ctx.writeAndFlush(response);//.addListener(ChannelFutureListener.CLOSE); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
Server
package day3; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.ReadTimeoutHandler; public class Server { public static void main(String[] args) throws Exception{ EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) //设置日志 .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast(new ReadTimeoutHandler(3)); sc.pipeline().addLast(new ServerHandler()); } }); ChannelFuture cf = b.bind(8765).sync(); cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } }
运行结果
说明:由于客户端一开始是发送3条消息给服务端,但是每条消息发送间隔4S,由于超时设置为3S,于是发送第一条消息后,通道便断开连接。接下来,客户端又启动了一个线程再次与服务端通信。
到这里,这篇博客就结束了,对你有用吗?
下周我们再来看Netty在心跳检测方面的应用,^_^