Netty源码分析之处理新连接

Netty服务端处理新连接的流程:

1.检测新连接

2.基于NioServerSocketChannel创建客户端的NioSocketChannel

3.分配客户端channel的线程,注册线程所对应的selector

4.向selector注册读事件

新连接检测

  服务端在创建完服务端的NioServerSocketChannel之后,绑定完端口号之后,会注册accept事件。当有新连接进入的时候,会触发accpet事件。之前博客有分析过EventLoop的thread的run方法会循环select检测是否有新的IO事件,如果检测到有IO事件,就通过processSelectedKey来处理对应的IO事件,这里的IO事件是accept,就会调用channel内部聚合的UnSafe类的read()方法。

  这里循环调用doReadMessage()方法的条件是是否自动读,读取的连接数是否小于最大连接数,服务端channel默认一次最多读取16个新连接。

  当没有超过最大连接数,并且是自动读的状态时候,就会循环调用doReadMessage,直到没有读到新连接,跳出while循环,

public void read() {
    assert AbstractNioMessageChannel.this.eventLoop().inEventLoop();

    ChannelConfig config = AbstractNioMessageChannel.this.config();
    if(!config.isAutoRead() && !AbstractNioMessageChannel.this.isReadPending()) {
        this.removeReadOp();
    } else {
        int maxMessagesPerRead = config.getMaxMessagesPerRead();
        ChannelPipeline pipeline = AbstractNioMessageChannel.this.pipeline();
        boolean closed = false;
        Throwable exception = null;

        try {
            int size;
            try {
                do {
                    size = AbstractNioMessageChannel.this.doReadMessages(this.readBuf);
                    if(size == 0) {
                        break;
                    }

                    if(size < 0) {
                        closed = true;
                        break;
                    }
                } while(config.isAutoRead() && this.readBuf.size() < maxMessagesPerRead);
            } catch (Throwable var11) {
                exception = var11;
            }

            AbstractNioMessageChannel.this.setReadPending(false);
            size = this.readBuf.size();
            int i = 0;

            while(true) {
                if(i >= size) {
                    this.readBuf.clear();
                    pipeline.fireChannelReadComplete();
                    if(exception != null) {
                        if(exception instanceof IOException && !(exception instanceof PortUnreachableException)) {
                            closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
                        }

                        pipeline.fireExceptionCaught(exception);
                    }

                    if(closed && AbstractNioMessageChannel.this.isOpen()) {
                        this.close(this.voidPromise());
                    }
                    break;
                }

                pipeline.fireChannelRead(this.readBuf.get(i));
                ++i;
            }
        } finally {
            if(!config.isAutoRead() && !AbstractNioMessageChannel.this.isReadPending()) {
                this.removeReadOp();
            }

        }

    }
}

  

创建NioSocketChannel

  这里read()方法是通过循环调用NioServerSocket的doReadMessage(byteBuf)方法进行实现channel的读取新连接。而doReadMessage是通过java nio的channel的accept获取当前新连接的channel,这里获取的channel也是java nio中的channel,然后将这个channel封装成NioSocketChannel,将NioServerSocketChannel和javaChannel都作为参数构造NioSocketChannel,放到buf中去,返回1,表示已经读取一条连接。

protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = this.javaChannel().accept();

    try {
        if(ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable var6) {
        logger.warn("Failed to create a new channel from an accepted socket.", var6);

        try {
            ch.close();
        } catch (Throwable var5) {
            logger.warn("Failed to close a socket.", var5);
        }
    }

    return 0;
}

  NioSocketChannel的构造函数。

//配置Config类public NioSocketChannel(Channel parent, java.nio.channels.SocketChannel socket) {
    super(parent, socket);
    this.config = new NioSocketChannel.NioSocketChannelConfig(this, socket.socket(), null);
}
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {    super(parent, ch, 1);}
//保存channel感兴趣的读事件,并将channel设置为非阻塞的protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {    super(parent);    this.ch = ch;    this.readInterestOp = readInterestOp;

    try {        ch.configureBlocking(false);    } catch (IOException var7) {        try {            ch.close();        } catch (IOException var6) {            if(logger.isWarnEnabled()) {                logger.warn("Failed to close a partially initialized socket.", var6);            }        }

        throw new ChannelException("Failed to enter non-blocking mode.", var7);    }}

  这里配置channel的Config类使用了setTcpNoDelay(true),这里禁止了Nagle算法,Nagle算法的目的是让小的数据包尽量集合成大的数据包发送出去,Netty为了使数据能够及时发出去,禁止了Nagle算法。

