netty源码分析之一:server的启动

nio server启动的第一步,都是要创建一个serverSocketChannel,我截取一段启动代码,一步步分析:

public void afterPropertiesSet() throws Exception {    // 创建rpc工厂    ThreadFactory threadRpcFactory = new NamedThreadFactory("NettyRPC ThreadFactory");    //执行worker线程池数量    int parallel = Runtime.getRuntime().availableProcessors() * 2;    // boss    EventLoopGroup boss = new NioEventLoopGroup(1);    // worker    EventLoopGroup worker = new NioEventLoopGroup(parallel,threadRpcFactory, SelectorProvider.provider());    try {        ServerBootstrap bootstrap = new ServerBootstrap();        bootstrap.group(boss, worker).channel(NioServerSocketChannel.class)                .childHandler(new MessageRecvChannelInitializer(handlerMap))                .option(ChannelOption.SO_BACKLOG, 128)                .childOption(ChannelOption.SO_KEEPALIVE, true);

ChannelFuture future = bootstrap.bind(serverAddress, Integer.valueOf(port)).sync();        System.out.printf("[author chenjianye] Netty RPC Server start success ip:%s port:%s\n", serverAddress, port);

// 注册zookeeper服务        serviceRegistry.register(serverAddress + ":" + port);        // wait        future.channel().closeFuture().sync();    } finally {        worker.shutdownGracefully();        boss.shutdownGracefully();    }}入口就在
ChannelFuture future = bootstrap.bind(serverAddress, Integer.valueOf(port)).sync();

顺序往下执行,直到AbstractBootstrap类的doBind方法:
private ChannelFuture doBind(final SocketAddress localAddress) {    final ChannelFuture regFuture = this.initAndRegister();    final Channel channel = regFuture.channel();    if(regFuture.cause() != null) {        return regFuture;    } else if(regFuture.isDone()) {        ChannelPromise promise1 = channel.newPromise();        doBind0(regFuture, channel, localAddress, promise1);        return promise1;    } else {        final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel, null);        regFuture.addListener(new ChannelFutureListener() {            public void operationComplete(ChannelFuture future) throws Exception {                Throwable cause = future.cause();                if(cause != null) {                    promise.setFailure(cause);                } else {                    promise.executor = channel.eventLoop();                }

AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise);            }        });        return promise;    }}=============================================这个方法需要重点分析,我做个标记,doBind方法分析如下:

1.final ChannelFuture regFuture = this.initAndRegister();
final ChannelFuture initAndRegister() {    // 通过channel工厂反射创建ServerSocketChannel,并创建了作用于serverSocketChannel的channelPipeline管道    // 这个管道维护了ctx为元素的双向链表,到目前为止,pipeline的顺序为:head(outBound) ----> tail(inbound)    Channel channel = this.channelFactory().newChannel();

try {     // 这个init方法第一个主要是设置channel的属性,我不细说了        // 第二个作用是增加了inbound处理器,channelInitializer,里面有一个initChannel()方法会在特定时刻被触发,什么时候被触发,后面我会说到。        this.init(channel);    } catch (Throwable var3) {        channel.unsafe().closeForcibly();        return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3);    }

   // 到了这里,就开始执行register逻辑,这个是关键,我把register的代码贴出来,跟着后面的代码继续看,跳转到2。    ChannelFuture regFuture = this.group().register(channel);    if(regFuture.cause() != null) {        if(channel.isRegistered()) {            channel.close();        } else {            channel.unsafe().closeForcibly();        }    }

return regFuture;}
2.AbstractChannel里的register方法:
public final void register(EventLoop eventLoop, final ChannelPromise promise) {    if(eventLoop == null) {        throw new NullPointerException("eventLoop");    } else if(AbstractChannel.this.isRegistered()) {        promise.setFailure(new IllegalStateException("registered to an event loop already"));    } else if(!AbstractChannel.this.isCompatible(eventLoop)) {        promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));    } else {        AbstractChannel.this.eventLoop = eventLoop;        // 到这里还是主线程启动,所以会执行else,启动了boss线程,register0方法跳转到3        if(eventLoop.inEventLoop()) {            this.register0(promise);        } else {            try {                eventLoop.execute(new OneTimeTask() {                    public void run() {                        AbstractUnsafe.this.register0(promise);                    }                });            } catch (Throwable var4) {                AbstractChannel.logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, var4);                this.closeForcibly();                AbstractChannel.this.closeFuture.setClosed();                this.safeSetFailure(promise, var4);            }        }

}}
3.register0方法
private void register0(ChannelPromise promise) {    try {        if(!promise.setUncancellable() || !this.ensureOpen(promise)) {            return;        }

boolean t = this.neverRegistered;        // 这个方法主要是 this.selectionKey = this.javaChannel().register(this.eventLoop().selector, 0, this);
        AbstractChannel.this.doRegister();        this.neverRegistered = false;        AbstractChannel.this.registered = true;

// 回调之前doBind方法的listener,boss线程添加了新的任务:bind任务        this.safeSetSuccess(promise);

// 这个方法东西内容很多        // 第一步:this.head.fireChannelRegistered()没有什么实质内容,最终执行到inbound的next.invokeChannelRegistered()方法        // 第二步:根据上文,目前的pipeline顺序为head---->initializer---->tail        // 第三步:执行initializer        // 第四步:添加一个新的inbound处理器,ServerBootstrapAcceptor,此时的顺序为:head---->initizlizer---->ServerBootstrapAcceptor---->tail        // 第五步:移除initizlizer,此时pipeline顺序为:head---->ServerBootstrapAcceptor---->tail        // 第六步:么有什么实质内容,这个方法就算执行结束了        AbstractChannel.this.pipeline.fireChannelRegistered();

// 这里channel还没有绑定,所以isActive()方法返回false,不会继续执行,目前boss线程还剩下bind任务        if(t && AbstractChannel.this.isActive()) {            AbstractChannel.this.pipeline.fireChannelActive();        }    } catch (Throwable var3) {        this.closeForcibly();        AbstractChannel.this.closeFuture.setClosed();        this.safeSetFailure(promise, var3);    }

}
4.bind任务bind任务是一个outbound,所以会按照tail---->head的顺序执行,目前只有head是outbound。headHandler最终会执行AbstractUnsafe的bind方法:
AbstractChannel.this.doBind(localAddress);
public final void bind(SocketAddress localAddress, ChannelPromise promise) {    if(promise.setUncancellable() && this.ensureOpen(promise)) {        if(Boolean.TRUE.equals(AbstractChannel.this.config().getOption(ChannelOption.SO_BROADCAST)) && localAddress instanceof InetSocketAddress && !((InetSocketAddress)localAddress).getAddress().isAnyLocalAddress() && !PlatformDependent.isWindows() && !PlatformDependent.isRoot()) {            AbstractChannel.logger.warn("A non-root user can\‘t receive a broadcast packet if the socket is not bound to a wildcard address; binding to a non-wildcard address (" + localAddress + ") anyway as requested.");        }

boolean wasActive = AbstractChannel.this.isActive();

try {        //由于我们是nioServerSocketChannel,所以:this.javaChannel().socket().bind(localAddress, this.config.getBacklog());

AbstractChannel.this.doBind(localAddress);

} catch (Throwable var5) {
            this.safeSetFailure(promise, var5);            this.closeIfClosed();            return;        }

     // 此时已经绑定了,所以isActive()返回true,执行        if(!wasActive && AbstractChannel.this.isActive()) {            // 重新往boss线程加入了任务            this.invokeLater(new OneTimeTask() {                public void run() {                    AbstractChannel.this.pipeline.fireChannelActive();                }            });        }

this.safeSetSuccess(promise);    }}

public ChannelPipeline fireChannelActive() {    // 来回调用,貌似这里没有什么实质内容    this.head.fireChannelActive();    if(this.channel.config().isAutoRead()) {        // 这个是outBound,最终会触发head     // head会执行 this.unsafe.beginRead(),最终会执行abstractNioChannel里的doBeginRead()方法,最终会执行到5
        this.channel.read();    }

return this;}
5.
protected void doBeginRead() throws Exception {    if(!this.inputShutdown) {        SelectionKey selectionKey = this.selectionKey;        if(selectionKey.isValid()) {            this.readPending = true;            int interestOps = selectionKey.interestOps();            if((interestOps & this.readInterestOp) == 0) {                selectionKey.interestOps(interestOps | this.readInterestOp);            }

}    }}
终于看到我们熟悉的东西了,最终把selectionKey的interestOps设置为SelectionKey.OP_ACCEPT。
				
时间: 2024-10-08 20:27:15

netty源码分析之一:server的启动的相关文章

netty源码分析之服务端启动

ServerBootstrap与Bootstrap分别是netty中服务端与客户端的引导类,主要负责服务端与客户端初始化.配置及启动引导等工作,接下来我们就通过netty源码中的示例对ServerBootstrap与Bootstrap的源码进行一个简单的分析.首先我们知道这两个类都继承自AbstractBootstrap类 接下来我们就通过netty源码中ServerBootstrap的实例入手对其进行一个简单的分析. // Configure the server. EventLoopGrou

