Netty 系列一(核心组件和实例).

一、概念

早期的 Java API 只支持由本地系统套接字库提供所谓的阻塞函数来支持网络编程。由于是阻塞 I/O ,要管理多个并发客户端,需要为每个新的客户端Socket 创建一个 Thread 。这将导致一系列的问题,第一,在任何时候都可能有大量的线程处于休眠状态(不可能每时每刻都有对应的并发数);第二,需要为每个线程的调用栈都分配内存;第三,JVM 在线程的上下文切换所带来的开销会带来麻烦。

Java 在 2002 年引入了非阻塞 I/O,位于 JDK 1.4 的 java.nio 包中。class java.nio.channels.Selector 是Java 的非阻塞 I/O 实现的关键。它使用了事件通知以确定在一组非阻塞套接字中有哪些已经就绪能够进行 I/O 相关的操作。因为可以在任何的时间检查任意的读操作或者写操作的完成状态,所以如图 1-2 所示,一个单一的线程便可以处理多个并发的连接。

尽管可以直接使用 Java NIO API,但是在高负载下可靠和高效地处理和调度 I/O 操作是一项繁琐而且容易出错的任务,最好还是留给高性能的网络编程专家——Netty。

Netty 是一款异步的事件驱动的网络应用程序框架,支持快速的开发可维护的高性能的瞄向协议的服务端和客户端。它驾驭了Java高级API的能力,并将其隐藏在一个易于使用的API之后。首先,它的基于 Java NIO 的异步的和事件驱动的实现,保证了高负载下应用程序性能的最大化和可伸缩性。其次, Netty 也包含了一组设计模式,将应用程序逻辑从网络层解耦,简化了开发过程, 同时也最大限度地提高了可测试性、模块化以及代码的可重用性。

tips:面向对象的基本概念—> 用较简单的抽象隐藏底层实现的复杂性。

二、核心组件

  • Channel

Channel是Java NIO的一个基本构造。可以看作是传入或传出数据的载体。因此,它可以被打开或关闭,连接或者断开连接。以下是常用的Channel:

-- EmbeddedChannel
-- LocalServerChannel
-- NioDatagramChannel
-- NioSctpChannel
-- NioSocketChannel

  • 回调

    当一个回调被触发时,相应的事件可以被一个interface-ChannelHandler的实现处理。

  • Future

    Netty中所有的I/O操作都是异步的。因为一个操作可能不会立即返回,所以我们需要一种在之后的某个时间点确定其结果的方法。

Future 和 回调 是相互补充的机制,提供了另一种在操作完成时通知应用程序的方式。这个对象可以看作是一个异步操作结果的占位符;它将在未来的某个时刻完成,并提供对其结果的访问。

Netty 提供了ChannelFuture,用于在执行异步操作的时候使用。每个Netty的出站I/O操作都会返回一个ChannelFuture。ChannelFuture能够注册一个或者多个ChannelFutureListener 实例。监听器的回调方法operationComplete(),将会在对应的操作完成时被调用。

  • ChannelHandler

    Netty 的主要组件是ChannelHandler,它充当了所有处理入站和出站数据的应用程序逻辑的容器。

Netty 使用不同的事件来通知我们状态的改变或者是操作的状态,每个事件都可以被分发给ChannelHandler类中某个用户实现的方法。Netty提供了大量预定义的可以开箱即用的ChannelHandler实现,包括用于各种协议的ChannelHandler。

现在,事件可以被分发给ChannelHandler类中某个用户实现的方法。那么,如果 ChannelHandler 处理完成后不直接返回给客户端,而是传递给下一个ChannelHandler 继续处理呢?那么就要说到 ChannelPipeline !

ChannelPipeline 提供了 ChannelHandler链 的容器,并定义了用于在该链上传播入站和出站事件流的API。使得事件流经 ChannelPipeline 是 ChannelHandler 的工作,它们是在应用程序的初始化或者引导阶段被安装的。这些对象接收事件、执行他们所实现的处理逻辑,并将数据传递给链中的下一个ChannelHandler:

1、一个ChannelInitializer的实现被注册到了ServerBootstrap中。
2、当 ChannelInitializer.initChannel()方法被调用时, ChannelInitializer将在 ChannelPipeline 中安装一组自定义的 ChannelHandler。
3、ChannelInitializer 将它自己从 ChannelPipeline 中移除。

  • EventLoop