public DefaultSocketChannelConfig(io.netty.channel.socket.SocketChannel channel, Socket javaSocket) {
    super(channel);
    if(javaSocket == null) {
        throw new NullPointerException("javaSocket");
    } else {
        this.javaSocket = javaSocket;
        if(PlatformDependent.canEnableTcpNoDelayByDefault()) {
            try {
                this.setTcpNoDelay(true);
            } catch (Exception var4) {
                ;
            }
        }

    }
}
public SocketChannelConfig setTcpNoDelay(boolean tcpNoDelay) {    try {        this.javaSocket.setTcpNoDelay(tcpNoDelay);        return this;    } catch (SocketException var3) {        throw new ChannelException(var3);    }}

  

新连接NioEventLoop的分配和selector的注册

  在读取完新连接之后,会调用fireChannelRead方法,而服务端的NioServerSocketChannel在初始化阶段,在上面的pipeline添加了连接处理器ServerBootstrap.ServerBootstrapAcceptor,read事件会从head传送到serverBootstrapAcceptor,serverBootstrapAcceptor也是一个ChannelHandler,它会对新连接进行处理。

处理流程:

  1.设置客户端channel的childHandler

  添加channelHandler,这里的channelHandler一般是一个ChannelInitializer,他可以获取channel的pipeline,并且在上面添加一系列的Handler,最后再将ChannelInitializer这个Handler删除。

  2.设置options和attrs

  options是底层tcp读写的相关参数,attrs可以在客户端channel上面绑定一些属性。这里的options和attrs都是用户通过代码设置的。比如

 bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true)

  设置的这些都会保存到ServerBootstrap这个类,然后在initChannel的时候会将这些参数都传入,构造一个ServerBootstrapAcceptor,这样当连接器接受到新的连接之后,新建子channel,就会带有这些属性。

  3.选择NioEventLoop,并且注册selector

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel)msg;
    //添加ChannelHandler   child.pipeline().addLast(new ChannelHandler[]{this.childHandler});
    Map.Entry[] t = this.childOptions;
    int len$ = t.length;

    int i$;
    Map.Entry e;
    for(i$ = 0; i$ < len$; ++i$) {
        e = t[i$];

        try {
            if(!child.config().setOption((ChannelOption)e.getKey(), e.getValue())) {
                ServerBootstrap.logger.warn("Unknown channel option: " + e);
            }
        } catch (Throwable var10) {
            ServerBootstrap.logger.warn("Failed to set a channel option: " + child, var10);
        }
    }

    t = this.childAttrs;
    len$ = t.length;

    for(i$ = 0; i$ < len$; ++i$) {
        e = t[i$];
        child.attr((AttributeKey)e.getKey()).set(e.getValue());
    }

    try {
        this.childGroup.register(child).addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) throws Exception {
                if(!future.isSuccess()) {
                    ServerBootstrap.ServerBootstrapAcceptor.forceClose(child, future.cause());
                }

            }
        });
    } catch (Throwable var9) {
        forceClose(child, var9);
    }

}

  这里注册是使用用户传进来的workerGroup线程池,使用register方法完成注册。

public ChannelFuture register(Channel channel) {
    return this.next().register(channel);
}

  这里的next()函数返回一个NioEventLoop,相当于从线程池里面挑选一个线程与这个channel进行绑定。最后通过层层调用,还是调用了java nio中channel的register方法,这时注册的时候,不关心任何事件。

