在 Netty 中创建 1 个 NioServerSocketChannel 在指定的端口监听客户端连接,这个过程主要有以下 个步骤:
- 创建 NioServerSocketChannel
- 初始化并注册 NioServerSocketChannel
- 绑定指定端口
首先列出一个简易服务端的启动代码:
1 public void start() { 2 EventLoopGroup bossGroup = new NioEventLoopGroup(1); 3 EventLoopGroup workerGroup = new NioEventLoopGroup(); 4 try { 5 ServerBootstrap sbs = new ServerBootstrap() 6 //添加 group 7 .group(bossGroup, workerGroup) 8 //指定服务端 Channel 类型 9 .channel(NioServerSocketChannel.class) 10 //添加服务端 Channel 的 Handler 11 .handler(new HelloWorldServerHandler()) 12 //添加客户端 Channel 的 Handler 13 .childHandler(new ChannelInitializer<NioSocketChannel>() { 14 @Override 15 protected void initChannel(NioSocketChannel ch) throws Exception { 16 //为后续接入的客户端 Channel 准备的字符串编解码 Handler 17 ch.pipeline().addLast(new StringDecoder()); 18 ch.pipeline().addLast(new StringEncoder()); 19 } 20 }); 21 //监听指定的端口 22 ChannelFuture future = sbs.bind(port).sync(); 23 System.out.println("Server start listen at " + port); 24 future.channel().closeFuture().sync(); 25 } catch (Exception e) { 26 bossGroup.shutdownGracefully(); 27 workerGroup.shutdownGracefully(); 28 } 29 }
下面就从 ServerBootstrap 的 bind(int port)方法开始分析服务端的 NioServerSocketChannel 的创建过程。
1. 创建 NioServerSocketChannel
跟随 bind 方法的调用,最终在 AbstractBootstrap 类的 doBind()方法找到了初始化,注册和绑定方法调用:
1 private ChannelFuture doBind(final SocketAddress localAddress) { 2 //初始化并注册 3 final ChannelFuture regFuture = initAndRegister(); 4 final Channel channel = regFuture.channel(); 5 if (regFuture.cause() != null) { 6 return regFuture; 7 } 8 9 if (regFuture.isDone()) { 10 // At this point we know that the registration was complete and successful. 11 ChannelPromise promise = channel.newPromise(); 12 //绑定本地端口 13 doBind0(regFuture, channel, localAddress, promise); 14 return promise; 15 } else { 16 //.... 17 } 18 }
2.
- 初始化并注册 NioServerSocketChannel
首先来看一下这个 initAndRegister()方法:
1 final ChannelFuture initAndRegister() { 2 Channel channel = null; 3 try { 4 //创建 Channel 5 channel = channelFactory.newChannel(); 6 //初始化 Channel 7 init(channel); 8 } catch (Throwable t) { 9 //... 10 } 11 12 //注册 13 ChannelFuture regFuture = config().group().register(channel); 14 if (regFuture.cause() != null) { 15 if (channel.isRegistered()) { 16 channel.close(); 17 } else { 18 channel.unsafe().closeForcibly(); 19 } 20 } 21 //... 22 }
Channel 也是通过工厂类来创建的,这个工厂默认是 ReflectiveChannelFactory,是在前面启动代码中,设置服务端 Channel 类型时创建的。通过名字可以知道,是用反射的方式创建了 Channel 对象。
init()方法有两种实现,这里分析的是 ServerBootstrap 的实现:
1 @Override 2 void init(Channel channel) throws Exception { 3 //... option 的设置省略掉 4 //pipeline 的创建,默认使用的 DefaultPipeline 5 ChannelPipeline p = channel.pipeline(); 6 7 //... 客户端 Channel 相关配置的保存 8 9 p.addLast(new ChannelInitializer<Channel>() { 10 @Override 11 public void initChannel(Channel ch) throws Exception { 12 final ChannelPipeline pipeline = ch.pipeline(); 13 //这里添加的是启动代码中,服务端的 Handler 14 ChannelHandler handler = config.handler(); 15 if (handler != null) { 16 pipeline.addLast(handler); 17 } 18 19 // We add this handler via the EventLoop as the user may have used a ChannelInitializer as handler. 20 // In this case the initChannel(...) method will only be called after this method returns. Because 21 // of this we need to ensure we add our handler in a delayed fashion so all the users handler are 22 // placed in front of the ServerBootstrapAcceptor. 23 ch.eventLoop().execute(new Runnable() { 24 @Override 25 public void run() { 26 //这里添加了一个 Accepter,用来处理新连接的接入 27 pipeline.addLast(new ServerBootstrapAcceptor( 28 currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); 29 } 30 }); 31 } 32 }); 33 }
初始化 Channel 这个动作,主要做了 4 件事:
- 创建 pipeline
- 为 Channel 添加用户创建的 Handler
- 添加 Accepter
- 其他属性的设置
接下来分析 Channel 的注册,需要关注的是这行代码:
1 ChannelFuture regFuture = config().group().register(channel);
config()方法获取了启动时创建的 config 对象,这个对象的 group()方法就返回了启动时传入的 bossGroup。启动代码中传入了两个 group,返回的为什么是 boosGroup 呢?查看启动代码中的 group(EventLoopGroup parentGroup, EventLoopGroup childGroup)方法,在它第一行就调用了 super.group(parentGroup),将第一个 group 对象传给了父类 AbstractBootstrap。而此处 config 调用的 group()方法返回的正是父类中的 group。
因为这里是一个 NioEventLoopGroup 对象,所以使用的 register(channel)方法是 MultithreadEventLoopGroup 中的。
1 @Override 2 public ChannelFuture register(Channel channel) { 3 return next().register(channel); 4 }
查看 next()方法可以发现,最终是调用之前创建 group 时创建的 chooser 的 next()方法,该方法会返回一个 NioEventLooop 对象(EventLoop 是在这里分配的),它的 register()方法是在父类 SingleThreadEventLoop 中实现的。最终调用了 AbstractChannel 中的注册方法。
1 @Override 2 public final void register(EventLoop eventLoop, final ChannelPromise promise) { 3 //... 4 //将前面返回的 eventLoop 保存起来 5 AbstractChannel.this.eventLoop = eventLoop; 6 //判断 eventLoop 中的 thread 是否是当前线程 7 //初次启动时,eventLoop 中的 thread 为 null 8 if (eventLoop.inEventLoop()) { 9 register0(promise); 10 } else { 11 try { 12 //将注册任务传进去 13 eventLoop.execute(new Runnable() { 14 @Override 15 public void run() { 16 //注册 17 register0(promise); 18 } 19 }); 20 } catch (Throwable t) { 21 //... 22 } 23 } 24 }
将注册动作封装成一个任务,然后交给 eventLoop 对象处理。
@Override public void execute(Runnable task) { //... //这里通用是 false boolean inEventLoop = inEventLoop(); if (inEventLoop) { addTask(task); } else { //启动线程 startThread(); addTask(task);//将前面传进来的注册任务添加进队列 if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } } private void startThread() { //判断是否需要启动线程 if (STATE_UPDATER.get(this) == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { //启动线程 doStartThread(); } } }
上面代码中的 startThread()方法有个 STATE_UPDATER,它是用来更新该对象的 state 属性,是一个线程安全的操作。state 默认值为 ST_NOT_STARTED,所以第一次进入该方法,条件判断为 true,接下来进行 CAS 操作,将 state 设置为 ST_STARTED,然后调用 doStartThread()方法。当 group 中的线程都启用之后,下一次 chooser 再选中这个线程,startThread()方法中的第一个 if 的条件判断就是 false 了,不会再创建新的线程。
1 private void doStartThread() { 2 assert thread == null; 3 //这个 executor 就是构建 group 时,创建出来的 executor 4 executor.execute(new Runnable() { 5 @Override 6 public void run() { 7 thread = Thread.currentThread(); 8 if (interrupted) { 9 thread.interrupt(); 10 } 11 12 boolean success = false; 13 updateLastExecutionTime(); 14 try { 15 //前面创建的是 NioEventLoop 16 SingleThreadEventExecutor.this.run(); 17 success = true; 18 } catch (Throwable t) { 19 logger.warn("Unexpected exception from an event executor: ", t); 20 } finally { 21 for (;;) { 22 //更新 state 23 int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this); 24 if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet( 25 SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) { 26 break; 27 } 28 } 29 //... 30 } 31 } 32 }); 33 }
前一篇分析 EventLoopGroup 创建时说过,会在 EventLoop 保存一个 executor 对象的引用,最终个任务就是交给这个 executor 来处理的。executor 的 execute(Runnable task) 方法会创建新线程,并执行传入的 task。接下来看一下 NioEventLoop 中的 run() 方法。
1 protected void run() { 2 for (;;) { 3 try { 4 //计算 select 策略,当前有任务时,会进行一次 selectNow(NIO),返回就绪的 key 个数 5 //显然 switch 中没有匹配项,直接跳出 switch 6 //无任务时,则直接返回 SelectStrategy.SELECT 7 //这里的 SelectStrategy.CONTINUE 感觉不会匹配到 8 switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { 9 case SelectStrategy.CONTINUE: 10 continue; 11 case SelectStrategy.SELECT: 12 //当没有可处理的任务时,直接进行 select 操作 13 // wakenUp.getAndSet(false) 返回的是 oldValue,由于默认值是 false 14 // 所以第一次返回的是 false 15 select(wakenUp.getAndSet(false)); 16 17 // ‘wakenUp.compareAndSet(false, true)‘ is always evaluated 18 // before calling ‘selector.wakeup()‘ to reduce the wake-up 19 // overhead. (Selector.wakeup() is an expensive operation.) 20 // 21 // However, there is a race condition in this approach. 22 // The race condition is triggered when ‘wakenUp‘ is set to 23 // true too early. 24 // 25 // ‘wakenUp‘ is set to true too early if: 26 // 1) Selector is waken up between ‘wakenUp.set(false)‘ and 27 // ‘selector.select(...)‘. (BAD) 28 // 2) Selector is waken up between ‘selector.select(...)‘ and 29 // ‘if (wakenUp.get()) { ... }‘. (OK) 30 // 31 // In the first case, ‘wakenUp‘ is set to true and the 32 // following ‘selector.select(...)‘ will wake up immediately. 33 // Until ‘wakenUp‘ is set to false again in the next round, 34 // ‘wakenUp.compareAndSet(false, true)‘ will fail, and therefore 35 // any attempt to wake up the Selector will fail, too, causing 36 // the following ‘selector.select(...)‘ call to block 37 // unnecessarily. 38 // 39 // To fix this problem, we wake up the selector again if wakenUp 40 // is true immediately after selector.select(...). 41 // It is inefficient in that it wakes up the selector for both 42 // the first case (BAD - wake-up required) and the second case 43 // (OK - no wake-up required). 44 45 if (wakenUp.get()) { 46 selector.wakeup(); 47 } 48 default: 49 // fallthrough 50 } 51 52 cancelledKeys = 0; 53 needsToSelectAgain = false; 54 final int ioRatio = this.ioRatio; 55 //根据比例来处理 IO 事件和任务 56 if (ioRatio == 100) { 57 try { 58 //处理就绪的 key 59 processSelectedKeys(); 60 } finally { 61 // Ensure we always run tasks. 62 //执行任务 63 runAllTasks(); 64 } 65 } else { 66 final long ioStartTime = System.nanoTime(); 67 try { 68 processSelectedKeys(); 69 } finally { 70 // Ensure we always run tasks. 71 // 计算出处理 IO 事件的时间,然后根据比例算出执行任务的时间 72 final long ioTime = System.nanoTime() - ioStartTime; 73 runAllTasks(ioTime * (100 - ioRatio) / ioRatio); 74 } 75 } 76 } catch (Throwable t) { 77 handleLoopException(t); 78 } 79 // Always handle shutdown even if the loop processing threw an exception. 80 try { 81 if (isShuttingDown()) { 82 closeAll(); 83 if (confirmShutdown()) { 84 return; 85 } 86 } 87 } catch (Throwable t) { 88 handleLoopException(t); 89 } 90 } 91 }
run()方法主要是做 select 操作,和处理 IO 事件和任务队列中的任务,这部分内容下一篇文章再分析。从 executor 执行 execute()方法开始,由 Netyy 管理的线程就开始启动运行了。实际上此时的 NioServerSocketChannel 对象还没有注册到 Netty 线程的 Selector 上,Debug 结果如下图:
上图中的 startThread()方法实际上是给 executor 提交了一个任务,紧接着 main 线程就调用了 addTask()方法,将 task 添加到 EventLoop 对象的任务队列中,而这个 task 的内容就是执行注册操作。在添加了注册任务之后,Netty 线程就会在 select 完成后,执行队列中的任务,将 NioServerSocketChannel 注册到该线程的 Selector 上。接下来分析一下 AbstractChannel 的 register0()方法:
1 private void register0(ChannelPromise promise) { 2 try { 3 // check if the channel is still open as it could be closed in the mean time when the register 4 // call was outside of the eventLoop 5 if (!promise.setUncancellable() || !ensureOpen(promise)) { 6 return; 7 } 8 boolean firstRegistration = neverRegistered; 9 //注册通道 10 doRegister(); 11 neverRegistered = false; 12 registered = true; 13 14 // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the 15 // user may already fire events through the pipeline in the ChannelFutureListener. 16 //添加服务端 Channel 的 Handler 17 pipeline.invokeHandlerAddedIfNeeded(); 18 19 safeSetSuccess(promise); 20 //触发通道注册事件在 pipeline 上传播 21 pipeline.fireChannelRegistered(); 22 // Only fire a channelActive if the channel has never been registered. This prevents firing 23 // multiple channel actives if the channel is deregistered and re-registered. 24 if (isActive()) {//第一次运行到这儿时,结果为 false,因为此时还没有 bind 25 if (firstRegistration) { 26 pipeline.fireChannelActive(); 27 } else if (config().isAutoRead()) { 28 // This channel was registered before and autoRead() is set. This means we need to begin read 29 // again so that we process inbound data. 30 // 31 // See https://github.com/netty/netty/issues/4805 32 beginRead(); 33 } 34 } 35 } catch (Throwable t) { 36 // Close the channel directly to avoid FD leak. 37 closeForcibly(); 38 closeFuture.setClosed(); 39 safeSetFailure(promise, t); 40 } 41 }
doRegister()方法实际上就是 Java NIO 中将通道注册到 Selector 上的操作:
1 selectionKey = javaChannel().register(eventLoop().selector, 0, this);//这里感兴趣的事件传入的是 0
pipeline.invokeHandlerAddedIfNeeded() 和 pipeline.fireChannelRegistered() 则是用来添加 Handler 并触发 Handler 别添加的事件的动作。
在 isActive()这个方法,由于当前是 NioServerSocketChannel,所以实际上是判断当前通道是否成功绑定到一个地址,很显然到目前为止,只是创建了通道并注册到 Selector 上,还没由绑定。
3. 绑定指定端口
在 initAndRegister()方法结束后,main 线程开始调用 doBind0()方法,该方法将绑定操作封装成任务交给 Netty 线程去执行。最后,调用 DefaultPipeline 中的 HeadContext 的 bind()方法,然后通过 unsafe.bind(localAddress,promise)完成绑定:
1 @Override 2 public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { 3 //... 4 //显然这里返回的是 false 5 boolean wasActive = isActive(); 6 try { 7 //绑定操作 8 doBind(localAddress); 9 } catch (Throwable t) { 10 safeSetFailure(promise, t); 11 closeIfClosed(); 12 return; 13 } 14 15 if (!wasActive && isActive()) { 16 invokeLater(new Runnable() { 17 @Override 18 public void run() { 19 //这里才是触发服务端 Channel 激活事件的地方 20 pipeline.fireChannelActive(); 21 } 22 }); 23 } 24 25 safeSetSuccess(promise); 26 }
这个过程,建议 Debug 跟一下代码,比较清楚代码是如何一步一步到 HeadContext 中来的。接下来分析一下 doBind()方法:
1 @Override 2 protected void doBind(SocketAddress localAddress) throws Exception { 3 if (PlatformDependent.javaVersion() >= 7) { 4 javaChannel().bind(localAddress, config.getBacklog()); 5 } else { 6 javaChannel().socket().bind(localAddress, config.getBacklog()); 7 } 8 }
最终是根据平台及其 Java 版本来调用 JDK 中的绑定方法。在绑定完成后,会触发通道激活事件,在 HeadContext 中经过时,发现它里面有这么一行代码:
1 readIfIsAutoRead();
Debug 一下,发现这个方法最终会调用到 HeadContext 的 read()方法,该方法是调用了 unsafe.beginRead(),紧接着就到了 AbstractNioChannel 的 doBeginRead()方法:
1 @Override 2 protected void doBeginRead() throws Exception { 3 // Channel.read() or ChannelHandlerContext.read() was called 4 final SelectionKey selectionKey = this.selectionKey; 5 if (!selectionKey.isValid()) { 6 return; 7 } 8 9 readPending = true; 10 11 final int interestOps = selectionKey.interestOps(); 12 if ((interestOps & readInterestOp) == 0) {//说明对 OP_ACCEPT 不感兴趣 13 selectionKey.interestOps(interestOps | readInterestOp);//通过 | 修改感兴趣的事件 14 } 15 }
前面通过反射创建 NioServerSocketChannel 对象时,调用了父类也就是 AbstractNioChannel 的构造方法,将 readInterestOp 设置为 16 了,在 NIO 中就是 OP_ACCEPT。从此,该 NioServerSocketChannel 就可以接收客户端连接了。
4. 总结
在 Netty 服务端启动过程中,主线程仅仅是创建了 EventLoopGroup 和启动引导对象,然后发起绑定操作。这个过程中的绑定,注册等操作都是主线程封装成任务交给 Netty 线程去执行的。
由于 Netty 代码中抽象类和接口都比较多,所以某些地方调用的方法有很多种实现,不熟悉的时候可以通过 Debug 来确定。
原文地址:https://www.cnblogs.com/magexi/p/10228780.html