[编织消息框架][netty源码分析]5 EventLoopGroup 实现类NioEventLoopGroup职责与实现

分析NioEventLoopGroup最主有两个疑问

1.next work如何分配NioEventLoop

2.boss group 与child group 是如何协作运行的

从EventLoopGroup接口约定通过register方法从channel或promise转换成ChannelFuture对象

next方法就是用来分配NioEventLoop

public interface EventLoopGroup extends EventExecutorGroup {

    @Override
    EventLoop next();

    ChannelFuture register(Channel channel);
    ChannelFuture register(ChannelPromise promise);
    @Deprecated
    ChannelFuture register(Channel channel, ChannelPromise promise);
}

为了节省篇副,做了代码整理

1.NioEventLoopGroup构造时绑定SelectorProvider.provider(),通过newChild生成单个EventLoop

2.next实现是个环形循环

3.register方法是将channel转换成ChannelFuture

读者如果感兴趣可以在这几个方法打上断点看看

public class NioEventLoopGroup extends MultithreadEventLoopGroup {
    public NioEventLoopGroup(int nThreads, Executor executor) {
        this(nThreads, executor, SelectorProvider.provider());
    }
    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }
    /////////////////////////////GenericEventExecutorChooser实现next//////////////////////////////////
    @Override
    public EventExecutor next() {
        return executors[Math.abs(idx.getAndIncrement() % executors.length)];
    }

    /////////////////////////////SingleThreadEventLoop实现register//////////////////////////////////

    @Override
    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }

    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
}

我们用过程的方式来模拟NioEventLoopGroup使用

如果读者有印象netty server 至少有两组NioEventLoopGroup 一个是boss 另一个是child

public class TestBossChildGroup {
    static SocketAddress address = new InetSocketAddress("localhost", 8877);

    @Test
    public void server() throws IOException {

        SelectorProvider bossProvider = SelectorProvider.provider();
        SelectorProvider childProvider = SelectorProvider.provider();

        int count = 2;
        AbstractSelector bossSelector = bossProvider.openSelector();
        AbstractSelector[] childSelectors = new AbstractSelector[count];
        for (int i = 0; i < count; i++) {
            childSelectors[i] = childProvider.openSelector();
        }

        //server绑定访问端口 并向Selector注册OP_ACCEPT
        ServerSocketChannel serverSocketChannel = bossProvider.openServerSocketChannel();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(address);
        serverSocketChannel.register(bossSelector, SelectionKey.OP_ACCEPT);  

        int i = 0;
        while (true) {
            int s = bossSelector.select(300);
            if (s > 0) {
            Set<SelectionKey> keys = bossSelector.selectedKeys();
            Iterator<SelectionKey> it = keys.iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                //为什么不用elseIf 因为 key interestOps 是多重叠状态,一次返回多个操作
                if (key.isAcceptable()) {
                System.out.println("isAcceptable");
                //这里比较巧妙,注册OP_READ交给别一个Selector处理
                key.channel().register(childSelectors[i++ % count], SelectionKey.OP_READ);
                }
                //这部分是child eventLoop处理
                if (key.isConnectable()) {
                System.out.println("isConnectable");
                }
                if (key.isWritable()) {
                System.out.println("isWritable");
                }
                if (key.isReadable()) {
                System.out.println("isReadable");
                }
                key.interestOps(~key.interestOps());
                it.remove();
            }
            }
        }
    }

    @Test
    public void client() throws IOException {
        SocketChannel clientSocketChannel = SelectorProvider.provider().openSocketChannel();
        clientSocketChannel.configureBlocking(true);
        clientSocketChannel.connect(address);
    }
}
时间: 2024-08-28 17:47:44

[编织消息框架][netty源码分析]5 EventLoopGroup 实现类NioEventLoopGroup职责与实现的相关文章

[编织消息框架][netty源码分析]4 eventLoop 实现类NioEventLoop职责与实现