public ChannelFuture register(Channel channel, ChannelPromise promise) {
    if(channel == null) {
        throw new NullPointerException("channel");
    } else if(promise == null) {
        throw new NullPointerException("promise");
    } else {
        channel.unsafe().register(this, promise);
        return promise;
    }
}
//AbstractChannelpublic final void register(EventLoop eventLoop, final ChannelPromise promise) {    if(eventLoop == null) {        throw new NullPointerException("eventLoop");    } else if(AbstractChannel.this.isRegistered()) {        promise.setFailure(new IllegalStateException("registered to an event loop already"));    } else if(!AbstractChannel.this.isCompatible(eventLoop)) {        promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));    } else {        AbstractChannel.this.eventLoop = eventLoop;        if(eventLoop.inEventLoop()) {            this.register0(promise);        } else {            try {                eventLoop.execute(new OneTimeTask() {                    public void run() {                        AbstractUnsafe.this.register0(promise);                    }                });            } catch (Throwable var4) {                AbstractChannel.logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, var4);                this.closeForcibly();                AbstractChannel.this.closeFuture.setClosed();                this.safeSetFailure(promise, var4);            }        }

    }}
private void register0(ChannelPromise promise) {    try {        if(!promise.setUncancellable() || !this.ensureOpen(promise)) {            return;        }

        AbstractChannel.this.doRegister();        AbstractChannel.this.registered = true;        this.safeSetSuccess(promise);        AbstractChannel.this.pipeline.fireChannelRegistered();        if(AbstractChannel.this.isActive()) {            AbstractChannel.this.pipeline.fireChannelActive();        }    } catch (Throwable var3) {        this.closeForcibly();        AbstractChannel.this.closeFuture.setClosed();        this.safeSetFailure(promise, var3);    }

}
protected void doRegister() throws Exception {    boolean selected = false;

    while(true) {        try {            this.selectionKey = this.javaChannel().register(this.eventLoop().selector, 0, this);            return;        } catch (CancelledKeyException var3) {            if(selected) {                throw var3;            }

            this.eventLoop().selectNow();            selected = true;        }    }}

  

NioSocketChannel读事件的注册

  通过传播channelActive方法,最终会调用channel的read()方法,channel在创建的时候都是默认自动读的。

public ChannelPipeline fireChannelActive() {
    this.head.fireChannelActive();
    if(this.channel.config().isAutoRead()) {
        this.channel.read();
    }

    return this;
}

  会将channel的Active状态在pipeline上面传播,调用read方法,最后会调用doBeginRead,去注册感兴趣的事件,NioSocketChannel感兴趣的事件是读事件,而NioServerSocketChannel感兴趣的事件则是Accept事件。

public ChannelHandlerContext read() {
    final AbstractChannelHandlerContext next = this.findContextOutbound();
    EventExecutor executor = next.executor();
    if(executor.inEventLoop()) {
        next.invokeRead();
    } else {
        Runnable task = next.invokeReadTask;
        if(task == null) {
            next.invokeReadTask = task = new Runnable() {
                public void run() {
                    next.invokeRead();
                }
            };
        }

        executor.execute(task);
    }

    return this;
}
protected void doBeginRead() throws Exception {    if(!this.inputShutdown) {        SelectionKey selectionKey = this.selectionKey;        if(selectionKey.isValid()) {            this.readPending = true;            int interestOps = selectionKey.interestOps();            if((interestOps & this.readInterestOp) == 0) {                selectionKey.interestOps(interestOps | this.readInterestOp);            }

        }    }}

原文地址:https://www.cnblogs.com/xiaobaituyun/p/10801495.html

时间: 2024-11-12 18:46:52

Netty源码分析之处理新连接的相关文章

Netty源码分析第3章(客户端接入流程)----&gt;第5节: 监听读事件