netty 源码分析二

以服务端启动,接收客户端连接整个过程为例分析, 简略分为 五个过程: 1.NioServerSocketChannel 管道生成, 2.NioServerSocketChannel 管道完成初始化, 3.NioServerSocketChannel注册至Selector选择器, 4.NioServerSocketChannel管道绑定到指定端口,启动服务 5.NioServerSocketChannel接受客户端的连接,进行相应IO操作 Ps:netty内部过程远比这复杂,简略记录下方便以后回忆

Netty源码分析第3章(客户端接入流程)---->第4节: NioSocketChannel注册到selector

Netty源码分析第三章: 客户端接入流程 第四节: NioSocketChannel注册到selector 我们回到最初的NioMessageUnsafe的read()方法: public void read() { //必须是NioEventLoop方法调用的, 不能通过外部线程调用 assert eventLoop().inEventLoop(); //服务端channel的config final ChannelConfig config = config(); //服务端channel

Netty源码分析第6章(解码器)---->第1节: ByteToMessageDecoder

Netty源码分析第六章: 解码器 概述: 在我们上一个章节遗留过一个问题, 就是如果Server在读取客户端的数据的时候, 如果一次读取不完整, 就触发channelRead事件, 那么Netty是如何处理这类问题的, 在这一章中, 会对此做详细剖析 之前的章节我们学习过pipeline, 事件在pipeline中传递, handler可以将事件截取并对其处理, 而之后剖析的编解码器, 其实就是一个handler, 截取byteBuf中的字节, 然后组建成业务需要的数据进行继续传播 编码器,

Netty源码分析-- 处理客户端接入请求(八)

这一节我们来一起看下,一个客户端接入进来是什么情况.首先我们根据之前的分析,先启动服务端,然后打一个断点. 这个断点打在哪里呢?就是NioEventLoop上的select方法上. 然后我们启动一个客户端. 然后我们debug看到,selectedKey的数量 = 1,说明有accept或者读写等事件发生. 接下就会进 processSelectedKeys() 我们上一节讲到,这里的attach就是NioServerSocketChannel, 我们进入 processSelectedKey(

Netty源码分析--内存模型(上)(十一)

前两节我们分别看了FastThreadLocal和ThreadLocal的源码分析,并且在第八节的时候讲到了处理一个客户端的接入请求,一个客户端是接入进来的,是怎么注册到多路复用器上的.那么这一节我们来一起看下客户端接入完成之后,是怎么实现读写操作的?我们自己想一下,应该就是为刚刚读取的数据分配一块缓冲区,然后把channel中的信息写入到缓冲区中,然后传入到各个handler链上,分别进行处理.那Netty是怎么去分配一块缓冲区的呢?这个就涉及到了Netty的内存模型. 当然,我们在第一节的时

netty源码分析之揭开reactor线程的面纱(二)

如果你对netty的reactor线程不了解,建议先看下上一篇文章netty源码分析之揭开reactor线程的面纱(一),这里再把reactor中的三个步骤的图贴一下 reactor线程 我们已经了解到netty reactor线程的第一步是轮询出注册在selector上面的IO事件(select),那么接下来就要处理这些IO事件(process selected keys),本篇文章我们将一起来探讨netty处理IO事件的细节 我们进入到reactor线程的 run 方法,找到处理IO事件的代

Netty源码分析第2章(NioEventLoop)---->第7节: 处理IO事件

Netty源码分析第二章: NioEventLoop 第七节:处理IO事件 上一小节我们了解了执行select()操作的相关逻辑, 这一小节我们继续学习select()之后, 轮询到io事件的相关逻辑: 回到NioEventLoop的run()方法: protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case Sele

Netty源码分析第2章(NioEventLoop)---->第6节: 执行selector操作

Netty源码分析第二章: NioEventLoop 第六节: 执行select操作 分析完了selector的创建和优化的过程, 这一小节分析select相关操作 跟到跟到NioEventLoop的run方法: protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE

Netty源码分析第3章(客户端接入流程)---->第5节: 监听读事件

Netty源码分析第三章: 客户端接入流程 第五节: 监听读事件 我们回到AbstractUnsafe的register0()方法: private void register0(ChannelPromise promise) { try { //省略代码 //做实际的注册 doRegister(); neverRegistered = false; registered = true; //触发事件 pipeline.invokeHandlerAddedIfNeeded(); safeSetS