Netty源码分析之客户端启动过程

一、先来看一下客户端示例代码。

 1 public class NettyClientTest {
 2     public void connect(int port, String host) throws Exception {
 3         EventLoopGroup group = new NioEventLoopGroup();//与服务端不同,客户端只需要一个IO线程组
 4
 5         try {
 6             Bootstrap b = new Bootstrap();
 7             b.group(group)
 8                     .option(ChannelOption.TCP_NODELAY, true)//禁用nagel算法
 9                     .channel(NioSocketChannel.class)//设置channel类型为NioSocketChannel
10                     .handler(new ChannelInitializer<SocketChannel>() {//为channel设置初始化Handler
11                         @Override
12                         protected void initChannel(SocketChannel ch) throws Exception {
13                             ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
14                             ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
15                             ch.pipeline().addLast(new StringDecoder());
16                             ch.pipeline().addLast(new EchoClientHandler());
17                         }
18                     });
19             ChannelFuture f = b.connect(host, port).sync();//等不等待连接结束
20             f.channel().closeFuture().sync();//同步等待关闭
21         }finally {
22              group.shutdownGracefully();
23         }
24     }
25    public static void main(String[] args) throws Exception{
26        int port = 8082;
27        new NettyClientTest().connect(port,"127.0.0.1");
28    }
29 }
30
31 class EchoClientHandler extends ChannelInboundHandlerAdapter{
32     private int count = 0;
33     static final String ECHO_REQ = "HI , MY NAME IS CHENYANG.$_";
34
35     @Override
36     public void channelActive(ChannelHandlerContext ctx) throws Exception {
37         for(int i = 0;i < 10;i++){
38             ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes()));
39         }
40     }
41
42     @Override
43     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
44         System.out.println("This is"+ ++count + "times receive server:[" + msg + "]");
45         ctx.writeAndFlush(Unpooled.copiedBuffer("hehe.$_".getBytes()));
46         ctx.fireChannelRead(msg);
47     }
48
49     @Override
50     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
51         ctx.flush();
52     }
53
54     @Override
55     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
56         cause.printStackTrace();
57         ctx.close();
58     }
59 }

二、启动过程分析

由于客户端Bootstrap的配置过程和服务端ServerBootstrap配置过程原理相类似,此处不再单独讲解客户端的配置过程。接下来直接看客户端的connect过程。

三、connect过程分析

ChannelFuture f = b.connect(host, port).sync();

connect代码如下:

1    /**
2      * Connect a {@link Channel} to the remote peer.
3      */
4     public ChannelFuture connect(String inetHost, int inetPort) {
5         return connect(new InetSocketAddress(inetHost, inetPort));
6     }

继续深入

 1   /**
 2      * Connect a {@link Channel} to the remote peer.
 3      */
 4     public ChannelFuture connect(SocketAddress remoteAddress) {
 5         if (remoteAddress == null) {
 6             throw new NullPointerException("remoteAddress");
 7         }
 8
 9         validate();
10         return doConnect(remoteAddress, localAddress());
11     }

继续查看doConnect源码

 1  /**
 2      * @see {@link #connect()}
 3      */
 4     private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
 5         final ChannelFuture regFuture = initAndRegister();//与服务端的类似,负责初始化和注册这个channel
 6         final Channel channel = regFuture.channel();//获得创建的channel
 7         if (regFuture.cause() != null) {
 8             return regFuture;
 9         }
10
11         final ChannelPromise promise = channel.newPromise();
12         if (regFuture.isDone()) {
13             doConnect0(regFuture, channel, remoteAddress, localAddress, promise);//连接
14         } else {
15             regFuture.addListener(new ChannelFutureListener() {
16                 @Override
17                 public void operationComplete(ChannelFuture future) throws Exception {
18                     doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
19                 }
20             });
21         }
22
23         return promise;
24     }

