netty 源码分析二

以服务端启动,接收客户端连接整个过程为例分析, 简略分为 五个过程:

1.NioServerSocketChannel 管道生成,

2.NioServerSocketChannel 管道完成初始化,

3.NioServerSocketChannel注册至Selector选择器,

4.NioServerSocketChannel管道绑定到指定端口,启动服务

5.NioServerSocketChannel接受客户端的连接,进行相应IO操作

Ps:netty内部过程远比这复杂,简略记录下方便以后回忆对整个流程的把控.

管道生成调用NioServerSocketChannel类的如下构造方法:

    /**
     * Create a new instance
     */
    public NioServerSocketChannel(EventLoop eventLoop, EventLoopGroup childGroup) {
        super(null, eventLoop, childGroup, newSocket(), SelectionKey.OP_ACCEPT);
        config = new DefaultServerSocketChannelConfig(this, javaChannel().socket());
    }

由ServerBootStrap 这个启动工具类负责创建, 调用其内部类ServerBootstrapChannelFactory的newChannel()方法完成.

    @Override
    Channel createChannel() {
        EventLoop eventLoop = group().next();
        return channelFactory().newChannel(eventLoop, childGroup);
    }

构造方法需要传入两个参数,  EventLoop , EventLoopGroup .

EventLoop 内置单个线程池,主要负责 轮询selector 这个选择器获取准备就绪的channel管道,并交给EventLoopGroup进行读写操作

EventLoopGroup 内置多个线程池,负责处理IO读写操作.

管道初始化主要为NioServerSocketChannel配置一些可选option,attrs属性, 同时向ChannelPipeline类中添加ServerBootstrapAcceptor 处理器,代码如下:

    @Override
    void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options();
        synchronized (options) {
            channel.config().setOptions(options);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }

        ChannelPipeline p = channel.pipeline();
        if (handler() != null) {
            p.addLast(handler());
        }

        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(Channel ch) throws Exception {
                ch.pipeline().addLast(new ServerBootstrapAcceptor(currentChildHandler, currentChildOptions,
                        currentChildAttrs));
            }
        });
    }

netty是基于java的NIo开发的,所以NioServerSocketChannel管道注册类似NIO的注册,主要向Selector这个选择器完成注册.ServerBootstrap启动类中注册代码就如下一行:

channel.unsafe().register(regFuture);

由NioServerSocketChannel内部类unsafe(抽象实现类AbstractUnsafe)完成注册,代码如下:

        @Override
        public final void register(final ChannelPromise promise) {
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    promise.setFailure(t);
                }
            }
        }

        private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!ensureOpen(promise)) {
                    return;
                }
                doRegister();
                registered = true;
                promise.setSuccess();
                pipeline.fireChannelRegistered();
                if (isActive()) {
                    pipeline.fireChannelActive();
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                if (!promise.tryFailure(t)) {
                    logger.warn(
                            "Tried to fail the registration promise, but it is complete already. " +
                                    "Swallowing the cause of the registration failure:", t);
                }
            }
        }

register(final ChannelPromise promise)方法中代码片段

eventLoop.execute(new Runnable() {...}

execute()方法首先会启动 EventLoop 线程池不断轮询Selector,  然后先线程池内部丢一个task进去,内部代码不在展开.

register0()方法中doRegister()方法如下,本质就是通过NIo的ServersocketChannel完成注册:

    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }

NioServerSocketChannel管道创建,初始化,注册完毕之后就需要绑定到指定端口以提供服务.核心代码在AbstractBootstrap类的doBind0()方法中如下:

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

channel.eventLoop()用于获取到创建管道时传入的 EventLoop线程池(线程已经在注册时候启动),然后向线程池内部放入一个绑定端口任务.

channel.bind()内部实现代码如下:

 @Override
    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return pipeline.bind(localAddress, promise);
    }

通过DefaultChannelPipeline类的bind()方法执行,DefaultChannelPipeline内部实现如下,:

 @Override
    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return tail.bind(localAddress, promise);
    }

每个DefaultChannelPipeline类内部都会维护着一些DefaultChannelHandlerContext, 可以通过addXXX()方法往DefaultChannelPipeline类里面增加DefaultChannelHandlerContext,每个DefaultChannelHandlerContext里面都会维护这个一个handler,用于后期invoke该handler.

tail 是 DefaultChannelPipeline内部最后一个DefaultChannelHandlerContext, bind方法内部实现如下:

    @Override
    public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
        DefaultChannelHandlerContext next = findContextOutbound(MASK_BIND);
        next.invoker.invokeBind(next, localAddress, promise);
        return promise;
    }
    private DefaultChannelHandlerContext findContextOutbound(int mask) {
        DefaultChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while ((ctx.skipFlags & mask) != 0);
        return ctx;
    }