EventLoop 定义了Netty的核心抽象,用来处理连接的生命周期中所发生的事件,在内部,将会为每个Channel分配一个EventLoop。

EventLoop本身只由一个线程驱动,其处理了一个Channel的所有I/O事件,并且在该EventLoop的整个生命周期内都不会改变。这个简单而强大的设计消除了你可能有的在ChannelHandler实现中需要进行同步的任何顾虑。

这里需要说到,EventLoop的管理是通过EventLoopGroup来实现的。还要一点要注意的是,客户端引导类是 Bootstrap,只需要一个EventLoopGroup。服务端引导类是 ServerBootstrap,通常需要两个 EventLoopGroup,一个用来接收客户端连接,一个用来处理 I/O 事件(也可以只使用一个 EventLoopGroup,此时其将在两个场景下共用同一个 EventLoopGroup)。

1、一个 EventLoopGroup 包含一个或者多个 EventLoop;
2、一个 EventLoop 在它的生命周期内只和一个 Thread 绑定;
3、所有由 EventLoop 处理的 I/O 事件都将在它专有的Thread 上被处理;
4、一个 Channel 在它的生命周期内只注册于一个EventLoop;
5、一个 EventLoop 可能会被分配给一个或者多个 Channel(面对多个Channel,一个 EventLoop 按照事件触发,顺序执行)。

三、实例

所有的Netty服务端/客户端都至少需要两个部分:

1、至少一个ChannelHandler —— 该组件实现了对数据的处理。

2、引导 —— 这是配置服务器的启动代码。

服务端:

public class EchoServer {

    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public void start() throws InterruptedException {
        final EchoServerHandler serverHandler = new EchoServerHandler();
        //1、创建EventLoopGroup以进行事件的处理,如接受新连接以及读/写数据
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //2、创建ServerBootstrap,引导和绑定服务器
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(group, group)
                    //3、指定所使用的NIO传输Channel
                    .channel(NioServerSocketChannel.class)
                    //4、使用指定的端口设置套接字地址
                    .localAddress(new InetSocketAddress(port))
                    //5、添加一个 EchoServerHandler 到子 Channel的 ChannelPipeline
                    //当一个新的连接被接受时,一个新的子Channel将会被创建,而 ChannelInitializer 将会把一个你的EchoServerHandler 的实例添加到该 Channel 的 ChannelPipeline 中
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(serverHandler);
                        }
                    });
            //6、异步地绑定服务器,调用sync()方法阻塞等待直到绑定完成
            ChannelFuture channelFuture = bootstrap.bind().sync();
            System.out.println(EchoServer.class.getName() + "started and listening for connections on" + channelFuture.channel().localAddress());
            //7、获取 Channel 的 CloseFuture,并且阻塞当前线程直到它完成
            channelFuture.channel().closeFuture().sync();

        } finally {
            //8、关闭 EventLoopGroup 释放所有的资源
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new EchoServer(9999).start();
    }
}

@ChannelHandler.Sharable //标识一个Channel-Handler 可以被多个Channel安全的共享
public class EchoServerHandler extends ChannelHandlerAdapter {

