基于netty框架的Socket传输

一、Netty框架介绍

什么是netty?先看下百度百科的解释:

Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

也就是说,Netty 是一个基于NIO的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。Netty相当简化和流线化了网络应用的编程开发过程,例如,TCP和UDP的socket服务开发。

“快速”和“简单”并不用产生维护性或性能上的问题。Netty 是一个吸收了多种协议的实现经验,这些协议包括FTP,SMTP,HTTP,各种二进制,文本协议,并经过相当精心设计的项目,最终,Netty 成功的找到了一种方式,在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性

为什么好多大公司都在使用netty框架?主要是基于netty框架的以下几个特点决定的:

1)健壮性,2)功能齐全,3)可定制,4)扩展性

二、框架优点

传统的RPC性能差,主要是由于客户端和远程调用采用了同步阻塞IO线程,当客户端的并发压力增大后,同步阻塞会由于频繁的等待导致I/O线程堵塞,线程无法高效的工作,IO处理能力自然会降低。影响性能的三个因素:第一,IO模型,IO模型在一定程度上决定了框架的性能。第二、协议,如:HTTP、TCP/IP等,协议选择的不同,性能模型也不同,通常情况下,内部私有协议的性能比较优,这是由于内部设计决定的。第三、线程,数据报文的接收、读取、编码、解码等,线程模型的不同,性能也不同。相比于传统的RPC框架,netty的优点主要体现在以下几个方面:

  1. API使用简单,封装非常完善,开发门槛低
  2. 功能上强大,预置了多种编码解码功能,多种协议支持
  3. 定制能力强,可以对ChannelHandler对通信框架灵活扩展
  4. 性能高,Reactor线程模型调度,ChannelFuture-Listener,通过Listener机制主动推送结果
  5. 版本成熟稳定,社区活跃,版本更新快,出现的Bug会被很快的修复,同时,有心功能的加入,经历了大规模的商业应用考验,质量的到了充分的验证。已经广泛应用到互联网、大数据、企业应用、电信软件、网络游戏等热门行业,他可以满足不同的商业标准。

三、Netty架构分析

Netty是一个基于三层网络架构模型的框架,三层网络架构分析包括调度层、链条传递层以及业务逻辑层。

  1. Reactor通信调度层,是一个模型,

NIO线程池组件{

监听网络读写连接

业务调度处理

NIO,AIO,配合NIO通道NioSocketChannel组件

}

Netty通过内部select巡查机制,能够实现IO多路复用,通过把多个IO阻塞复用到同一个select的阻塞上,从而能够使系统即使在单线程的情况下,也能够同时处理多个请求。这样就使得netty实现了IO多路复用的优势,与传统多线程相比,大大减少了系统的开销,因为系统不必创建新的线程和销毁线程了,减少了系统的维护难度,节省了资源。

ByteBuffer池化支持,不用手动切换标志位,实现零拷贝。传统的Socket读写,基本是使用堆内存进行,即jvm事先会把堆内存拷贝到内存中,然后再写入Socket,而netty采用的是DIRECT BUFFERS,不需要经过jvm内存拷贝,在堆外内存直接进行Socket读写,这样就少了一次缓冲区的内存拷贝,从而实现零拷贝。

2.Pipleline职责链条传递

拦截处理向前向后事件,外部传入的消息包对象,有POJO信息抽象,上层也只需要处理逻辑,类似SpringIOC处理BeanDefince。不同的Handler节点的功能也不同,通常情况下需要编码解码等,它可以完成外部协议到内部POJO对象的转化,这样上层只需要关注业务逻辑,不需要知道底层的协议和线程模型,从而实现解耦。

3.构建逻辑业务处理单元

底层的协议隔离,上层处理逻辑框架并不需要关心底层协议是什么。Netty框架的分层设计使得开发人员不需要关注协议框架的实现,只需要关注服务层的业务逻辑开发即可,实现了简单化。