Netty源码分析第三章: 客户端接入流程 第五节: 监听读事件 我们回到AbstractUnsafe的register0()方法: private void register0(ChannelPromise promise) { try { //省略代码 //做实际的注册 doRegister(); neverRegistered = false; registered = true; //触发事件 pipeline.invokeHandlerAddedIfNeeded(); safeSetS

Netty源码分析第8章(高性能工具类FastThreadLocal和Recycler)----&gt;第6节: 异线程回收对象

Netty源码分析第八章: 高性能工具类FastThreadLocal和Recycler 第六节: 异线程回收对象 异线程回收对象, 就是创建对象和回收对象不在同一条线程的情况下, 对象回收的逻辑 我们之前小节简单介绍过, 异线程回收对象, 是不会放在当前线程的stack中的, 而是放在一个WeakOrderQueue的数据结构中, 回顾我们之前的一个图: 8-6-1 相关的逻辑, 我们跟到源码中: 首先从回收对象的入口方法开始, DefualtHandle的recycle方法: public

4. Netty源码分析之Unsafe

Unsafe类实际上是Channel接口的辅助类,实际的IO操作都是由Unsafe接口完成的. 一.Unsafe继承关系图 二.AbstractUnsafe源码分析 1. register方法 register方法主要用于将当前Unsafe对应的Channel注册到EventLoop的多路复用器上,然后调用DefaultChannelPipeline的fireChannelRegisted方法,如果Channel被激活,则调用fireChannelActive方法. public final v

netty 源码分析二

以服务端启动,接收客户端连接整个过程为例分析, 简略分为 五个过程: 1.NioServerSocketChannel 管道生成, 2.NioServerSocketChannel 管道完成初始化, 3.NioServerSocketChannel注册至Selector选择器, 4.NioServerSocketChannel管道绑定到指定端口,启动服务 5.NioServerSocketChannel接受客户端的连接,进行相应IO操作 Ps:netty内部过程远比这复杂,简略记录下方便以后回忆

netty源码分析之揭开reactor线程的面纱(二)

如果你对netty的reactor线程不了解,建议先看下上一篇文章netty源码分析之揭开reactor线程的面纱(一),这里再把reactor中的三个步骤的图贴一下 reactor线程 我们已经了解到netty reactor线程的第一步是轮询出注册在selector上面的IO事件(select),那么接下来就要处理这些IO事件(process selected keys),本篇文章我们将一起来探讨netty处理IO事件的细节 我们进入到reactor线程的 run 方法,找到处理IO事件的代

Netty源码分析第2章(NioEventLoop)----&gt;第6节: 执行selector操作

Netty源码分析第二章: NioEventLoop 第六节: 执行select操作 分析完了selector的创建和优化的过程, 这一小节分析select相关操作 跟到跟到NioEventLoop的run方法: protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE

Netty源码分析第3章(客户端接入流程)----&gt;第4节: NioSocketChannel注册到selector

Netty源码分析第三章: 客户端接入流程 第四节: NioSocketChannel注册到selector 我们回到最初的NioMessageUnsafe的read()方法: public void read() { //必须是NioEventLoop方法调用的, 不能通过外部线程调用 assert eventLoop().inEventLoop(); //服务端channel的config final ChannelConfig config = config(); //服务端channel

Netty源码分析第4章(pipeline)----&gt;第7节: 前章节内容回顾

Netty源码分析第四章: pipeline 第七节: 前章节内容回顾 我们在第一章和第三章中, 遗留了很多有关事件传输的相关逻辑, 这里带大家一一回顾 首先看两个问题: 1.在客户端接入的时候, NioMessageUnsafe的read方法中pipeline.fireChannelRead(readBuf.get(i))为什么会调用到ServerBootstrap的内部类ServerBootstrapAcceptor中的channelRead()方法 2.客户端handler是什么时候被添加

Netty源码分析第5章(ByteBuf)----&gt;第1节: AbstractByteBuf

Netty源码分析第五章: ByteBuf 概述: 熟悉Nio的小伙伴应该对jdk底层byteBuffer不会陌生, 也就是字节缓冲区, 主要用于对网络底层io进行读写, 当channel中有数据时, 将channel中的数据读取到字节缓冲区, 当要往对方写数据的时候, 将字节缓冲区的数据写到channel中 但是jdk的byteBuffer是使用起来有诸多不便, 比如只有一个标记位置的指针position, 在进行读写操作时要频繁的通过flip()方法进行指针位置的移动, 极易出错, 并且by