    /**
     * 对于每个传入的消息都要调用
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf in = (ByteBuf) msg;
        System.out.println("Server received:" + in.toString(CharsetUtil.UTF_8));
        //将接收到的消息写给发送者,而不冲刷出站消息
        //ChannelHandlerContext 发送消息。导致消息向下一个ChannelHandler流动
        //Channel 发送消息将会导致消息从 ChannelPipeline的尾端开始流动
        ctx.write(in);
    }

    /**
     * 通知 ChannelHandlerAdapter 最后一次对channel-Read()的调用是当前批量读取中的最后一条消息
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //暂存于ChannelOutboundBuffer中的消息,在下一次调用flush()或者writeAndFlush()方法时将会尝试写出到套接字
        //将这份暂存消息冲刷到远程节点,并且关闭该Channel
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
                .addListener(ChannelFutureListener.CLOSE);
    }

    /**
     * 在读取操作期间,有异常抛出时会调用
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}

EchoServerHandler.java

客户端:

public class EchoClient {

    private final String host;
    private final int port;

    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //创建Bootstrap
            Bootstrap bootstrap = new Bootstrap();
            //指定 EventLoopGroup 以处理客户端事件;适应于NIO的实现
            bootstrap.group(group)
                    //适用于NIO传输的Channel类型
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress(host, port))
                    //在创建Channel时,向ChannelPipeline中添加一个EchoClientHandler实例
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            //连接到远程节点,阻塞等待直到连接完成
            ChannelFuture channelFuture = bootstrap.connect().sync();
            //阻塞,直到Channel 关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            //关闭线程池并且释放所有的资源
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new EchoClient("127.0.0.1", 9999).start();

        System.out.println("------------------------------------");

        new EchoClient("127.0.0.1", 9999).start();

        System.out.println("------------------------------------");

        new EchoClient("127.0.0.1", 9999).start();
    }

}

@ChannelHandler.Sharable //标记该类的实例可以被多个Channel共享
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    /**
     * 当从服务器接收到一条消息时被调用
     *
     * @param ctx
     * @param msg ByteBuf (Netty 的字节容器) 作为一个面向流的协议,TCP 保证了字节数组将会按照服务器发送它们的顺序接收
     * @throws Exception
     */
    @Override
    protected void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("Client" + ctx.channel().remoteAddress() + "connected");
        System.out.println(msg.toString(CharsetUtil.UTF_8));
    }

    /**
     * 在到服务器的连接已经建立之后将被调用
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx)  {
        ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rock!", CharsetUtil.UTF_8));
    }

    /**
     * 在处理过程中引发异常时被调用
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}

EchoClientHandler.java

四、结语

带着一阵迷糊就开始了Netty学习之旅,学到现在还是对Netty一堆专有名词头大!没办法,只好硬着头皮学下去了,毕竟,熟读唐诗三百首,不会作诗也会吟嘛!

来总结下,一个Netty服务端处理客户端连接的过程:1、创建一个channel同该用户端进行绑定;2、channel从EventLoopGroup获得一个EventLoop,并注册到该EventLoop,channel生命周期内都和该EventLoop在一起(注册时获得selectionKey);3、channel同用户端进行网络连接、关闭和读写,生成相对应的event(改变selectinKey信息),触发eventloop调度线程进行执行;4、ChannelPipeline 找到对应 ChannelHandler 方法处理用户逻辑。

参考资料:《Netty IN ACTION》

演示源代码:https://github.com/JMCuixy/NettyDemo

原文地址:https://www.cnblogs.com/jmcui/p/9154842.html

时间: 2024-07-31 19:19:41

Netty 系列一(核心组件和实例).的相关文章

5. 彤哥说netty系列之Java NIO核心组件之Channel

你好,我是彤哥,本篇是netty系列的第五篇. 简介 上一章我们一起学习了如何使用Java原生NIO实现群聊系统,这章我们一起来看看Java NIO的核心组件之一--Channel. 思维转变 首先,我想说的最重要的一个点是,学习NIO思维一定要从BIO那种一个连接一个线程的模式转变成多个连接(Channel)共用一个线程来处理的这种思维. 1个Connection = 1个Socket = 1个Channel,这几个概念可以看作是等价的,都表示一个连接,只不过是用在不同的场景中. 如果单从阻塞

6. 彤哥说netty系列之Java NIO核心组件之Buffer

--日拱一卒,不期而至! 你好,我是彤哥,本篇是netty系列的第六篇. 简介 上一章我们一起学习了Java NIO的核心组件Channel,它可以看作是实体与实体之间的连接,而且需要与Buffer交互,这一章我们就来学习一下Buffer的特性. 概念 Buffer用于与Channel交互时使用,通过上一章的学习我们知道,数据从Channel读取到Buffer,或者从Buffer写入Channel. Buffer本质上是一个内存块,可以向里面写入数据,或者从里面读取数据,在Java中它被包装成了

7. 彤哥说netty系列之Java NIO核心组件之Selector

<p align="right">--日拱一卒,不期而至!</p> 你好,我是彤哥,本篇是netty系列的第七篇. 简介 上一章我们一起学习了Java NIO的核心组件Buffer,它通常跟Channel一起使用,但是它们在网络IO中又该如何使用呢,今天我们将一起学习另一个NIO核心组件--Selector,没有它可以说就干不起来网络IO. 概念 我们先来看两段Selector的注释,见类java.nio.channels.Selector. 注释I A mul

Netty系列之Netty高性能之道(转载InfoQ)

1. 背景 1.1. 惊人的性能数据 最近一个圈内朋友通过私信告诉我,通过使用Netty4 + Thrift压缩二进制编解码技术,他们实现了10W TPS(1K的复杂POJO对象)的跨节点远程服务调用.相比于传统基于Java序列化+BIO(同步阻塞IO)的通信框架,性能提升了8倍多. 事实上,我对这个数据并不感到惊讶,根据我5年多的NIO编程经验,通过选择合适的NIO框架,加上高性能的压缩二进制编解码技术,精心的设计Reactor线程模型,达到上述性能指标是完全有可能的. 下面我们就一起来看下N

【读后感】Netty 系列之 Netty 高性能之道 - 相比 Mina 怎样 ?

太阳火神的漂亮人生 (http://blog.csdn.net/opengl_es) 本文遵循"署名-非商业用途-保持一致"创作公用协议 转载请保留此句:太阳火神的漂亮人生 -  本博客专注于 敏捷开发及移动和物联设备研究:iOS.Android.Html5.Arduino.pcDuino,否则,出自本博客的文章拒绝转载或再转载,谢谢合作. [读后感] 不知道这是什么节奏,或许人家早就春意盎然了.仅仅是我方才感觉到而已! 研究 Mina 的过程中,偶然发现了 Netty,有人说 Min

【读后感】Netty系列之Netty高性能之道 - 相比 Mina 如何 ?

[读后感] 不知道这是什么节奏,也许人家早就春意盎然了,只是我方才感觉到而已! 研究 Mina 的过程中,偶然发现了 Netty,有人说 Mina 好久不更新了,而 Netty 一直很活跃, 这只能说, Netty 在高速的完善当中, 至于 Mina,是没有后劲儿了呢,还是已经很完善了,不需要再继续更新,这个到是不得而知, 至少目前看,还不错, 相比 Netty 的大数据.大并发.高效率...还有什么,请移步下文,自品其详吧! Netty系列之Netty高性能之道 1. 背景 1.1. 惊人的性

小编带你了解Netty 系列七(那些开箱即用的 ChannelHandler).

一.前言Netty 为许多通用协议提供了编×××和处理器,几乎可以开箱即用, 这减少了你在那些相当繁琐的事务上本来会花费的时间与精力.另外,这篇文章中,就不涉及 Netty 对 WebSocket协议 的支持了,因为涉及的篇幅有点大,会在下一篇文章做一个具体的介绍. 回到顶部二.SSL 协议SSL 协议是安全协议,层叠在其他协议之上.为了支持 SSL/TLS, Java 提供了 javax.net.ssl 包,它的 SSLContext 和 SSLEngine 类使得实现解密和加密相当简单直接.

Java JUC之Atomic系列12大类实例讲解和原理分解

Java JUC之Atomic系列12大类实例讲解和原理分解 2013-02-21      0个评论       作者:xieyuooo 收藏    我要投稿 在java6以后我们不但接触到了Lock相关的锁,也接触到了很多更加乐观的原子修改操作,也就是在修改时我们只需要保证它的那个瞬间是安全的即可,经过相应的包装后可以再处理对象的并发修改,以及并发中的ABA问题,本文讲述Atomic系列的类的实现以及使用方法,其中包含: 基本类:AtomicInteger.AtomicLong.Atomic

【从零学习经典算法系列】分治策略实例——二分查找

1.二分查找算法简介 二分查找算法是一种在有序数组中查找某一特定元素的搜索算法.搜素过程从数组的中间元素开始,如果中间元素正好是要查找的元素,则搜索过程结束:如果某一特定元素大于或者小于中间元素,则在数组大于或小于中间元素的那一半中查找,而且跟开始一样从中间元素开始比较.如果在某一步骤数组 为空,则代表找不到.这种搜索算法每一次比较都使搜索范围缩小一半.折半搜索每次把搜索区域减少一半,时间复杂度为Ο(logn). 二分查找的优点是比较次数少,查找速度快,平均性能好:其缺点是要求待查表为有序表,且

ASP.NET MVC+EF框架+EasyUI实现权限管理系列(6)- EF上下文实例管理

原文:ASP.NET MVC+EF框架+EasyUI实现权限管理系列(6)- EF上下文实例管理 ASP.NET MVC+EF框架+EasyUI实现权限管系列 (开篇)   (1):框架搭建    (2):数据库访问层的设计Demo    (3):面向接口编程   (4 ):业务逻辑层的封装  (5):前台Jquery easyUI实现 前言:通过前面的五篇博客我们已经对权限系统的后台架构进行了详细的说明,那么我再前面的博客中也说到了我们的后台架构还会再改的,我准备这段时间我们继续完善我们的后台