看一下initAndRegister代码

 1  final ChannelFuture initAndRegister() {
 2         final Channel channel = channelFactory().newChannel();//调用之前设置的channel工厂,创建channel,此处就是NioSocketChannel
 3         try {
 4             init(channel);//初始化这个channel,这个针对客户端和服务端是不同的
 5         } catch (Throwable t) {
 6             channel.unsafe().closeForcibly();
 7             // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
 8             return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
 9         }
10
11         ChannelFuture regFuture = group().register(channel);//向NioEventLoopGroup中注册这个channel
12         if (regFuture.cause() != null) {
13             if (channel.isRegistered()) {
14                 channel.close();
15             } else {
16                 channel.unsafe().closeForcibly();
17             }
18         }
19
20         // If we are here and the promise is not failed, it‘s one of the following cases:
21         // 1) If we attempted registration from the event loop, the registration has been completed at this point.
22         //    i.e. It‘s safe to attempt bind() or connect() now because the channel has been registered.
23         // 2) If we attempted registration from the other thread, the registration request has been successfully
24         //    added to the event loop‘s task queue for later execution.
25         //    i.e. It‘s safe to attempt bind() or connect() now:
26         //         because bind() or connect() will be executed *after* the scheduled registration task is executed
27         //         because register(), bind(), and connect() are all bound to the same thread.
28
29         return regFuture;
30     }

首先看一下针对客户端的init代码。

 1     @Override
 2     @SuppressWarnings("unchecked")
 3     void init(Channel channel) throws Exception {
 4         ChannelPipeline p = channel.pipeline();
 5         p.addLast(handler());//设置用户添加的handler,也就是初始化的handler
 6
 7         final Map<ChannelOption<?>, Object> options = options();
 8         synchronized (options) {
 9             for (Entry<ChannelOption<?>, Object> e: options.entrySet()) {
10                 try {
11                     if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {//设置channel的配置选项
12                         logger.warn("Unknown channel option: " + e);
13                     }
14                 } catch (Throwable t) {
15                     logger.warn("Failed to set a channel option: " + channel, t);
16                 }
17             }
18         }
19
20         final Map<AttributeKey<?>, Object> attrs = attrs();
21         synchronized (attrs) {
22             for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
23                 channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());//设置channel的属性
24             }
25         }
26     }

接下来看register过程,这个和服务端是一样的。(ChannelFuture regFuture = group().register(channel);)

1   @Override
2     public ChannelFuture register(Channel channel) {
3         return next().register(channel);//next()会在Group中选出下一个NioEventLoop
4     }
1     @Override
2     public ChannelFuture register(Channel channel) {
3         return register(channel, new DefaultChannelPromise(channel, this));
4     }
 1    @Override
 2     public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
 3         if (channel == null) {
 4             throw new NullPointerException("channel");
 5         }
 6         if (promise == null) {
 7             throw new NullPointerException("promise");
 8         }
 9
