一、先来看一下客户端示例代码。
1 public class NettyClientTest { 2 public void connect(int port, String host) throws Exception { 3 EventLoopGroup group = new NioEventLoopGroup();//与服务端不同,客户端只需要一个IO线程组 4 5 try { 6 Bootstrap b = new Bootstrap(); 7 b.group(group) 8 .option(ChannelOption.TCP_NODELAY, true)//禁用nagel算法 9 .channel(NioSocketChannel.class)//设置channel类型为NioSocketChannel 10 .handler(new ChannelInitializer<SocketChannel>() {//为channel设置初始化Handler 11 @Override 12 protected void initChannel(SocketChannel ch) throws Exception { 13 ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); 14 ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter)); 15 ch.pipeline().addLast(new StringDecoder()); 16 ch.pipeline().addLast(new EchoClientHandler()); 17 } 18 }); 19 ChannelFuture f = b.connect(host, port).sync();//等不等待连接结束 20 f.channel().closeFuture().sync();//同步等待关闭 21 }finally { 22 group.shutdownGracefully(); 23 } 24 } 25 public static void main(String[] args) throws Exception{ 26 int port = 8082; 27 new NettyClientTest().connect(port,"127.0.0.1"); 28 } 29 } 30 31 class EchoClientHandler extends ChannelInboundHandlerAdapter{ 32 private int count = 0; 33 static final String ECHO_REQ = "HI , MY NAME IS CHENYANG.$_"; 34 35 @Override 36 public void channelActive(ChannelHandlerContext ctx) throws Exception { 37 for(int i = 0;i < 10;i++){ 38 ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes())); 39 } 40 } 41 42 @Override 43 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 44 System.out.println("This is"+ ++count + "times receive server:[" + msg + "]"); 45 ctx.writeAndFlush(Unpooled.copiedBuffer("hehe.$_".getBytes())); 46 ctx.fireChannelRead(msg); 47 } 48 49 @Override 50 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 51 ctx.flush(); 52 } 53 54 @Override 55 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 56 cause.printStackTrace(); 57 ctx.close(); 58 } 59 }
二、启动过程分析
由于客户端Bootstrap的配置过程和服务端ServerBootstrap配置过程原理相类似,此处不再单独讲解客户端的配置过程。接下来直接看客户端的connect过程。
三、connect过程分析
ChannelFuture f = b.connect(host, port).sync();
connect代码如下:
1 /** 2 * Connect a {@link Channel} to the remote peer. 3 */ 4 public ChannelFuture connect(String inetHost, int inetPort) { 5 return connect(new InetSocketAddress(inetHost, inetPort)); 6 }
继续深入
1 /** 2 * Connect a {@link Channel} to the remote peer. 3 */ 4 public ChannelFuture connect(SocketAddress remoteAddress) { 5 if (remoteAddress == null) { 6 throw new NullPointerException("remoteAddress"); 7 } 8 9 validate(); 10 return doConnect(remoteAddress, localAddress()); 11 }
继续查看doConnect源码
1 /** 2 * @see {@link #connect()} 3 */ 4 private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) { 5 final ChannelFuture regFuture = initAndRegister();//与服务端的类似,负责初始化和注册这个channel 6 final Channel channel = regFuture.channel();//获得创建的channel 7 if (regFuture.cause() != null) { 8 return regFuture; 9 } 10 11 final ChannelPromise promise = channel.newPromise(); 12 if (regFuture.isDone()) { 13 doConnect0(regFuture, channel, remoteAddress, localAddress, promise);//连接 14 } else { 15 regFuture.addListener(new ChannelFutureListener() { 16 @Override 17 public void operationComplete(ChannelFuture future) throws Exception { 18 doConnect0(regFuture, channel, remoteAddress, localAddress, promise); 19 } 20 }); 21 } 22 23 return promise; 24 }
看一下initAndRegister代码
1 final ChannelFuture initAndRegister() { 2 final Channel channel = channelFactory().newChannel();//调用之前设置的channel工厂,创建channel,此处就是NioSocketChannel 3 try { 4 init(channel);//初始化这个channel,这个针对客户端和服务端是不同的 5 } catch (Throwable t) { 6 channel.unsafe().closeForcibly(); 7 // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor 8 return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); 9 } 10 11 ChannelFuture regFuture = group().register(channel);//向NioEventLoopGroup中注册这个channel 12 if (regFuture.cause() != null) { 13 if (channel.isRegistered()) { 14 channel.close(); 15 } else { 16 channel.unsafe().closeForcibly(); 17 } 18 } 19 20 // If we are here and the promise is not failed, it‘s one of the following cases: 21 // 1) If we attempted registration from the event loop, the registration has been completed at this point. 22 // i.e. It‘s safe to attempt bind() or connect() now because the channel has been registered. 23 // 2) If we attempted registration from the other thread, the registration request has been successfully 24 // added to the event loop‘s task queue for later execution. 25 // i.e. It‘s safe to attempt bind() or connect() now: 26 // because bind() or connect() will be executed *after* the scheduled registration task is executed 27 // because register(), bind(), and connect() are all bound to the same thread. 28 29 return regFuture; 30 }
首先看一下针对客户端的init代码。
1 @Override 2 @SuppressWarnings("unchecked") 3 void init(Channel channel) throws Exception { 4 ChannelPipeline p = channel.pipeline(); 5 p.addLast(handler());//设置用户添加的handler,也就是初始化的handler 6 7 final Map<ChannelOption<?>, Object> options = options(); 8 synchronized (options) { 9 for (Entry<ChannelOption<?>, Object> e: options.entrySet()) { 10 try { 11 if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {//设置channel的配置选项 12 logger.warn("Unknown channel option: " + e); 13 } 14 } catch (Throwable t) { 15 logger.warn("Failed to set a channel option: " + channel, t); 16 } 17 } 18 } 19 20 final Map<AttributeKey<?>, Object> attrs = attrs(); 21 synchronized (attrs) { 22 for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { 23 channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());//设置channel的属性 24 } 25 } 26 }
接下来看register过程,这个和服务端是一样的。(ChannelFuture regFuture = group().register(channel);)
1 @Override 2 public ChannelFuture register(Channel channel) { 3 return next().register(channel);//next()会在Group中选出下一个NioEventLoop 4 }
1 @Override 2 public ChannelFuture register(Channel channel) { 3 return register(channel, new DefaultChannelPromise(channel, this)); 4 }
1 @Override 2 public ChannelFuture register(final Channel channel, final ChannelPromise promise) { 3 if (channel == null) { 4 throw new NullPointerException("channel"); 5 } 6 if (promise == null) { 7 throw new NullPointerException("promise"); 8 } 9 10 channel.unsafe().register(this, promise);//unsafe中执行真正的注册操作 11 return promise; 12 }
1 @Override 2 public final void register(EventLoop eventLoop, final ChannelPromise promise) { 3 if (eventLoop == null) { 4 throw new NullPointerException("eventLoop"); 5 } 6 if (isRegistered()) { 7 promise.setFailure(new IllegalStateException("registered to an event loop already")); 8 return; 9 } 10 if (!isCompatible(eventLoop)) { 11 promise.setFailure( 12 new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); 13 return; 14 } 15 16 AbstractChannel.this.eventLoop = eventLoop;//设置该channel绑定的eventloop 17 18 if (eventLoop.inEventLoop()) {//必须保证在eventloop线程中执行 19 register0(promise);//注册 20 } else { 21 try { 22 eventLoop.execute(new OneTimeTask() { 23 @Override 24 public void run() { 25 register0(promise); 26 } 27 }); 28 } catch (Throwable t) { 29 logger.warn( 30 "Force-closing a channel whose registration task was not accepted by an event loop: {}", 31 AbstractChannel.this, t); 32 closeForcibly(); 33 closeFuture.setClosed(); 34 safeSetFailure(promise, t); 35 } 36 } 37 }
继续看register0代码
1 private void register0(ChannelPromise promise) { 2 try { 3 // check if the channel is still open as it could be closed in the mean time when the register 4 // call was outside of the eventLoop 5 if (!promise.setUncancellable() || !ensureOpen(promise)) { 6 return; 7 } 8 boolean firstRegistration = neverRegistered; 9 doRegister();//在selector上注册 10 neverRegistered = false; 11 registered = true;//设置已经注册标识 12 safeSetSuccess(promise);//设置注册成功 13 pipeline.fireChannelRegistered();//引发channelRegistered事件,这会导致初始化Handler的channelRegistered被调用 14 // Only fire a channelActive if the channel has never been registered. This prevents firing 15 // multiple channel actives if the channel is deregistered and re-registered. 16 if (firstRegistration && isActive()) {//如果channel可用,针对客户端,也就是connect成功 17 pipeline.fireChannelActive();//引发channelActive事件,最终注册read事件 18 } 19 } catch (Throwable t) { 20 // Close the channel directly to avoid FD leak. 21 closeForcibly(); 22 closeFuture.setClosed(); 23 safeSetFailure(promise, t); 24 } 25 }
看doRegister代码
1 @Override 2 protected void doRegister() throws Exception { 3 boolean selected = false; 4 for (;;) { 5 try { 6 selectionKey = javaChannel().register(eventLoop().selector, 0, this);//注意,这里注册的op为0,不会监听任何事件 7 return; 8 } catch (CancelledKeyException e) { 9 if (!selected) { 10 // Force the Selector to select now as the "canceled" SelectionKey may still be 11 // cached and not removed because no Select.select(..) operation was called yet. 12 eventLoop().selectNow(); 13 selected = true; 14 } else { 15 // We forced a select operation on the selector before but the SelectionKey is still cached 16 // for whatever reason. JDK bug ? 17 throw e; 18 } 19 } 20 } 21 }
initAndRegister执行完成之后,继续看doConnect0代码
1 private static void doConnect0( 2 final ChannelFuture regFuture, final Channel channel, 3 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { 4 5 // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up 6 // the pipeline in its channelRegistered() implementation. 7 channel.eventLoop().execute(new Runnable() {//接下来的代码实在eventloop中执行,而不是用户线程 8 @Override 9 public void run() { 10 if (regFuture.isSuccess()) { 11 if (localAddress == null) { 12 channel.connect(remoteAddress, promise);//执行connect 13 } else { 14 channel.connect(remoteAddress, localAddress, promise); 15 } 16 promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); 17 } else { 18 promise.setFailure(regFuture.cause()); 19 } 20 } 21 }); 22 }
继续看connect代码,简单的调用了pipeline.connect
1 @Override 2 public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { 3 return pipeline.connect(remoteAddress, promise); 4 }
从tail开始
1 @Override 2 public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { 3 return tail.connect(remoteAddress, promise); 4 }
最终会调用到head.connect()
1 @Override 2 public void connect( 3 ChannelHandlerContext ctx, 4 SocketAddress remoteAddress, SocketAddress localAddress, 5 ChannelPromise promise) throws Exception { 6 unsafe.connect(remoteAddress, localAddress, promise); 7 }
1 @Override 2 public final void connect( 3 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { 4 if (!promise.setUncancellable() || !ensureOpen(promise)) { 5 return; 6 } 7 8 try { 9 if (connectPromise != null) { 10 throw new IllegalStateException("connection attempt already made"); 11 } 12 13 boolean wasActive = isActive(); 14 if (doConnect(remoteAddress, localAddress)) { 15 fulfillConnectPromise(promise, wasActive);//设置promise 16 } else { 17 connectPromise = promise; 18 requestedRemoteAddress = remoteAddress; 19 20 // Schedule connect timeout. 21 int connectTimeoutMillis = config().getConnectTimeoutMillis();//支持连接超时机制 22 if (connectTimeoutMillis > 0) { 23 connectTimeoutFuture = eventLoop().schedule(new OneTimeTask() { 24 @Override 25 public void run() { 26 ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise; 27 ConnectTimeoutException cause = 28 new ConnectTimeoutException("connection timed out: " + remoteAddress); 29 if (connectPromise != null && connectPromise.tryFailure(cause)) { 30 close(voidPromise()); 31 } 32 } 33 }, connectTimeoutMillis, TimeUnit.MILLISECONDS); 34 } 35 36 promise.addListener(new ChannelFutureListener() { 37 @Override 38 public void operationComplete(ChannelFuture future) throws Exception { 39 if (future.isCancelled()) { 40 if (connectTimeoutFuture != null) { 41 connectTimeoutFuture.cancel(false); 42 } 43 connectPromise = null; 44 close(voidPromise()); 45 } 46 } 47 }); 48 } 49 } catch (Throwable t) { 50 promise.tryFailure(annotateConnectException(t, remoteAddress)); 51 closeIfClosed(); 52 } 53 }
客户端的isActive()
1 @Override 2 public boolean isActive() { 3 SocketChannel ch = javaChannel(); 4 return ch.isOpen() && ch.isConnected(); 5 }
服务端的isActive()
1 @Override 2 public boolean isActive() { 3 return javaChannel().socket().isBound(); 4 }
看一下doConnect代码
1 @Override 2 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { 3 if (localAddress != null) { 4 javaChannel().socket().bind(localAddress); 5 } 6 7 boolean success = false; 8 try { 9 boolean connected = javaChannel().connect(remoteAddress);//执行真正的异步connect 10 if (!connected) { 11 selectionKey().interestOps(SelectionKey.OP_CONNECT);//如果没有注册成功,就注册OP_CONNECT事件 12 } 13 success = true; 14 return connected; 15 } finally { 16 if (!success) { 17 doClose(); 18 } 19 } 20 }
1 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) { 2 if (promise == null) { 3 // Closed via cancellation and the promise has been notified already. 4 return; 5 } 6 7 // trySuccess() will return false if a user cancelled the connection attempt. 8 boolean promiseSet = promise.trySuccess(); 9 10 // Regardless if the connection attempt was cancelled, channelActive() event should be triggered, 11 // because what happened is what happened. 12 if (!wasActive && isActive()) {//如果connect成功 13 pipeline().fireChannelActive();//最终会注册read事件,细节如下 14 } 15 16 // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive(). 17 if (!promiseSet) { 18 close(voidPromise()); 19 } 20 }
1 @Override 2 public ChannelPipeline fireChannelActive() { 3 head.fireChannelActive(); 4 5 if (channel.config().isAutoRead()) { 6 channel.read();//pipeline.read()-->tail.read()-->****-->head.read()-->unsafe.beginRead()-->doBeginRead()-->real操作 7 } 8 9 return this; 10 }
四、看一下如何获取异步连接结果的
在NioEventLoop的循环中,可以看到如下代码:
1 if ((readyOps & SelectionKey.OP_CONNECT) != 0) { 2 // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking 3 // See https://github.com/netty/netty/issues/924 4 int ops = k.interestOps(); 5 ops &= ~SelectionKey.OP_CONNECT; 6 k.interestOps(ops); 7 8 unsafe.finishConnect(); 9 }
当发生OP_CONNECT事件时,最终会调用unsafe.finishConnect,代码如下
1 @Override 2 public final void finishConnect() { 3 // Note this method is invoked by the event loop only if the connection attempt was 4 // neither cancelled nor timed out. 5 6 assert eventLoop().inEventLoop();//确保该操作是在eventLoop线程中的 7 8 try { 9 boolean wasActive = isActive(); 10 doFinishConnect(); 11 fulfillConnectPromise(connectPromise, wasActive); 12 } catch (Throwable t) { 13 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress)); 14 } finally { 15 // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used 16 // See https://github.com/netty/netty/issues/1770 17 if (connectTimeoutFuture != null) { 18 connectTimeoutFuture.cancel(false); 19 } 20 connectPromise = null; 21 } 22 }
1 @Override 2 protected void doFinishConnect() throws Exception { 3 if (!javaChannel().finishConnect()) {//判断JDK的SocketChannel连接结果,返回true表示连接成功 4 throw new Error(); 5 } 6 }
判断JDK的SocketChannel连接结果,返回true表示连接成功
1 public boolean finishConnect() throws IOException { 2 Object var1 = this.readLock; 3 synchronized(this.readLock) { 4 Object var2 = this.writeLock; 5 synchronized(this.writeLock) { 6 Object var3 = this.stateLock; 7 boolean var10000; 8 synchronized(this.stateLock) { 9 if(!this.isOpen()) { 10 throw new ClosedChannelException(); 11 } 12 13 if(this.state == 2) { 14 var10000 = true; 15 return var10000; 16 } 17 18 if(this.state != 1) { 19 throw new NoConnectionPendingException(); 20 } 21 } 22 23 int var41 = 0; 24 25 Object var4; 26 try { 27 label525: { 28 boolean var29 = false; 29 30 boolean var6; 31 label506: { 32 try { 33 var29 = true; 34 this.begin(); 35 synchronized(this.blockingLock()) { 36 label480: { 37 label494: { 38 Object var5 = this.stateLock; 39 synchronized(this.stateLock) { 40 if(!this.isOpen()) { 41 var6 = false; 42 break label494; 43 } 44 45 this.readerThread = NativeThread.current(); 46 } 47 48 if(!this.isBlocking()) { 49 do { 50 var41 = checkConnect(this.fd, false, this.readyToConnect); 51 } while(var41 == -3 && this.isOpen()); 52 } else { 53 do { 54 while(true) { 55 var41 = checkConnect(this.fd, true, this.readyToConnect); 56 if(var41 == 0) { 57 continue; 58 } 59 break; 60 } 61 } while(var41 == -3 && this.isOpen()); 62 } 63 64 var29 = false; 65 break label480; 66 } 67 68 var29 = false; 69 break label506; 70 } 71 } 72 } finally { 73 if(var29) { 74 Object var13 = this.stateLock; 75 synchronized(this.stateLock) { 76 this.readerThread = 0L; 77 if(this.state == 3) { 78 this.kill(); 79 var41 = 0; 80 } 81 } 82 83 this.end(var41 > 0 || var41 == -2); 84 85 assert IOStatus.check(var41); 86 87 } 88 } 89 90 var4 = this.stateLock; 91 synchronized(this.stateLock) { 92 this.readerThread = 0L; 93 if(this.state == 3) { 94 this.kill(); 95 var41 = 0; 96 } 97 } 98 99 this.end(var41 > 0 || var41 == -2); 100 101 assert IOStatus.check(var41); 102 break label525; 103 } 104 105 Object var7 = this.stateLock; 106 synchronized(this.stateLock) { 107 this.readerThread = 0L; 108 if(this.state == 3) { 109 this.kill(); 110 var41 = 0; 111 } 112 } 113 114 this.end(var41 > 0 || var41 == -2); 115 116 assert IOStatus.check(var41); 117 118 return var6; 119 } 120 } catch (IOException var38) { 121 this.close(); 122 throw var38; 123 } 124 125 if(var41 > 0) { 126 var4 = this.stateLock; 127 synchronized(this.stateLock) { 128 this.state = 2; 129 if(this.isOpen()) { 130 this.localAddress = Net.localAddress(this.fd); 131 } 132 } 133 134 var10000 = true; 135 return var10000; 136 } else { 137 var10000 = false; 138 return var10000; 139 } 140 } 141 } 142 }
fulfillConnectPromise会出发链接激活事件
1 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) { 2 if (promise == null) { 3 // Closed via cancellation and the promise has been notified already. 4 return; 5 } 6 7 // trySuccess() will return false if a user cancelled the connection attempt. 8 boolean promiseSet = promise.trySuccess(); 9 10 // Regardless if the connection attempt was cancelled, channelActive() event should be triggered, 11 // because what happened is what happened. 12 if (!wasActive && isActive()) { 13 pipeline().fireChannelActive();//参照前面的说明 14 } 15 16 // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive(). 17 if (!promiseSet) { 18 close(voidPromise()); 19 } 20 }
时间: 2024-11-08 21:08:21