tail作为DefaultChannelPipeline内部最后一个DefaultChannelHandlerContext,会一直先前遍历,直到找到某个DefaultChannelHandlerContext内部的handlerA实现了bind()方法,然后找到该类内部ChannelHandlerInvoker实例实现者(DefaultChannelHandlerInvoker), 并调用invokeBind()方法, 该方法本质上是通过handlerA的bind()方法结束操作.

服务现在已经起来,等待客户端连接,并读取客户端数据.

EventLoop线程池在注册时已经启动,已经能够接受客户端的信息.主要代码在NioEventLoop类的run()方法中.如下:

    @Override
    protected void run() {
        for (;;) {
            oldWakenUp = wakenUp.getAndSet(false);
            try {
                if (hasTasks()) {
                    selectNow();
                } else {
                    select();

                    // 'wakenUp.compareAndSet(false, true)' is always evaluated
                    // before calling 'selector.wakeup()' to reduce the wake-up
                    // overhead. (Selector.wakeup() is an expensive operation.)
                    //
                    // However, there is a race condition in this approach.
                    // The race condition is triggered when 'wakenUp' is set to
                    // true too early.
                    //
                    // 'wakenUp' is set to true too early if:
                    // 1) Selector is waken up between 'wakenUp.set(false)' and
                    //    'selector.select(...)'. (BAD)
                    // 2) Selector is waken up between 'selector.select(...)' and
                    //    'if (wakenUp.get()) { ... }'. (OK)
                    //
                    // In the first case, 'wakenUp' is set to true and the
                    // following 'selector.select(...)' will wake up immediately.
                    // Until 'wakenUp' is set to false again in the next round,
                    // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                    // any attempt to wake up the Selector will fail, too, causing
                    // the following 'selector.select(...)' call to block
                    // unnecessarily.
                    //
                    // To fix this problem, we wake up the selector again if wakenUp
                    // is true immediately after selector.select(...).
                    // It is inefficient in that it wakes up the selector for both
                    // the first case (BAD - wake-up required) and the second case
                    // (OK - no wake-up required).

                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                }

                cancelledKeys = 0;

                final long ioStartTime = System.nanoTime();
                needsToSelectAgain = false;
                if (selectedKeys != null) {
                    processSelectedKeysOptimized(selectedKeys.flip());
                } else {
                    processSelectedKeysPlain(selector.selectedKeys());
                }
                final long ioTime = System.nanoTime() - ioStartTime;

                final int ioRatio = this.ioRatio;
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);

                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        break;
                    }
                }
            } catch (Throwable t) {
                logger.warn("Unexpected exception in the selector loop.", t);

                // Prevent possible consecutive immediate failures that lead to
                // excessive CPU consumption.
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Ignore.
                }
            }
        }
    }

主要方法processSelectedKeysPlain()代码如下:

    private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
        // check if the set is empty and if so just return to not create garbage by
        // creating a new Iterator every time even if there is nothing to process.
        // See https://github.com/netty/netty/issues/597
        if (selectedKeys.isEmpty()) {
            return;
        }

        Iterator<SelectionKey> i = selectedKeys.iterator();
        for (;;) {
            final SelectionKey k = i.next();
            final Object a = k.attachment();
            i.remove();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (!i.hasNext()) {
                break;
            }

            if (needsToSelectAgain) {
                selectAgain();
                selectedKeys = selector.selectedKeys();

                // Create the iterator again to avoid ConcurrentModificationException
                if (selectedKeys.isEmpty()) {
                    break;
                } else {
                    i = selectedKeys.iterator();
                }
            }
        }
    }

上面代码有两句还没明白原因,请教中.

final SelectionKey k = i.next();

final Object a = k.attachment();   //这个attachment()值啥时候被放进去的?

接着看代码,processSelectedKey方法内部主要实现对感兴趣事件的业务操作,比如读取数据操作,大致的操作流程其实跟端口绑定的流程类似.

    private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
                if (!ch.isOpen()) {
                    // Connection already closed - no need to handle write.
                    return;
                }
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }
        } catch (CancelledKeyException e) {
            unsafe.close(unsafe.voidPromise());
        }
    }

netty 源码分析二,布布扣,bubuko.com

时间: 2024-10-24 10:27:50

netty 源码分析二的相关文章

netty源码分析之揭开reactor线程的面纱(二)