10         channel.unsafe().register(this, promise);//unsafe中执行真正的注册操作
11         return promise;
12     }
 1 @Override
 2         public final void register(EventLoop eventLoop, final ChannelPromise promise) {
 3             if (eventLoop == null) {
 4                 throw new NullPointerException("eventLoop");
 5             }
 6             if (isRegistered()) {
 7                 promise.setFailure(new IllegalStateException("registered to an event loop already"));
 8                 return;
 9             }
10             if (!isCompatible(eventLoop)) {
11                 promise.setFailure(
12                         new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
13                 return;
14             }
15
16             AbstractChannel.this.eventLoop = eventLoop;//设置该channel绑定的eventloop
17
18             if (eventLoop.inEventLoop()) {//必须保证在eventloop线程中执行
19                 register0(promise);//注册
20             } else {
21                 try {
22                     eventLoop.execute(new OneTimeTask() {
23                         @Override
24                         public void run() {
25                             register0(promise);
26                         }
27                     });
28                 } catch (Throwable t) {
29                     logger.warn(
30                             "Force-closing a channel whose registration task was not accepted by an event loop: {}",
31                             AbstractChannel.this, t);
32                     closeForcibly();
33                     closeFuture.setClosed();
34                     safeSetFailure(promise, t);
35                 }
36             }
37         }

继续看register0代码

 1 private void register0(ChannelPromise promise) {
 2             try {
 3                 // check if the channel is still open as it could be closed in the mean time when the register
 4                 // call was outside of the eventLoop
 5                 if (!promise.setUncancellable() || !ensureOpen(promise)) {
 6                     return;
 7                 }
 8                 boolean firstRegistration = neverRegistered;
 9                 doRegister();//在selector上注册
10                 neverRegistered = false;
11                 registered = true;//设置已经注册标识
12                 safeSetSuccess(promise);//设置注册成功
13                 pipeline.fireChannelRegistered();//引发channelRegistered事件,这会导致初始化Handler的channelRegistered被调用
14                 // Only fire a channelActive if the channel has never been registered. This prevents firing
15                 // multiple channel actives if the channel is deregistered and re-registered.
16                 if (firstRegistration && isActive()) {//如果channel可用,针对客户端,也就是connect成功
17                     pipeline.fireChannelActive();//引发channelActive事件,最终注册read事件
18                 }
19             } catch (Throwable t) {
20                 // Close the channel directly to avoid FD leak.
21                 closeForcibly();
22                 closeFuture.setClosed();
23                 safeSetFailure(promise, t);
24             }
25         }

看doRegister代码

 1 @Override
 2     protected void doRegister() throws Exception {
 3         boolean selected = false;
 4         for (;;) {
 5             try {
 6                 selectionKey = javaChannel().register(eventLoop().selector, 0, this);//注意,这里注册的op为0,不会监听任何事件
 7                 return;
 8             } catch (CancelledKeyException e) {
 9                 if (!selected) {
10                     // Force the Selector to select now as the "canceled" SelectionKey may still be
11                     // cached and not removed because no Select.select(..) operation was called yet.
12                     eventLoop().selectNow();
13                     selected = true;
14                 } else {
15                     // We forced a select operation on the selector before but the SelectionKey is still cached
16                     // for whatever reason. JDK bug ?
17                     throw e;
18                 }
19             }
20         }
21     }

initAndRegister执行完成之后,继续看doConnect0代码

 1  private static void doConnect0(
 2             final ChannelFuture regFuture, final Channel channel,
 3             final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
 4
 5         // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
 6         // the pipeline in its channelRegistered() implementation.
 7         channel.eventLoop().execute(new Runnable() {//接下来的代码实在eventloop中执行,而不是用户线程
 8             @Override
 9             public void run() {
10                 if (regFuture.isSuccess()) {
11                     if (localAddress == null) {
12                         channel.connect(remoteAddress, promise);//执行connect
13                     } else {
14                         channel.connect(remoteAddress, localAddress, promise);
15                     }
16                     promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
17                 } else {
18                     promise.setFailure(regFuture.cause());
19                 }
20             }
21         });
22     }

继续看connect代码,简单的调用了pipeline.connect

1    @Override
2     public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
3         return pipeline.connect(remoteAddress, promise);
4     }

从tail开始

1    @Override
2     public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
3         return tail.connect(remoteAddress, promise);
4     }

最终会调用到head.connect()

1   @Override
2         public void connect(
3                 ChannelHandlerContext ctx,
4                 SocketAddress remoteAddress, SocketAddress localAddress,
5                 ChannelPromise promise) throws Exception {
6             unsafe.connect(remoteAddress, localAddress, promise);
7         }
 1    @Override
 2         public final void connect(
 3                 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
 4             if (!promise.setUncancellable() || !ensureOpen(promise)) {
 5                 return;
 6             }
 7
 8             try {
 9                 if (connectPromise != null) {
10                     throw new IllegalStateException("connection attempt already made");
11                 }
12
13                 boolean wasActive = isActive();
14                 if (doConnect(remoteAddress, localAddress)) {
15                     fulfillConnectPromise(promise, wasActive);//设置promise
16                 } else {
17                     connectPromise = promise;
18                     requestedRemoteAddress = remoteAddress;
19
20                     // Schedule connect timeout.
21                     int connectTimeoutMillis = config().getConnectTimeoutMillis();//支持连接超时机制
22                     if (connectTimeoutMillis > 0) {
23                         connectTimeoutFuture = eventLoop().schedule(new OneTimeTask() {
24                             @Override
25                             public void run() {
26                                 ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
27                                 ConnectTimeoutException cause =
28                                         new ConnectTimeoutException("connection timed out: " + remoteAddress);
29                                 if (connectPromise != null && connectPromise.tryFailure(cause)) {
30                                     close(voidPromise());
31                                 }
32                             }
33                         }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
34                     }
35
36                     promise.addListener(new ChannelFutureListener() {
37                         @Override
38                         public void operationComplete(ChannelFuture future) throws Exception {
39                             if (future.isCancelled()) {
40                                 if (connectTimeoutFuture != null) {
41                                     connectTimeoutFuture.cancel(false);
42                                 }
43                                 connectPromise = null;
44                                 close(voidPromise());
45                             }
46                         }
47                     });
48                 }
49             } catch (Throwable t) {
50                 promise.tryFailure(annotateConnectException(t, remoteAddress));
51                 closeIfClosed();
52             }
53         }

客户端的isActive()

1     @Override
2     public boolean isActive() {
3         SocketChannel ch = javaChannel();
4         return ch.isOpen() && ch.isConnected();
5     }

服务端的isActive()

1   @Override
2     public boolean isActive() {
3         return javaChannel().socket().isBound();
4     }

看一下doConnect代码

 1 @Override
 2     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
 3         if (localAddress != null) {
 4             javaChannel().socket().bind(localAddress);
 5         }
 6
 7         boolean success = false;
 8         try {
 9             boolean connected = javaChannel().connect(remoteAddress);//执行真正的异步connect
10             if (!connected) {
11                 selectionKey().interestOps(SelectionKey.OP_CONNECT);//如果没有注册成功,就注册OP_CONNECT事件
12             }
13             success = true;
14             return connected;
15         } finally {
16             if (!success) {
17                 doClose();
18             }
19         }
20     }
 1 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
 2             if (promise == null) {
 3                 // Closed via cancellation and the promise has been notified already.
 4                 return;
 5             }
 6
 7             // trySuccess() will return false if a user cancelled the connection attempt.
 8             boolean promiseSet = promise.trySuccess();
 9
