nio server启动的第一步,都是要创建一个serverSocketChannel,我截取一段启动代码,一步步分析:
public void afterPropertiesSet() throws Exception { // 创建rpc工厂 ThreadFactory threadRpcFactory = new NamedThreadFactory("NettyRPC ThreadFactory"); //执行worker线程池数量 int parallel = Runtime.getRuntime().availableProcessors() * 2; // boss EventLoopGroup boss = new NioEventLoopGroup(1); // worker EventLoopGroup worker = new NioEventLoopGroup(parallel,threadRpcFactory, SelectorProvider.provider()); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boss, worker).channel(NioServerSocketChannel.class) .childHandler(new MessageRecvChannelInitializer(handlerMap)) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture future = bootstrap.bind(serverAddress, Integer.valueOf(port)).sync(); System.out.printf("[author chenjianye] Netty RPC Server start success ip:%s port:%s\n", serverAddress, port); // 注册zookeeper服务 serviceRegistry.register(serverAddress + ":" + port); // wait future.channel().closeFuture().sync(); } finally { worker.shutdownGracefully(); boss.shutdownGracefully(); }}入口就在
ChannelFuture future = bootstrap.bind(serverAddress, Integer.valueOf(port)).sync(); 顺序往下执行,直到AbstractBootstrap类的doBind方法:
private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = this.initAndRegister(); final Channel channel = regFuture.channel(); if(regFuture.cause() != null) { return regFuture; } else if(regFuture.isDone()) { ChannelPromise promise1 = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise1); return promise1; } else { final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel, null); regFuture.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if(cause != null) { promise.setFailure(cause); } else { promise.executor = channel.eventLoop(); } AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise); } }); return promise; }}=============================================这个方法需要重点分析,我做个标记,doBind方法分析如下: 1.final ChannelFuture regFuture = this.initAndRegister();
final ChannelFuture initAndRegister() { // 通过channel工厂反射创建ServerSocketChannel,并创建了作用于serverSocketChannel的channelPipeline管道 // 这个管道维护了ctx为元素的双向链表,到目前为止,pipeline的顺序为:head(outBound) ----> tail(inbound) Channel channel = this.channelFactory().newChannel(); try { // 这个init方法第一个主要是设置channel的属性,我不细说了 // 第二个作用是增加了inbound处理器,channelInitializer,里面有一个initChannel()方法会在特定时刻被触发,什么时候被触发,后面我会说到。 this.init(channel); } catch (Throwable var3) { channel.unsafe().closeForcibly(); return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3); } // 到了这里,就开始执行register逻辑,这个是关键,我把register的代码贴出来,跟着后面的代码继续看,跳转到2。 ChannelFuture regFuture = this.group().register(channel); if(regFuture.cause() != null) { if(channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture;}
2.AbstractChannel里的register方法:
public final void register(EventLoop eventLoop, final ChannelPromise promise) { if(eventLoop == null) { throw new NullPointerException("eventLoop"); } else if(AbstractChannel.this.isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); } else if(!AbstractChannel.this.isCompatible(eventLoop)) { promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); } else { AbstractChannel.this.eventLoop = eventLoop; // 到这里还是主线程启动,所以会执行else,启动了boss线程,register0方法跳转到3 if(eventLoop.inEventLoop()) { this.register0(promise); } else { try { eventLoop.execute(new OneTimeTask() { public void run() { AbstractUnsafe.this.register0(promise); } }); } catch (Throwable var4) { AbstractChannel.logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, var4); this.closeForcibly(); AbstractChannel.this.closeFuture.setClosed(); this.safeSetFailure(promise, var4); } } }}
3.register0方法
private void register0(ChannelPromise promise) { try { if(!promise.setUncancellable() || !this.ensureOpen(promise)) { return; } boolean t = this.neverRegistered; // 这个方法主要是 this.selectionKey = this.javaChannel().register(this.eventLoop().selector, 0, this);
AbstractChannel.this.doRegister(); this.neverRegistered = false; AbstractChannel.this.registered = true; // 回调之前doBind方法的listener,boss线程添加了新的任务:bind任务 this.safeSetSuccess(promise); // 这个方法东西内容很多 // 第一步:this.head.fireChannelRegistered()没有什么实质内容,最终执行到inbound的next.invokeChannelRegistered()方法 // 第二步:根据上文,目前的pipeline顺序为head---->initializer---->tail // 第三步:执行initializer // 第四步:添加一个新的inbound处理器,ServerBootstrapAcceptor,此时的顺序为:head---->initizlizer---->ServerBootstrapAcceptor---->tail // 第五步:移除initizlizer,此时pipeline顺序为:head---->ServerBootstrapAcceptor---->tail // 第六步:么有什么实质内容,这个方法就算执行结束了 AbstractChannel.this.pipeline.fireChannelRegistered(); // 这里channel还没有绑定,所以isActive()方法返回false,不会继续执行,目前boss线程还剩下bind任务 if(t && AbstractChannel.this.isActive()) { AbstractChannel.this.pipeline.fireChannelActive(); } } catch (Throwable var3) { this.closeForcibly(); AbstractChannel.this.closeFuture.setClosed(); this.safeSetFailure(promise, var3); } }
4.bind任务bind任务是一个outbound,所以会按照tail---->head的顺序执行,目前只有head是outbound。headHandler最终会执行AbstractUnsafe的bind方法:
AbstractChannel.this.doBind(localAddress);
public final void bind(SocketAddress localAddress, ChannelPromise promise) { if(promise.setUncancellable() && this.ensureOpen(promise)) { if(Boolean.TRUE.equals(AbstractChannel.this.config().getOption(ChannelOption.SO_BROADCAST)) && localAddress instanceof InetSocketAddress && !((InetSocketAddress)localAddress).getAddress().isAnyLocalAddress() && !PlatformDependent.isWindows() && !PlatformDependent.isRoot()) { AbstractChannel.logger.warn("A non-root user can\‘t receive a broadcast packet if the socket is not bound to a wildcard address; binding to a non-wildcard address (" + localAddress + ") anyway as requested."); } boolean wasActive = AbstractChannel.this.isActive(); try { //由于我们是nioServerSocketChannel,所以:this.javaChannel().socket().bind(localAddress, this.config.getBacklog()); AbstractChannel.this.doBind(localAddress); } catch (Throwable var5) {
this.safeSetFailure(promise, var5); this.closeIfClosed(); return; } // 此时已经绑定了,所以isActive()返回true,执行 if(!wasActive && AbstractChannel.this.isActive()) { // 重新往boss线程加入了任务 this.invokeLater(new OneTimeTask() { public void run() { AbstractChannel.this.pipeline.fireChannelActive(); } }); } this.safeSetSuccess(promise); }}
public ChannelPipeline fireChannelActive() { // 来回调用,貌似这里没有什么实质内容 this.head.fireChannelActive(); if(this.channel.config().isAutoRead()) { // 这个是outBound,最终会触发head // head会执行 this.unsafe.beginRead(),最终会执行abstractNioChannel里的doBeginRead()方法,最终会执行到5
this.channel.read(); } return this;}
5.
protected void doBeginRead() throws Exception { if(!this.inputShutdown) { SelectionKey selectionKey = this.selectionKey; if(selectionKey.isValid()) { this.readPending = true; int interestOps = selectionKey.interestOps(); if((interestOps & this.readInterestOp) == 0) { selectionKey.interestOps(interestOps | this.readInterestOp); } } }}
终于看到我们熟悉的东西了,最终把selectionKey的interestOps设置为SelectionKey.OP_ACCEPT。
时间: 2024-10-08 20:27:15