如果你对netty的reactor线程不了解,建议先看下上一篇文章netty源码分析之揭开reactor线程的面纱(一),这里再把reactor中的三个步骤的图贴一下 reactor线程 我们已经了解到netty reactor线程的第一步是轮询出注册在selector上面的IO事件(select),那么接下来就要处理这些IO事件(process selected keys),本篇文章我们将一起来探讨netty处理IO事件的细节 我们进入到reactor线程的 run 方法,找到处理IO事件的代

Netty源码分析第5章(ByteBuf)----&gt;第1节: AbstractByteBuf

Netty源码分析第五章: ByteBuf 概述: 熟悉Nio的小伙伴应该对jdk底层byteBuffer不会陌生, 也就是字节缓冲区, 主要用于对网络底层io进行读写, 当channel中有数据时, 将channel中的数据读取到字节缓冲区, 当要往对方写数据的时候, 将字节缓冲区的数据写到channel中 但是jdk的byteBuffer是使用起来有诸多不便, 比如只有一个标记位置的指针position, 在进行读写操作时要频繁的通过flip()方法进行指针位置的移动, 极易出错, 并且by

Netty源码分析--内存模型(上)(十一)

前两节我们分别看了FastThreadLocal和ThreadLocal的源码分析,并且在第八节的时候讲到了处理一个客户端的接入请求,一个客户端是接入进来的,是怎么注册到多路复用器上的.那么这一节我们来一起看下客户端接入完成之后,是怎么实现读写操作的?我们自己想一下,应该就是为刚刚读取的数据分配一块缓冲区,然后把channel中的信息写入到缓冲区中,然后传入到各个handler链上,分别进行处理.那Netty是怎么去分配一块缓冲区的呢?这个就涉及到了Netty的内存模型. 当然,我们在第一节的时

4. Netty源码分析之Unsafe

Unsafe类实际上是Channel接口的辅助类,实际的IO操作都是由Unsafe接口完成的. 一.Unsafe继承关系图 二.AbstractUnsafe源码分析 1. register方法 register方法主要用于将当前Unsafe对应的Channel注册到EventLoop的多路复用器上,然后调用DefaultChannelPipeline的fireChannelRegisted方法,如果Channel被激活,则调用fireChannelActive方法. public final v

[Android]Volley源码分析(二)Cache

Cache作为Volley最为核心的一部分,Volley花了重彩来实现它.本章我们顺着Volley的源码思路往下,来看下Volley对Cache的处理逻辑. 我们回想一下昨天的简单代码,我们的入口是从构造一个Request队列开始的,而我们并不直接调用new来构造,而是将控制权反转给Volley这个静态工厂来构造. com.android.volley.toolbox.Volley: public static RequestQueue newRequestQueue(Context conte

哇!板球 源码分析二

游戏主页面布局 创建屏下Score标签 pLabel = CCLabelTTF::create("Score", "Arial", TITLE_FONT_SIZE); //分数标签 //设置标签字体的颜色 pLabel->setColor (ccc3(0, 0, 0)); //设置文本标签的位置 pLabel->setPosition ( ccp ( SCORE_X, //X坐标 SCORE_Y //Y坐标 ) ); //将文本标签添加到布景中 this

baksmali和smali源码分析(二)

这一节,主要介绍一下 baksmali代码的框架. 我们经常在反编译android apk包的时候使用apktool这个工具,其实本身这个工具里面对于dex文件解析和重新生成就是使用的baksmali 和smali这两个jar包其中 baksmali是将 dex文件转换成便于阅读的smali文件的,具体使用命令如下:java -jar baksmali.jar classes.dex -o myout其中myout是输出的文件夹 而smali是将smali文件重新生成回 dex文件的具体使用的命

【梦幻连连连】源码分析(二)

转载请注明出处:http://blog.csdn.net/oyangyufu/article/details/24736711 GameLayer场景界面效果: 源码分析: GameLayer场景初始化,主要是初始化加载界面及背景音乐 bool GameLayer::init() { float dt=0.0f; if ( !CCLayerColor::initWithColor(ccc4(255, 255, 255, 255))) { return false; } this->initLoa

[Android]Fragment源码分析(二) 状态

我们上一讲,抛出来一个问题,就是当Activity的onCreateView的时候,是如何构造Fragment中的View参数.要回答这个问题我们先要了解Fragment的状态,这是Fragment管理中非常重要的一环.我们先来看一下FragmentActivity提供的一些核心回调: @Override protected void onCreate(Bundle savedInstanceState) { mFragments.attachActivity(this, mContainer,