10             // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
11             // because what happened is what happened.
12             if (!wasActive && isActive()) {//如果connect成功
13                 pipeline().fireChannelActive();//最终会注册read事件,细节如下
14             }
15
16             // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
17             if (!promiseSet) {
18                 close(voidPromise());
19             }
20         }
 1   @Override
 2     public ChannelPipeline fireChannelActive() {
 3         head.fireChannelActive();
 4
 5         if (channel.config().isAutoRead()) {
 6             channel.read();//pipeline.read()-->tail.read()-->****-->head.read()-->unsafe.beginRead()-->doBeginRead()-->real操作
 7         }
 8
 9         return this;
10     }

四、看一下如何获取异步连接结果的

在NioEventLoop的循环中,可以看到如下代码:

1  if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
2                 // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
3                 // See https://github.com/netty/netty/issues/924
4                 int ops = k.interestOps();
5                 ops &= ~SelectionKey.OP_CONNECT;
6                 k.interestOps(ops);
7
8                 unsafe.finishConnect();
9             }

当发生OP_CONNECT事件时,最终会调用unsafe.finishConnect,代码如下

 1 @Override
 2         public final void finishConnect() {
 3             // Note this method is invoked by the event loop only if the connection attempt was
 4             // neither cancelled nor timed out.
 5
 6             assert eventLoop().inEventLoop();//确保该操作是在eventLoop线程中的
 7
 8             try {
 9                 boolean wasActive = isActive();
10                 doFinishConnect();
11                 fulfillConnectPromise(connectPromise, wasActive);
12             } catch (Throwable t) {
13                 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
14             } finally {
15                 // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
16                 // See https://github.com/netty/netty/issues/1770
17                 if (connectTimeoutFuture != null) {
18                     connectTimeoutFuture.cancel(false);
19                 }
20                 connectPromise = null;
21             }
22         }
1   @Override
2     protected void doFinishConnect() throws Exception {
3         if (!javaChannel().finishConnect()) {//判断JDK的SocketChannel连接结果,返回true表示连接成功
4             throw new Error();
5         }
6     }

