Netty Nio启动全流程

Netty Nio启动全流程

1. 各组件之间的关系

说明:EventLoopGroup类似线程池,EventLoop为单线程,每个EventLoop关联一个Nio Selector,用于注册Channel,形成一个EventLoop被多个channel公用。在EventLoop会执行通道Io选择操作,以及非Io任务。在Channel初始化后会创建pipeline,是handler的链表结构。

2. 服务端vs客户端启动

// 服务端启动
private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();

    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it‘s not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}
//客户端启动
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();

    if (regFuture.isDone()) {
        if (!regFuture.isSuccess()) {
            return regFuture;
        }
        return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
    } else {
        // Registration future is almost always fulfilled already, but just in case it‘s not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                // Directly obtain the cause and do a null check so we only need one volatile read in case of a
                // failure.
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();
                    doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

一言以蔽之,首先做初始化channel和channel注册操作,然后服务器启动做绑定操作,客户端启动做连接操作。而初始化channel和channel注册都是通过initAndRegister()实现。最大化重用代码。

3. 初始化创建通道以及通道注册

3.1 模板方法的创建通道->初始化通道->通道注册

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
      // 创建通道
        channel = channelFactory.newChannel();
     // 初始化通道
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    // 通道注册
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}

3.2 创建通道

  1. 构造channel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

NioChannel将java SelectableChannel包装了一把,并添加了pipeline和unsafe操作,默认的pipeline是一个双向链表结构,只包含head和tail两个节点。

  1. 初始化channel

    对于客户端而言,直接向pipeline中添加builder方法的handler,以及一些nio操作的通用属性,对于服务端创建而言,除了一些基本nio属性外,只添加了一个初始化的handler

// 客户端创建
ChannelPipeline p = channel.pipeline();
p.addLast(config.handler());
//服务端创建
p.addLast(new ChannelInitializer<Channel>() {
    @Override
    public void initChannel(final Channel ch) throws Exception {
        final ChannelPipeline pipeline = ch.pipeline();
        ChannelHandler handler = config.handler();
        if (handler != null) {
            pipeline.addLast(handler);
        }

        ch.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });
    }
});

注:ChannelInitializer的initChannnel会在注册成功之后调用,以此实现动态扩展。

客户端创建时候pipeline中没有ChannelInitializer,需要自己添加。

  1. 通道注册

    主要将channel绑定到EventLoop上面,然后在eventLoop单线程中执行注册操作

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    }
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }

    AbstractChannel.this.eventLoop = eventLoop;

    // 此时在主线程中,不知eventLoop线程池中
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}

register0主要干三件事,注册->调用ChannelInitializer的initChannnel完成添加handler->注册channel关心的操作

3.1 java channel注册,0表示只注册,不执行任何操作

selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);

3.2 pipeline.fireChannelRegistered()

此时,pipeline中包含三个handler,其中一个是ChannelInitializer。

public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    if (initChannel(ctx)) {
        ctx.pipeline().fireChannelRegistered();
    } else {
        ctx.fireChannelRegistered();
    }
}

3.2 beginRead();

@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

注意,此时才会真实注册关系的事件,对服务端而言为Accept,对客户端创建,就是connect

public NioServerSocketChannel(ServerSocketChannel channel) {
     super(null, channel, SelectionKey.OP_ACCEPT);
     config = new NioServerSocketChannelConfig(this, javaChannel().socket());
 }
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
     super(parent, ch, SelectionKey.OP_READ);
 }

至此,客户端与服务端完成了初始化channel以及注册channel操作。

4. 服务端绑定到指定端口

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

在eventLoop中执行绑定端口操作

@Override
public void bind(
        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
        throws Exception {
    unsafe.bind(localAddress, promise);
}

最后都是会调用unsafe的bind方法完成端口绑定操作。

5. 客户端连接远程服务端

private static void doConnect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
    final Channel channel = connectPromise.channel();
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (localAddress == null) {
                channel.connect(remoteAddress, connectPromise);
            } else {
                channel.connect(remoteAddress, localAddress, connectPromise);
            }
            connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        }
    });
}

连接服务端最终也是在eventLoop中执行,最终调用unsafe的connect方法。

protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    if (localAddress != null) {
        doBind0(localAddress);
    }

    boolean success = false;
    try {
        boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
        if (!connected) {
            selectionKey().interestOps(SelectionKey.OP_CONNECT);
        }
        success = true;
        return connected;
    } finally {
        if (!success) {
            doClose();
        }
    }
}

connect有三种结果,成功,直接返回true,失败则暂时不知道结果,检测OP_CONNECT,异常直接关闭链路。

值得说明的是jdk默认不支持连接超时,netty添加了超时机制:在EventLoop中添加超时任务,触发超时时间后会关闭连接,连接成功会删除该超时任务。

// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
    connectTimeoutFuture = eventLoop().schedule(new Runnable() {
        @Override
        public void run() {
            ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
            ConnectTimeoutException cause =
                    new ConnectTimeoutException("connection timed out: " + remoteAddress);
            if (connectPromise != null && connectPromise.tryFailure(cause)) {
                close(voidPromise());
            }
        }
    }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}

promise.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        if (future.isCancelled()) {
            if (connectTimeoutFuture != null) {
                connectTimeoutFuture.cancel(false);
            }
            connectPromise = null;
            close(voidPromise());
        }
    }
});

6.EventLooop 处理IO事件

if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
    // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
    // See https://github.com/netty/netty/issues/924
    int ops = k.interestOps();
    ops &= ~SelectionKey.OP_CONNECT;
    k.interestOps(ops);

    unsafe.finishConnect();
}

// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
    ch.unsafe().forceFlush();
}

// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
}

原文地址:https://www.cnblogs.com/dragonfei/p/9343654.html

时间: 2024-11-26 14:49:32

Netty Nio启动全流程的相关文章

【Heritrix源代码分析4】开始一个爬虫抓取的全流程代码分析

在创建一个job后,就要开始job的运行,运行的全流程如下: 1.在界面上启动job 2.index.jsp 查看上述页面对应的源代码 <a href='"+request.getContextPath()+"/console/action.jsp?action=start'>Start</a> 3.action.jsp String sAction = request.getParameter("action"); if(sAction !

CentOS6.6服务器系统配置(LAMP+phpMyAdmin)全流程

CentOS6.6服务器系统配置(LAMP+phpMyAdmin)全流程 昨天在腾讯云上买了个服务器,是CentOS6.6操作系统的裸机,里面什么都没,然后开始了一天一夜的LAMP(Apache+MySql+PHP)的环境配置.借鉴了很多人的做法,花了很多时间,流了很多的汗水,虽然成功了,但是考虑到以后可能还有用,或者能给别人做做参考,于是写下这篇博客.(这篇博客借鉴了其他很多博客,借鉴最多的就是osyunwei的这篇博客,但是那些博客都是配置低版本的php和mysql,不便现在我们使用,我整合

Mina 源码阅读:Server端基于NIO的处理流程

源码面前,了无秘密.继之前阅读了Prototype.Spring.Tomcat.以及JDK的部分.Digester等等源码之后,学习一门技术,了解源码成了必备流程.也深深的感受到了源码面前,了无秘密的含义,同时也体会到它给我带来的好处.同时,也希望所有开发者,不论前端后端,如果有时间的话,都尽量看看源码吧. 接下来进入正题,这里要对Mina流程做一个分析.因为是指对NIO流程做了分析,所以这里说的也是NIO的执行流程. 先看一下Mina中主要类的大致结构: 接下来看看Mina的整个生命周期: 1

用intellij Idea加载eclipse的maven项目全流程

eclipse的maven项目目录 全流程 加载项目 打开intellij Idea file -> new -> module from existing Sources  选择.pom的文件,系统将自动加载maven项目  加载后显示的界面如下图所示 设置tomcat 进入project Structure 配置web工程路径 添加tomcat 配置tomcat 添加web exploded 修改访问路径 启动tomcat 原文地址:https://www.cnblogs.com/cnnd

大数据分析挖掘全流程实战视频教程:电商市场与销售趋势预测

大数据分析挖掘全流程实战视频教程:电商市场与销售趋势预测资源下载:https://pan.baidu.com/s/1VPydETNHqhDDcJ1Lpko1AA 提取码:o9mk 课程特色:特色一:一套课程,搞定企业级数据分析与挖掘全栈技术特色二:基于Linux+Windows两套系统手把手教你搭建企业数据分析/挖掘开发环境,带你从0~1特色三:电商企业经典数据分析与挖掘项目全程贯穿,教你从1~100 课程目标:1.掌握预测分析的理论基础,一些数据分析挖掘软件的使用技巧2.通过掌握的分析技术及软

cts全流程自动化方案讨论

CTS全流程自动化方案讨论 功能愿景 cts全流程自动化.大致包括以下模块: 版本构建 强制OTA升级 自动retry 报告+日志备份并解析 邮件发送 方案痛点 目标 方案 问题等级&痛点 版本构建 目前方案:python库paramiko远程ssh登录详细:win端python脚本paramiko库远程ssh登录Linux编译服务器,将一键编译命令写入一个python文件,然后执行该"一键编译命令"py文件 问题等级:低版本有时候会下载不下来,需要人工干预.自动构建不是问题,

java编解码技术,netty nio

对于java提供的对象输入输出流ObjectInputStream与ObjectOutputStream,可以直接把java对象作为可存储的字节数组写入文件,也可以传输到网络上去.对与java开放人员来说,默认的jdk序列化机制可以避免操作底层的字节数组,从而提升开发效率. 1.为什么需要序列化 网络传输与对象序列化 2.java编解码技术指的什么 netty nio是基于网络传输,当进行远程跨进程服务调用时,需要把被传输的对象编码为字节数组或者bytebuffer对象.而当远程服务读取到byt

Orange&#39;s 自己动手写操作系统 第一章 十分钟完成的操作系统 U盘启动 全记录

材料: 1 nasm:编译汇编源代码,网上很多地方有下 2  WinHex:作为windows系统中的写U盘工具,需要是正版(full version)才有写的权限,推荐:http://down.liangchan.net/WinHex_16.7.rar 步骤: 1 编译得到引导程序的机器代码.用命令行编译汇编源代码:name boot.asm -o boot.bin,其中boot.bin文件产生在命令行的当前目录中. 2 将引导程序写入到U盘引导盘的第一个扇区的第一个字节处(后),即主引导区.

密码学——网间数据加密传输全流程(SSL加密原理)

0.导言 昨天写了一篇关于<秘钥与公钥>的文章,写的比较简单好理解,有点儿像过家家,如果详细探究起来会有不少出入,今天就来详细的说明一下数据加密的原理和过程.这个原理就是大名鼎鼎SSL的加密原理,哦,对了,有人说上个月SSL刚爆出本年度最轰动的漏洞"心脏滴血",可能危及全球好多互联网企业,那这个东西靠谱吗?其实,爆出漏洞的是SSL的一款产品openssl,是程序员编码时候的失误,是程序上的漏洞,而非SSL原理上出现了问题,所以SSL还是靠谱的,HTTP协议还是需要它来保护,