ServerBootstrap在bind时,主要做了3个操作:init、register、bind
init
/**
 * Initializes the server channel before it is registered.
 *
 * <p>Steps visible in this excerpt:
 * <ol>
 *   <li>apply the configured channel options and attributes to {@code channel};</li>
 *   <li>take snapshots of the child group, child handler, child options and child attributes;</li>
 *   <li>add a {@code ChannelInitializer} to the pipeline which installs the user-configured
 *       handler and then schedules the addition of a {@code ServerBootstrapAcceptor} on the
 *       channel's event loop.</li>
 * </ol>
 *
 * @param channel the channel being initialized
 * @throws Exception declared by the signature; the excerpt does not show which call may throw
 */
void init(Channel channel) throws Exception {
    // Apply the configured options while holding the lock on the options map.
    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }

    // Copy every configured attribute onto the channel while holding the lock on the attrs map.
    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    ChannelPipeline p = channel.pipeline();

    // Snapshot the child-side configuration so the anonymous classes below capture
    // fixed values rather than the mutable fields.
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }

    // After register, ChannelInitializer.initChannel is invoked through the ChannelPipeline's
    // fireChannelRegistered; once initChannel returns, the ChannelInitializer is removed from
    // the ChannelPipeline.
    // pipeline.addLast(handler) adds the handler we configured to the pipeline. If that handler
    // is itself a ChannelInitializer, then after this initChannel has run the ChannelPipeline
    // would hold two handlers: the ChannelInitializer and the ServerBootstrapAcceptor.
    // The registered event keeps propagating through the ChannelPipeline, and when it reaches
    // the newly added ChannelInitializer, that initializer's initChannel logic runs as well.
    // We usually add handlers inside initChannel; suppose it adds h1 and h2.
    // If adding the ServerBootstrapAcceptor were NOT done through execute (execute enqueues
    // the task), the ChannelPipeline would end up as: ServerBootstrapAcceptor, h1, h2.
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            // The user-configured handler is optional; add it only when present.
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    // Add the ServerBootstrapAcceptor from a task submitted to the event loop.
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}
ServerBootstrapAcceptor的主要工作是:为accept到的子channel添加childHandler、设置选项和属性,然后将其注册到childGroup中的某个EventLoop(及其对应的selector)上
/**
 * Inbound handler that processes each newly accepted child channel (abridged excerpt).
 *
 * <p>For every message it receives, it treats the message as a child {@code Channel},
 * adds {@code childHandler}, applies {@code childOptions} and {@code childAttrs}, and
 * registers the child with {@code childGroup}.
 */
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
    // ...... (omitted in this excerpt)

    /**
     * Configures the accepted child channel and registers it with the child group.
     *
     * @param ctx the handler context (not used in the visible code)
     * @param msg the accepted channel; cast to {@code Channel} below
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // msg is an io.netty.channel.Channel, which wraps the JDK channel.
        // Once the server socket accepts a socket, that socket is wrapped as an
        // io.netty.channel.Channel and passed here via pipeline.fireChannelRead.
        // Up to this point, this whole sequence runs on the boss thread.
        final Channel child = (Channel) msg;
        // Add childHandler to the accepted channel.
        child.pipeline().addLast(childHandler);

        setChannelOptions(child, childOptions, logger);

        for (Entry<AttributeKey<?>, Object> e: childAttrs) {
            child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }

        try {
            // Register the channel. After registration the channel is bound to a specific
            // EventLoop and registered with the selector belonging to that EventLoop; all
            // subsequent I/O for this channel is performed on that EventLoop.
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    // Registration failed asynchronously: close the child with the failure cause.
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            // register(...) itself threw: close the child with that throwable.
            forceClose(child, t);
        }
    }

    // ...... (omitted in this excerpt)
}
register
register和bind都涉及到一个接口io.netty.channel.Channel.Unsafe
// Unsafe provides the actual I/O operations, e.g. reading data from the socket,
// writing data to it, and registering the channel with the selector.
interface Unsafe {
    // ...... (omitted in this excerpt)

    void bind(SocketAddress localAddress, ChannelPromise promise);

    void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);

    void write(Object msg, ChannelPromise promise);

    void flush();

    /**
     * Schedules a read operation that fills the inbound buffer of the first {@link ChannelInboundHandler} in the
     * {@link ChannelPipeline}. If there's already a pending read operation, this method does nothing.
     */
    void beginRead();

    /**
     * Register the {@link Channel} of the {@link ChannelPromise} and notify
     * the {@link ChannelFuture} once the registration was complete.
     */
    void register(EventLoop eventLoop, ChannelPromise promise);

    // ...... (omitted in this excerpt)
}
// The register method as implemented by Unsafe (abridged).
/**
 * Runs {@code register0(promise)} on the given event loop: directly when the caller is
 * already on that event loop's thread, otherwise as a task submitted to it.
 *
 * @param eventLoop the event loop the registration should run on
 * @param promise   passed through to {@code register0}
 */
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // If we are already on the worker thread, register directly
    // (i.e. register the channel with the selector).
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        // If we are not on the worker thread, register asynchronously. If the worker
        // thread has not been started yet, execute starts it first and then enqueues the task.
        eventLoop.execute(new Runnable() {
            @Override
            public void run() {
                register0(promise);
            }
        });
    }
}
bind
注册之后,就可以bind了,bind仍然会在eventLoop中执行,此后serverSocket就开始监听client请求了。
时间: 2024-10-25 19:16:43