判断JDK的SocketChannel连接结果,返回true表示连接成功

  1  public boolean finishConnect() throws IOException {
  2         Object var1 = this.readLock;
  3         synchronized(this.readLock) {
  4             Object var2 = this.writeLock;
  5             synchronized(this.writeLock) {
  6                 Object var3 = this.stateLock;
  7                 boolean var10000;
  8                 synchronized(this.stateLock) {
  9                     if(!this.isOpen()) {
 10                         throw new ClosedChannelException();
 11                     }
 12
 13                     if(this.state == 2) {
 14                         var10000 = true;
 15                         return var10000;
 16                     }
 17
 18                     if(this.state != 1) {
 19                         throw new NoConnectionPendingException();
 20                     }
 21                 }
 22
 23                 int var41 = 0;
 24
 25                 Object var4;
 26                 try {
 27                     label525: {
 28                         boolean var29 = false;
 29
 30                         boolean var6;
 31                         label506: {
 32                             try {
 33                                 var29 = true;
 34                                 this.begin();
 35                                 synchronized(this.blockingLock()) {
 36                                     label480: {
 37                                         label494: {
 38                                             Object var5 = this.stateLock;
 39                                             synchronized(this.stateLock) {
 40                                                 if(!this.isOpen()) {
 41                                                     var6 = false;
 42                                                     break label494;
 43                                                 }
 44
 45                                                 this.readerThread = NativeThread.current();
 46                                             }
 47
 48                                             if(!this.isBlocking()) {
 49                                                 do {
 50                                                     var41 = checkConnect(this.fd, false, this.readyToConnect);
 51                                                 } while(var41 == -3 && this.isOpen());
 52                                             } else {
 53                                                 do {
 54                                                     while(true) {
 55                                                         var41 = checkConnect(this.fd, true, this.readyToConnect);
 56                                                         if(var41 == 0) {
 57                                                             continue;
 58                                                         }
 59                                                         break;
 60                                                     }
 61                                                 } while(var41 == -3 && this.isOpen());
 62                                             }
 63
 64                                             var29 = false;
 65                                             break label480;
 66                                         }
 67
 68                                         var29 = false;
 69                                         break label506;
 70                                     }
 71                                 }
 72                             } finally {
 73                                 if(var29) {
 74                                     Object var13 = this.stateLock;
 75                                     synchronized(this.stateLock) {
 76                                         this.readerThread = 0L;
 77                                         if(this.state == 3) {
 78                                             this.kill();
 79                                             var41 = 0;
 80                                         }
 81                                     }
 82
 83                                     this.end(var41 > 0 || var41 == -2);
 84
 85                                     assert IOStatus.check(var41);
 86
 87                                 }
 88                             }
 89
 90                             var4 = this.stateLock;
 91                             synchronized(this.stateLock) {
 92                                 this.readerThread = 0L;
 93                                 if(this.state == 3) {
 94                                     this.kill();
 95                                     var41 = 0;
 96                                 }
 97                             }
 98
 99                             this.end(var41 > 0 || var41 == -2);
100
101                             assert IOStatus.check(var41);
102                             break label525;
103                         }
104
105                         Object var7 = this.stateLock;
106                         synchronized(this.stateLock) {
107                             this.readerThread = 0L;
108                             if(this.state == 3) {
109                                 this.kill();
110                                 var41 = 0;
111                             }
112                         }
113
114                         this.end(var41 > 0 || var41 == -2);
115
116                         assert IOStatus.check(var41);
117
118                         return var6;
119                     }
120                 } catch (IOException var38) {
121                     this.close();
122                     throw var38;
123                 }
124
125                 if(var41 > 0) {
126                     var4 = this.stateLock;
127                     synchronized(this.stateLock) {
128                         this.state = 2;
129                         if(this.isOpen()) {
130                             this.localAddress = Net.localAddress(this.fd);
131                         }
132                     }
133
134                     var10000 = true;
135                     return var10000;
136                 } else {
137                     var10000 = false;
138                     return var10000;
139                 }
140             }
141         }
142     }

fulfillConnectPromise会出发链接激活事件

 1    private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
 2             if (promise == null) {
 3                 // Closed via cancellation and the promise has been notified already.
 4                 return;
 5             }
 6
 7             // trySuccess() will return false if a user cancelled the connection attempt.
 8             boolean promiseSet = promise.trySuccess();
 9
10             // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
11             // because what happened is what happened.
12             if (!wasActive && isActive()) {
13                 pipeline().fireChannelActive();//参照前面的说明
14             }
15
16             // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
17             if (!promiseSet) {
18                 close(voidPromise());
19             }
20         }
时间: 2024-11-08 21:08:21

Netty源码分析之客户端启动过程的相关文章

Netty源码分析-- 处理客户端接入请求(八)