NioEventLoop 是jdk nio多路处理实现同修复jdk nio的bug 1.NioEventLoop继承SingleThreadEventLoop 重用单线程处理 2.NioEventLoop是组成 pool EventLoopGroup 基本单元 总之好多边界判断跟业务经验之类的代码,非常烦碎 重要属性 public final class NioEventLoop extends SingleThreadEventLoop { //绑定 selector Selector sel

[编织消息框架][netty源码分析]9 Promise 实现类DefaultPromise职责与实现

netty Future是基于jdk Future扩展,以监听完成任务触发执行Promise是对Future修改任务数据DefaultPromise是重要的模板类,其它不同类型实现基本是一层简单的包装,如DefaultChannelPromise主要是分析await是如何等侍结果的 public interface Future<V> extends java.util.concurrent.Future<V> { Future<V> addListener(Gener

[编织消息框架][netty源码分析]7 Unsafe 实现类NioSocketChannelUnsafe职责与实现

Unsafe 是channel的内部接口, 负责跟socket底层打交道.从书写跟命名上看是不公开给开发者使用的,直到最后实现NioSocketChannelUnsafe也没有公开出去 public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> { interface Unsafe { RecvByteBufAllocator.Handle recvBufAllocHand

[编织消息框架][netty源码分析]11 UnpooledHeapByteBuf 与 ByteBufAllocator

每种ByteBuf都有相应的分配器ByteBufAllocator,类似工厂模式.我们先学习UnpooledHeapByteBuf与其对应的分配器UnpooledByteBufAllocator 如何知道alloc分配器那是个? 可以从官方下载的TimeServer 例子来学习,本项目已有源码可在 TestChannelHandler.class里断点追踪 从图可以看出netty 4.1.8默认的ByteBufAllocator是PooledByteBufAllocator,可以参过启动参数-D

[编织消息框架][netty源码分析]14 PoolChunk 的 PoolSubpage

final class PoolSubpage<T> implements PoolSubpageMetric { //该page分配的chunk final PoolChunk<T> chunk; //内存使用记录 private final long[] bitmap; //该page是否已释放 boolean doNotDestroy; //该page在chunk中的id,通过区段计算偏移 private final int memoryMapIdx; //该page在chu

Netty源码分析第4章(pipeline)----&gt;第4节: 传播inbound事件

Netty源码分析第四章: pipeline 第四节: 传播inbound事件 有关于inbound事件, 在概述中做过简单的介绍, 就是以自己为基准, 流向自己的事件, 比如最常见的channelRead事件, 就是对方发来数据流的所触发的事件, 己方要对这些数据进行处理, 这一小节, 以激活channelRead为例讲解有关inbound事件的处理流程 在业务代码中, 我们自己的handler往往会通过重写channelRead方法来处理对方发来的数据, 那么对方发来的数据是如何走到chan

Netty源码分析第6章(解码器)----&gt;第1节: ByteToMessageDecoder

Netty源码分析第六章: 解码器 概述: 在我们上一个章节遗留过一个问题, 就是如果Server在读取客户端的数据的时候, 如果一次读取不完整, 就触发channelRead事件, 那么Netty是如何处理这类问题的, 在这一章中, 会对此做详细剖析 之前的章节我们学习过pipeline, 事件在pipeline中传递, handler可以将事件截取并对其处理, 而之后剖析的编解码器, 其实就是一个handler, 截取byteBuf中的字节, 然后组建成业务需要的数据进行继续传播 编码器,

Netty源码分析第7章(编码器和写数据)----&gt;第3节: 写buffer队列

Netty源码分析七章: 编码器和写数据 第三节: 写buffer队列 之前的小结我们介绍过, writeAndFlush方法其实最终会调用write和flush方法 write方法最终会传递到head节点, 调用HeadContext的write方法: public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, prom

Netty源码分析--内存模型(上)(十一)

前两节我们分别看了FastThreadLocal和ThreadLocal的源码分析,并且在第八节的时候讲到了处理一个客户端的接入请求,一个客户端是接入进来的,是怎么注册到多路复用器上的.那么这一节我们来一起看下客户端接入完成之后,是怎么实现读写操作的?我们自己想一下,应该就是为刚刚读取的数据分配一块缓冲区,然后把channel中的信息写入到缓冲区中,然后传入到各个handler链上,分别进行处理.那Netty是怎么去分配一块缓冲区的呢?这个就涉及到了Netty的内存模型. 当然,我们在第一节的时