之前有个项目是基于传统Socket和线程池的技术实现的,但是在高并发的时候发现并发能力不足,压测的时候发现TPS达不到理想值,所以经过考虑,决定使用netty框架来解决此问题。同样,netty框架也分为客户端和服务端,经过整理,先写一个demo初探netty框架,下面是代码的实现过程。

首先是服务端,服务端包含两个方面,第一、服务端Server的主要作用就是通过辅助引导程序,设置NIO的连接方式处理客户端请求,通过绑定特定端口、设定解码方式以及监听来实现整个线程的处理请求;第二、服务端Handler需要继承ChannelInboundHandlerAdapter类,handler类的主要作用是读取客户端数据,处理业务,抛出异常,响应客户端请求。代码如下:

服务端Server:

public class Server {

private static Log logger = LogFactory.getLog(Server.class);

private int port;

public Server(int port) {

        super();

        this.port = port;

}

public  void start(){

        ServerBootstrap b = new ServerBootstrap();//引导辅助程序

        EventLoopGroup group = new NioEventLoopGroup();//通过nio方式来接收连接和处理请求

        try {

               b.group(group);

               b.channel(NioServerSocketChannel.class);//设置nio类型的channnel

               b.localAddress(new InetSocketAddress(port));//设置监听端口

               //b.option(ChannelOption.SO_BACKLOG, 2048);

               b.childHandler(new ChannelInitializer<SocketChannel>() {//有连接到达时会创建一个channel

                      @Override

                      protected void initChannel(SocketChannel ch) throws Exception {

                             //注册handler

                             ch.pipeline().addLast(new ByteArrayDecoder());

                             ch.pipeline().addLast(new ByteArrayEncoder());

                             ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8")));

                             ch.pipeline().addLast(new ServerHandler());

                      }

               });//.option(ChannelOption.SO_BACKLOG, 2048).childOption(ChannelOption.SO_KEEPALIVE, true);

               ChannelFuture f = b.bind().sync();//配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功

               logger.info(Server.class.getName()+"开始监听:"+f.channel().localAddress());

               f.channel().closeFuture().sync();//应用程序会一直等待直到channel关闭

        } catch (Exception e) {

               e.printStackTrace();

        } finally {

               try {

                      //关闭EventLoopGroup,释放掉所有资源包括创建的线程

                      group.shutdownGracefully().sync();

               } catch (InterruptedException e) {

                      e.printStackTrace();

               }

        }

}

}

服务端Handler

public class ServerHandler extends ChannelInboundHandlerAdapter {

private static Log logger=LogFactory.getLog(ServerHandler.class);

@Override

public void channelActive(ChannelHandlerContext ctx){

        logger.info(ctx.channel().localAddress().toString()+"通道活跃....");

}

@Override

public void channelInactive(ChannelHandlerContext ctx) throws Exception {

        logger.error(ctx.channel().localAddress().toString()+"通道不活跃....");

}

/**

 *

 * 读取客户端传过来的消息

 */

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        //业务处理类

        logger.info("开始业务处理....");

        new SocketController(ctx,msg).run();

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

        //出现异常,关闭连

        logger.error("服务端出现异常:"+cause.getMessage(),cause);

        ctx.close();

}

@Override

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

        logger.info("服务端完成请求!");

        ctx.flush();

}

}

客户端代码

客户端主要是用来向服务端发送数据,同样包含两个方面,第一、Client主要通过设定端口和IP和服务器建立连接,进行数据包的编码;第二、ClientHandler 需要继承 SimpleChannelInboundHandler<ByteBuf>类,针对不同的传输方式,继承不同的类,handler类同样处理业务请求,响应服务端的请求。代码如下:

客户端Client:

public class Client {
    private static Log logger=LogFactory.getLog(Client.class);
    private String host;
    private int port;
    public Client(String host, int port) {
        super();
        this.host = host;
        this.port = port;
    }
    public void connect(){
        EventLoopGroup workGroup=new NioEventLoopGroup();
        Bootstrap bootstrap=new Bootstrap();
        bootstrap.group(workGroup);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                logger.info("客户端触发连接......");
                ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8")));
                ch.pipeline().addLast(new ClientHandler());
            }
        });
        //客户端开始连接
        try {
            logger.info("连接到服务器......");
            ChannelFuture future=bootstrap.connect(host,port).sync();
            //等待连接关闭
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }finally{
            workGroup.shutdownGracefully();
        }
    }
}

客户端Handler:

public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    private static Log logger=LogFactory.getLog(ClientHandler.class);
    /**
     * 向服务端发送消息
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.info(ctx.channel().localAddress().toString()+"客户点活跃...");
        //向服务端写字符串
        logger.info("客户端连接服务端,开始发送数据.....");
        String string ="hello server!";
        System.out.println("发送数据为:"+string);
        ByteBuf buf=ctx.alloc().buffer(4*string.length());
        buf.writeBytes(string.getBytes());
        ctx.writeAndFlush(buf);
        logger.info("发送完毕...");
    }

    /**
     * 读取服务端返回来的消息
     */
    @Override
    protected void channelRead0(ChannelHandlerContext arg0, ByteBuf in) throws Exception {
        logger.info("开始接受服务端数据");
        byte[] b=new byte[in.readableBytes()];
        in.readBytes(b);
        String string=new String(b);
        logger.info("服务端发送的数据为:"+string);
        in.release();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.info("客户端异常:"+cause.getMessage(),cause);
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        logger.info("客户端完成请求....");
        ctx.flush();
    }
}

服务端启动:

public class ServerMain {

private static Log logger=LogFactory.getLog(ServerMain.class);

private static Server server =new Server(55550);

public static void main(String[] args) {

        logger.info("服务端启动.......");

        server.start();

}

}

客户端启动类:

public class Test {

private static Client client = new Client("127.0.0.1", 55550);

public static void main(String[] args) throws UnknownHostException, IOException {

        client.connect();

}

}

测试结果:

服务端:

客户端:

总结:

以上只是一个netty框架初探的小Demo,学习使用netty框架的开始,这里面涉及到了很多的技术以及非常多的组件,比如:Channels、Callbacks、Futures、Events和handlers等等,需要进一步的学习,另外,消息的编码解码、粘包、拆包的方式方法、消息格式的转换以及报文格式大小限制都需要进一步的研究学习。

原文地址:https://www.cnblogs.com/10158wsj/p/8428347.html

时间: 2024-10-10 04:19:10

基于netty框架的Socket传输的相关文章

基于netty框架的socket长连接负载均衡解决方案

socket通讯的单机瓶颈 物联网的项目socket使用方式有两种: 短连接的socket请求 维持socket长连接的请求 对于socket短链接来说就好比是http请求,请求服务器,服务器返回数据以后请求管道就关闭了,服务器与客户端的链接就释放了.但是对于socket长链接就不同了,当设备与服务器建立连接以后就要一直保持连接,或者说保持较长时间的链接,那么就会大量消耗服务器的资源.若存在大量的这样的请求以后服务器终究会受不了垮掉.通过对TcpClient/server最大连接数我们得知单机s

基于Java Netty框架构建高性能的Jt808协议的GPS服务器(转)

原文地址:http://www.jt808.com/?p=971 使用Java语言开发一个高质量和高性能的jt808 协议的GPS通信服务器,并不是一件简单容易的事情,开发出来一段程序和能够承受数十万台车载接入是两码事,除去开发部标jt808协议的固有复杂性和几个月长周期的协议Bug调试,作为大批量794车载终端接入的服务端,需要能够处理网络的闪断.客户端的重连.安全认证和消息的编解码.半包处理等.如果没有足够的网络编程经验积累和深入了解部标jt808协议文档,自研的GPS服务器往往需要半年甚至

Java编写基于netty的RPC框架