这一节我们来一起看下,一个客户端接入进来是什么情况.首先我们根据之前的分析,先启动服务端,然后打一个断点. 这个断点打在哪里呢?就是NioEventLoop上的select方法上. 然后我们启动一个客户端. 然后我们debug看到,selectedKey的数量 = 1,说明有accept或者读写等事件发生. 接下就会进 processSelectedKeys() 我们上一节讲到,这里的attach就是NioServerSocketChannel, 我们进入 processSelectedKey(

Netty源码分析之服务启动

本节主要分析server的启动过程. Netty是基于Nio实现的,所以也离不开selector.serverSocketChannel.socketChannel和selectKey等,只不过Netty把这些实现都封装在了底层. 从示例可以看出,一切从ServerBootstrap开始. ServerBootstrap实例中需要两个NioEventLoopGroup实例,分别为boss和work,有不同的分工:1. boss负责请求的accept操作.2. work负责请求的read.writ

wxWidgets源码分析(1) - App启动过程

目录 APP启动过程 wxApp入口定义 wxApp实例化准备 wxApp的实例化 wxApp运行 总结 APP启动过程 本文主要介绍wxWidgets应用程序的启动过程,从app.cpp入手. wxApp入口定义 wxApp通过IMPLEMENT_APP宏注册App类,这个宏同时定义了入口,实现在wx/app.h文件中. // wx/app.h 文件中定义 #define IMPLEMENT_APP(app) wxIMPLEMENT_APP(app); // 可以忽略 wxIMPLEMENT_

netty 5 alph1源码分析(服务端创建过程)

参照<Netty系列之Netty 服务端创建>,研究了netty的服务端创建过程.至于netty的优势,可以参照网络其他文章.<Netty系列之Netty 服务端创建>是 李林锋撰写的netty源码分析的一篇好文,绝对是技术干货.但抛开技术来说,也存在一些瑕疵. 缺点如下 代码衔接不连贯,上下不连贯. 代码片段是截图,对阅读代理不便(可能和阅读习惯有关) 本篇主要内容,参照<Netty系列之Netty 服务端创建>,梳理出自己喜欢的阅读风格. 1.整体逻辑图 整体将服务

Netty源码分析第3章(客户端接入流程)----&gt;第4节: NioSocketChannel注册到selector

Netty源码分析第三章: 客户端接入流程 第四节: NioSocketChannel注册到selector 我们回到最初的NioMessageUnsafe的read()方法: public void read() { //必须是NioEventLoop方法调用的, 不能通过外部线程调用 assert eventLoop().inEventLoop(); //服务端channel的config final ChannelConfig config = config(); //服务端channel

Netty源码分析第3章(客户端接入流程)----&gt;第3节: NioSocketChannel的创建

Netty源码分析第三章: 客户端接入流程 第三节: NioSocketChannel的创建 回到上一小结的read()方法: public void read() { //必须是NioEventLoop方法调用的, 不能通过外部线程调用 assert eventLoop().inEventLoop(); //服务端channel的config final ChannelConfig config = config(); //服务端channel的pipeline final ChannelPi

Netty源码分析第3章(客户端接入流程)----&gt;第5节: 监听读事件

Netty源码分析第三章: 客户端接入流程 第五节: 监听读事件 我们回到AbstractUnsafe的register0()方法: private void register0(ChannelPromise promise) { try { //省略代码 //做实际的注册 doRegister(); neverRegistered = false; registered = true; //触发事件 pipeline.invokeHandlerAddedIfNeeded(); safeSetS

netty 源码分析二

以服务端启动,接收客户端连接整个过程为例分析, 简略分为 五个过程: 1.NioServerSocketChannel 管道生成, 2.NioServerSocketChannel 管道完成初始化, 3.NioServerSocketChannel注册至Selector选择器, 4.NioServerSocketChannel管道绑定到指定端口,启动服务 5.NioServerSocketChannel接受客户端的连接,进行相应IO操作 Ps:netty内部过程远比这复杂,简略记录下方便以后回忆

Netty源码分析--内存模型(上)(十一)

前两节我们分别看了FastThreadLocal和ThreadLocal的源码分析,并且在第八节的时候讲到了处理一个客户端的接入请求,一个客户端是接入进来的,是怎么注册到多路复用器上的.那么这一节我们来一起看下客户端接入完成之后,是怎么实现读写操作的?我们自己想一下,应该就是为刚刚读取的数据分配一块缓冲区,然后把channel中的信息写入到缓冲区中,然后传入到各个handler链上,分别进行处理.那Netty是怎么去分配一块缓冲区的呢?这个就涉及到了Netty的内存模型. 当然,我们在第一节的时