一 简单概念 RPC: ( Remote Procedure Call),远程调用过程,是通过网络调用远程计算机的进程中某个方法,从而获取到想要的数据,过程如同调用本地的方法一样. 阻塞IO :当阻塞I/O在调用InputStream.read()方法是阻塞的,一直等到数据到来时才返回,同样ServerSocket.accept()方法时,也是阻塞,直到有客户端连接才返回,I/O通信模式如下: 图片描述(最多50字) 缺点:当客户端多时,会创建大量的处理线程,并且为每一个线程分配一定的资源;阻塞

《Java 编写基于 Netty 的 RPC 框架》

一 简单概念 RPC: ( Remote Procedure Call),远程调用过程,是通过网络调用远程计算机的进程中某个方法,从而获取到想要的数据,过程如同调用本地的方法一样. 阻塞IO :当阻塞I/O在调用InputStream.read()方法是阻塞的,一直等到数据到来时才返回,同样ServerSocket.accept()方法时,也是阻塞,直到有客户端连接才返回,I/O通信模式如下: 缺点:当客户端多时,会创建大量的处理线程,并且为每一个线程分配一定的资源;阻塞可能带来频繁切换上下文,

基于Netty的高性能JAVA的RPC框架

前言 今年7月份左右报名参加了阿里巴巴组织的高性能中间件挑战赛,这次比赛不像以往的比赛,是从一个工程的视角来比赛的. 这个比赛有两个赛题,第一题是实现一个RPC框架,第二道题是实现一个Mom消息中间件. RPC题目如下 一个简单的RPC框架 RPC(Remote Procedure Call )--远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议.RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据.在OSI网络通信模型中,RPC

基于netty轻量的高性能分布式RPC服务框架forest&lt;下篇&gt;

基于netty轻量的高性能分布式RPC服务框架forest<上篇> 文章已经简单介绍了forest的快速入门,本文旨在介绍forest用户指南. 基本介绍 Forest是一套基于java开发的RPC框架,除了常规的点对点调用外,Motan还提供服务治理功能,包括服务节点的自动发现.摘除.高可用和负载均衡等. 架构概述 Forest中分为服务提供方(RPC Server),服务调用方(RPC Client)和服务注册中心(Registry)三个角色. Server提供服务,向Registry注册

基于Netty实现的RESTful框架--netty-rest-server

在工作中用Netty做了几个服务,感觉Netty做出来的程序性能好,资源占用少,但是实现Http服务比较麻烦,于是就参考Spring MVC的注解基于Netty实现了一个轻量级的RESTful框架. 该框架提供了控制器注解.全局异常控制器.拦截器等功能. 注解名称参考了Spring MVC,编译理解和记忆,主要包括如下注解: @RestController @RequestMapping @GetMapping @PostMapping @DeleteMapping @PutMapping @P

基于Netty和SpringBoot实现一个轻量级RPC框架-Client篇

前提 前置文章: <基于Netty和SpringBoot实现一个轻量级RPC框架-协议篇> <基于Netty和SpringBoot实现一个轻量级RPC框架-Server篇> 前一篇文章相对简略地介绍了RPC服务端的编写,而这篇博文最要介绍服务端(Client)的实现.RPC调用一般是面向契约编程的,而Client的核心功能就是:把契约接口方法的调用抽象为使用Netty向RPC服务端通过私有协议发送一个请求.这里最底层的实现依赖于动态代理,因此动态代理是动态实现接口的最简单方式(如果

基于Netty构建高性能的部标808协议的GPS服务器

使用Java语言开发一个高质量和高性能的jt808 协议的GPS通信服务器,并不是一件简单容易的事情,开发出来一段程序和能够承受数十万台车载接入是两码事,除去开发部标808协议的固有复杂性和几个月长周期的协议Bug调试,作为大批量794车载终端接入的服务端,需要能够处理网络的闪断.客户端的重连.安全认证和消息的编解码.半包处理等.如果没有足够的网络编程经验积累和深入了解部标808协议文档,自研的GPS服务器往往需要半年甚至数年的时间才能最终稳定下来,这种成本即便对一个大公司而言也是个严重的挑战.