Netty(四):AbstractChannel源码解析

首先我们通过一张继承关系的图来认识下AbstractChannel在Netty中的位置。

除了Comaprable接口来自java自带的包,其他都是Netty包中提供的。

Comparable接口定义了Channel是可以比较的。

AttributeMap接口为Channel提供了绑定其他属性的能力。

这两个接口我们先不去深入了解,主要看ChannelOutboundInvoker。

ChannelOutboundInvoker接口主要提供了与网络链路相关的一些操作以及读写相关的操作,并统一返回了ChannelFuture对象,便于我们可以监听这些操作是否成功。

Channel接口继承了ChannelOutboundInvoker,在接口内部定义了Unsafe接口,增加了一部分方法,一些方法用来判断通道状态,一些方法用来绑定EventLoop,Pipeline和Unsafe对象。

接下来看AbstractChannel的源码:

   1 /*
   2  * Copyright 2012 The Netty Project
   3  *
   4  * The Netty Project licenses this file to you under the Apache License,
   5  * version 2.0 (the "License"); you may not use this file except in compliance
   6  * with the License. You may obtain a copy of the License at:
   7  *
   8  *   http://www.apache.org/licenses/LICENSE-2.0
   9  *
  10  * Unless required by applicable law or agreed to in writing, software
  11  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  13  * License for the specific language governing permissions and limitations
  14  * under the License.
  15  */
  16 package io.netty.channel;
  17
  18 import io.netty.buffer.ByteBufAllocator;
  19 import io.netty.channel.socket.ChannelOutputShutdownEvent;
  20 import io.netty.channel.socket.ChannelOutputShutdownException;
  21 import io.netty.util.DefaultAttributeMap;
  22 import io.netty.util.ReferenceCountUtil;
  23 import io.netty.util.internal.PlatformDependent;
  24 import io.netty.util.internal.ThrowableUtil;
  25 import io.netty.util.internal.UnstableApi;
  26 import io.netty.util.internal.logging.InternalLogger;
  27 import io.netty.util.internal.logging.InternalLoggerFactory;
  28
  29 import java.io.IOException;
  30 import java.net.ConnectException;
  31 import java.net.InetSocketAddress;
  32 import java.net.NoRouteToHostException;
  33 import java.net.SocketAddress;
  34 import java.net.SocketException;
  35 import java.nio.channels.ClosedChannelException;
  36 import java.nio.channels.NotYetConnectedException;
  37 import java.util.concurrent.Executor;
  38 import java.util.concurrent.RejectedExecutionException;
  39
  40 /**
  41  * A skeletal {@link Channel} implementation.
  42  */
  43 public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
  44
  45     private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);
  46
  47     private static final ClosedChannelException FLUSH0_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
  48             new ClosedChannelException(), AbstractUnsafe.class, "flush0()");
  49     private static final ClosedChannelException ENSURE_OPEN_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
  50             new ClosedChannelException(), AbstractUnsafe.class, "ensureOpen(...)");
  51     private static final ClosedChannelException CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
  52             new ClosedChannelException(), AbstractUnsafe.class, "close(...)");
  53     private static final ClosedChannelException WRITE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
  54             new ClosedChannelException(), AbstractUnsafe.class, "write(...)");
  55     private static final NotYetConnectedException FLUSH0_NOT_YET_CONNECTED_EXCEPTION = ThrowableUtil.unknownStackTrace(
  56             new NotYetConnectedException(), AbstractUnsafe.class, "flush0()");
  57
  58     //父Channel
  59     private final Channel parent;
  60     private final ChannelId id;
  61     //Unsafe对象,封装ByteBuf的读写过程
  62     private final Unsafe unsafe;
  63     //关联的Pipeline对象
  64     private final DefaultChannelPipeline pipeline;
  65
  66     private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
  67     private final CloseFuture closeFuture = new CloseFuture(this);
  68
  69     //本地地址和远端地址
  70     private volatile SocketAddress localAddress;
  71     private volatile SocketAddress remoteAddress;
  72
  73     //用来处理读写的线程
  74     private volatile EventLoop eventLoop;
  75
  76     private volatile boolean registered;
  77     private boolean closeInitiated;
  78
  79     /** Cache for the string representation of this channel */
  80     private boolean strValActive;
  81     private String strVal;
  82
  83     /**
  84      * Creates a new instance.
  85      *
  86      * @param parent
  87      *        the parent of this channel. {@code null} if there‘s no parent.
  88      */
  89     protected AbstractChannel(Channel parent) {
  90         this.parent = parent;
  91         id = newId();
  92         unsafe = newUnsafe();
  93         pipeline = newChannelPipeline();
  94     }
  95
  96     /**
  97      * Creates a new instance.
  98      *
  99      * @param parent
 100      *        the parent of this channel. {@code null} if there‘s no parent.
 101      */
 102     protected AbstractChannel(Channel parent, ChannelId id) {
 103         this.parent = parent;
 104         this.id = id;
 105         unsafe = newUnsafe();
 106         pipeline = newChannelPipeline();
 107     }
 108
 109     @Override
 110     public final ChannelId id() {
 111         return id;
 112     }
 113
 114     /**
 115      * Returns a new {@link DefaultChannelId} instance. Subclasses may override this method to assign custom
 116      * {@link ChannelId}s to {@link Channel}s that use the {@link AbstractChannel#AbstractChannel(Channel)} constructor.
 117      */
 118     protected ChannelId newId() {
 119         return DefaultChannelId.newInstance();
 120     }
 121
 122     /**
 123      * Returns a new {@link DefaultChannelPipeline} instance.
 124      */
 125     protected DefaultChannelPipeline newChannelPipeline() {
 126         return new DefaultChannelPipeline(this);
 127     }
 128
 129     //通道是否可写
 130     @Override
 131     public boolean isWritable() {
 132         //是否存在输出缓冲区且缓冲区可写
 133         ChannelOutboundBuffer buf = unsafe.outboundBuffer();
 134         return buf != null && buf.isWritable();
 135     }
 136
 137     //获取剩余可写空间
 138     @Override
 139     public long bytesBeforeUnwritable() {
 140         ChannelOutboundBuffer buf = unsafe.outboundBuffer();
 141         // isWritable() is currently assuming if there is no outboundBuffer then the channel is not writable.
 142         // We should be consistent with that here.
 143         return buf != null ? buf.bytesBeforeUnwritable() : 0;
 144     }
 145
 146     //还需处理多少字节才可写
 147     @Override
 148     public long bytesBeforeWritable() {
 149         ChannelOutboundBuffer buf = unsafe.outboundBuffer();
 150         // isWritable() is currently assuming if there is no outboundBuffer then the channel is not writable.
 151         // We should be consistent with that here.
 152         return buf != null ? buf.bytesBeforeWritable() : Long.MAX_VALUE;
 153     }
 154
 155     @Override
 156     public Channel parent() {
 157         return parent;
 158     }
 159
 160     @Override
 161     public ChannelPipeline pipeline() {
 162         return pipeline;
 163     }
 164
 165     @Override
 166     public ByteBufAllocator alloc() {
 167         return config().getAllocator();
 168     }
 169
 170     @Override
 171     public EventLoop eventLoop() {
 172         EventLoop eventLoop = this.eventLoop;
 173         if (eventLoop == null) {
 174             throw new IllegalStateException("channel not registered to an event loop");
 175         }
 176         return eventLoop;
 177     }
 178
 179     @Override
 180     public SocketAddress localAddress() {
 181         SocketAddress localAddress = this.localAddress;
 182         if (localAddress == null) {
 183             try {
 184                 this.localAddress = localAddress = unsafe().localAddress();
 185             } catch (Throwable t) {
 186                 // Sometimes fails on a closed socket in Windows.
 187                 return null;
 188             }
 189         }
 190         return localAddress;
 191     }
 192
 193     /**
 194      * @deprecated no use-case for this.
 195      */
 196     @Deprecated
 197     protected void invalidateLocalAddress() {
 198         localAddress = null;
 199     }
 200
 201     //获取远端地址
 202     @Override
 203     public SocketAddress remoteAddress() {
 204         SocketAddress remoteAddress = this.remoteAddress;
 205         if (remoteAddress == null) {
 206             try {
 207                 this.remoteAddress = remoteAddress = unsafe().remoteAddress();
 208             } catch (Throwable t) {
 209                 // Sometimes fails on a closed socket in Windows.
 210                 return null;
 211             }
 212         }
 213         return remoteAddress;
 214     }
 215
 216     /**
 217      * @deprecated no use-case for this.
 218      */
 219     @Deprecated
 220     protected void invalidateRemoteAddress() {
 221         remoteAddress = null;
 222     }
 223
 224     //是否已经注册
 225     @Override
 226     public boolean isRegistered() {
 227         return registered;
 228     }
 229
 230     /******* ChannelOutboundInvoker接口中的方法都交给pipeline对象代理********/
 231     @Override
 232     public ChannelFuture bind(SocketAddress localAddress) {
 233         return pipeline.bind(localAddress);
 234     }
 235
 236     @Override
 237     public ChannelFuture connect(SocketAddress remoteAddress) {
 238         return pipeline.connect(remoteAddress);
 239     }
 240
 241     @Override
 242     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
 243         return pipeline.connect(remoteAddress, localAddress);
 244     }
 245
 246     @Override
 247     public ChannelFuture disconnect() {
 248         return pipeline.disconnect();
 249     }
 250
 251     @Override
 252     public ChannelFuture close() {
 253         return pipeline.close();
 254     }
 255
 256     @Override
 257     public ChannelFuture deregister() {
 258         return pipeline.deregister();
 259     }
 260
 261     @Override
 262     public Channel flush() {
 263         pipeline.flush();
 264         return this;
 265     }
 266
 267     @Override
 268     public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
 269         return pipeline.bind(localAddress, promise);
 270     }
 271
 272     @Override
 273     public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
 274         return pipeline.connect(remoteAddress, promise);
 275     }
 276
 277     @Override
 278     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
 279         return pipeline.connect(remoteAddress, localAddress, promise);
 280     }
 281
 282     @Override
 283     public ChannelFuture disconnect(ChannelPromise promise) {
 284         return pipeline.disconnect(promise);
 285     }
 286
 287     @Override
 288     public ChannelFuture close(ChannelPromise promise) {
 289         return pipeline.close(promise);
 290     }
 291
 292     @Override
 293     public ChannelFuture deregister(ChannelPromise promise) {
 294         return pipeline.deregister(promise);
 295     }
 296
 297     @Override
 298     public Channel read() {
 299         pipeline.read();
 300         return this;
 301     }
 302
 303     @Override
 304     public ChannelFuture write(Object msg) {
 305         return pipeline.write(msg);
 306     }
 307
 308     @Override
 309     public ChannelFuture write(Object msg, ChannelPromise promise) {
 310         return pipeline.write(msg, promise);
 311     }
 312
 313     @Override
 314     public ChannelFuture writeAndFlush(Object msg) {
 315         return pipeline.writeAndFlush(msg);
 316     }
 317
 318     @Override
 319     public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
 320         return pipeline.writeAndFlush(msg, promise);
 321     }
 322
 323     @Override
 324     public ChannelPromise newPromise() {
 325         return pipeline.newPromise();
 326     }
 327
 328     @Override
 329     public ChannelProgressivePromise newProgressivePromise() {
 330         return pipeline.newProgressivePromise();
 331     }
 332
 333     @Override
 334     public ChannelFuture newSucceededFuture() {
 335         return pipeline.newSucceededFuture();
 336     }
 337
 338     @Override
 339     public ChannelFuture newFailedFuture(Throwable cause) {
 340         return pipeline.newFailedFuture(cause);
 341     }
 342
 343     /**************ChannelOutboundInvoker接口*************/
 344
 345     //返回ChannelFuture
 346     @Override
 347     public ChannelFuture closeFuture() {
 348         return closeFuture;
 349     }
 350
 351     //返回UnSafe对象
 352     @Override
 353     public Unsafe unsafe() {
 354         return unsafe;
 355     }
 356
 357     /**
 358      * Create a new {@link AbstractUnsafe} instance which will be used for the life-time of the {@link Channel}
 359      */
 360     //抽象方法 具体获取Unsafe的方式交给子类实现,
 361     protected abstract AbstractUnsafe newUnsafe();
 362
 363     /**
 364      * Returns the ID of this channel.
 365      */
 366     @Override
 367     public final int hashCode() {
 368         return id.hashCode();
 369     }
 370
 371     /**
 372      * Returns {@code true} if and only if the specified object is identical
 373      * with this channel (i.e: {@code this == o}).
 374      */
 375     @Override
 376     public final boolean equals(Object o) {
 377         return this == o;
 378     }
 379
 380     @Override
 381     public final int compareTo(Channel o) {
 382         if (this == o) {
 383             return 0;
 384         }
 385
 386         return id().compareTo(o.id());
 387     }
 388
 389     /**
 390      * Returns the {@link String} representation of this channel.  The returned
 391      * string contains the {@linkplain #hashCode() ID}, {@linkplain #localAddress() local address},
 392      * and {@linkplain #remoteAddress() remote address} of this channel for
 393      * easier identification.
 394      */
 395     @Override
 396     public String toString() {
 397         boolean active = isActive();
 398         if (strValActive == active && strVal != null) {
 399             return strVal;
 400         }
 401
 402         SocketAddress remoteAddr = remoteAddress();
 403         SocketAddress localAddr = localAddress();
 404         if (remoteAddr != null) {
 405             StringBuilder buf = new StringBuilder(96)
 406                 .append("[id: 0x")
 407                 .append(id.asShortText())
 408                 .append(", L:")
 409                 .append(localAddr)
 410                 .append(active? " - " : " ! ")
 411                 .append("R:")
 412                 .append(remoteAddr)
 413                 .append(‘]‘);
 414             strVal = buf.toString();
 415         } else if (localAddr != null) {
 416             StringBuilder buf = new StringBuilder(64)
 417                 .append("[id: 0x")
 418                 .append(id.asShortText())
 419                 .append(", L:")
 420                 .append(localAddr)
 421                 .append(‘]‘);
 422             strVal = buf.toString();
 423         } else {
 424             StringBuilder buf = new StringBuilder(16)
 425                 .append("[id: 0x")
 426                 .append(id.asShortText())
 427                 .append(‘]‘);
 428             strVal = buf.toString();
 429         }
 430
 431         strValActive = active;
 432         return strVal;
 433     }
 434
 435     @Override
 436     public final ChannelPromise voidPromise() {
 437         return pipeline.voidPromise();
 438     }
 439
 440     /**
 441      * {@link Unsafe} implementation which sub-classes must extend and use.
 442      */
 443     //Unsafe的抽象实现,虽然是抽象类,但没有抽象的方法,只是对某些方法提供了空实现;一些方法的关键实现是交给AbstractChannel的特定子类
 444     protected abstract class AbstractUnsafe implements Unsafe {
 445
 446         private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
 447         private RecvByteBufAllocator.Handle recvHandle;
 448         private boolean inFlush0;
 449         /** true if the channel has never been registered, false otherwise */
 450         private boolean neverRegistered = true;
 451
 452         private void assertEventLoop() {
 453             assert !registered || eventLoop.inEventLoop();
 454         }
 455
 456         @Override
 457         public RecvByteBufAllocator.Handle recvBufAllocHandle() {
 458             if (recvHandle == null) {
 459                 recvHandle = config().getRecvByteBufAllocator().newHandle();
 460             }
 461             return recvHandle;
 462         }
 463
 464         @Override
 465         public final ChannelOutboundBuffer outboundBuffer() {
 466             return outboundBuffer;
 467         }
 468
 469         @Override
 470         public final SocketAddress localAddress() {
 471             return localAddress0();
 472         }
 473
 474         @Override
 475         public final SocketAddress remoteAddress() {
 476             return remoteAddress0();
 477         }
 478
 479         //注册;将Unsafe与EventLoop绑定
 480         @Override
 481         public final void register(EventLoop eventLoop, final ChannelPromise promise) {
 482             if (eventLoop == null) {
 483                 throw new NullPointerException("eventLoop");
 484             }
 485             if (isRegistered()) {
 486                 promise.setFailure(new IllegalStateException("registered to an event loop already"));
 487                 return;
 488             }
 489             if (!isCompatible(eventLoop)) {
 490                 promise.setFailure(
 491                         new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
 492                 return;
 493             }
 494
 495             AbstractChannel.this.eventLoop = eventLoop;
 496
 497             if (eventLoop.inEventLoop()) {
 498                 //如果当前线程是处理线程,直接注册
 499                 register0(promise);
 500             } else {
 501                 //如果当前线程不是处理线程,则包装成任务丢给eventLoop处理
 502                 try {
 503                     eventLoop.execute(new Runnable() {
 504                         @Override
 505                         public void run() {
 506                             register0(promise);
 507                         }
 508                     });
 509                 } catch (Throwable t) {
 510                     logger.warn(
 511                             "Force-closing a channel whose registration task was not accepted by an event loop: {}",
 512                             AbstractChannel.this, t);
 513                     closeForcibly();
 514                     closeFuture.setClosed();
 515                     safeSetFailure(promise, t);
 516                 }
 517             }
 518         }
 519
 520         private void register0(ChannelPromise promise) {
 521             try {
 522                 // check if the channel is still open as it could be closed in the mean time when the register
 523                 // call was outside of the eventLoop
 524                 //确认Channel仍然打开
 525                 if (!promise.setUncancellable() || !ensureOpen(promise)) {
 526                     return;
 527                 }
 528
 529                 boolean firstRegistration = neverRegistered;
 530                 //注册具体流程
 531                 doRegister();
 532                 //修改注册状态
 533                 neverRegistered = false;
 534                 registered = true;
 535
 536                 // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
 537                 // user may already fire events through the pipeline in the ChannelFutureListener.
 538                 //产生HandlerAdded
 539                 pipeline.invokeHandlerAddedIfNeeded();
 540
 541                 safeSetSuccess(promise);
 542                 //注册成功,产生注册事件
 543                 pipeline.fireChannelRegistered();
 544
 545                 //激活成功
 546                 if (isActive()) {
 547                     //首次注册,产生active事件
 548                     if (firstRegistration) {
 549                         pipeline.fireChannelActive();
 550                     } else if (config().isAutoRead()) {
 551                         //如果重注册且配置了自动读
 552                         // This channel was registered before and autoRead() is set. This means we need to begin read
 553                         // again so that we process inbound data.
 554                         //
 555                         // See https://github.com/netty/netty/issues/4805
 556                         beginRead();
 557                     }
 558                 }
 559             } catch (Throwable t) {
 560                 // Close the channel directly to avoid FD leak.
 561                 closeForcibly();
 562                 closeFuture.setClosed();
 563                 safeSetFailure(promise, t);
 564             }
 565         }
 566
 567         //绑定
 568         @Override
 569         public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
 570             assertEventLoop();
 571
 572             if (!promise.setUncancellable() || !ensureOpen(promise)) {
 573                 return;
 574             }
 575
 576             // See: https://github.com/netty/netty/issues/576
 577             if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
 578                 localAddress instanceof InetSocketAddress &&
 579                 !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
 580                 !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
 581                 // Warn a user about the fact that a non-root user can‘t receive a
 582                 // broadcast packet on *nix if the socket is bound on non-wildcard address.
 583                 logger.warn(
 584                         "A non-root user can‘t receive a broadcast packet if the socket " +
 585                         "is not bound to a wildcard address; binding to a non-wildcard " +
 586                         "address (" + localAddress + ") anyway as requested.");
 587             }
 588
 589             boolean wasActive = isActive();
 590             try {
 591                 //具体绑定流程
 592                 doBind(localAddress);
 593             } catch (Throwable t) {
 594                 safeSetFailure(promise, t);
 595                 closeIfClosed();
 596                 return;
 597             }
 598             //绑定成功,产生Active事件
 599             if (!wasActive && isActive()) {
 600                 invokeLater(new Runnable() {
 601                     @Override
 602                     public void run() {
 603                         pipeline.fireChannelActive();
 604                     }
 605                 });
 606             }
 607             //将Promis设置为成功
 608             safeSetSuccess(promise);
 609         }
 610
 611         //断开连接
 612         @Override
 613         public final void disconnect(final ChannelPromise promise) {
 614             assertEventLoop();
 615
 616             if (!promise.setUncancellable()) {
 617                 return;
 618             }
 619
 620             boolean wasActive = isActive();
 621             try {
 622                 //断开具体流程
 623                 doDisconnect();
 624             } catch (Throwable t) {
 625                 safeSetFailure(promise, t);
 626                 closeIfClosed();
 627                 return;
 628             }
 629
 630             //断开成功,产生Inactive事件
 631             if (wasActive && !isActive()) {
 632                 invokeLater(new Runnable() {
 633                     @Override
 634                     public void run() {
 635                         pipeline.fireChannelInactive();
 636                     }
 637                 });
 638             }
 639
 640             safeSetSuccess(promise);
 641             closeIfClosed(); // doDisconnect() might have closed the channel
 642         }
 643
 644         @Override
 645         public final void close(final ChannelPromise promise) {
 646             assertEventLoop();
 647
 648             close(promise, CLOSE_CLOSED_CHANNEL_EXCEPTION, CLOSE_CLOSED_CHANNEL_EXCEPTION, false);
 649         }
 650
 651         /**
 652          * Shutdown the output portion of the corresponding {@link Channel}.
 653          * For example this will clean up the {@link ChannelOutboundBuffer} and not allow any more writes.
 654          */
 655         @UnstableApi
 656         public final void shutdownOutput(final ChannelPromise promise) {
 657             assertEventLoop();
 658             shutdownOutput(promise, null);
 659         }
 660
 661         /**
 662          * Shutdown the output portion of the corresponding {@link Channel}.
 663          * For example this will clean up the {@link ChannelOutboundBuffer} and not allow any more writes.
 664          * @param cause The cause which may provide rational for the shutdown.
 665          */
 666         private void shutdownOutput(final ChannelPromise promise, Throwable cause) {
 667             if (!promise.setUncancellable()) {
 668                 return;
 669             }
 670
 671             final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
 672             //如果不存在输出缓冲区,设置Promise为失败,抛出已关闭异常
 673             if (  == null) {
 674                 promise.setFailure(CLOSE_CLOSED_CHANNEL_EXCEPTION);
 675                 return;
 676             }
 677
 678             this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
 679
 680             //线程关闭原因
 681             final Throwable shutdownCause = cause == null ?
 682                     new ChannelOutputShutdownException("Channel output shutdown") :
 683                     new ChannelOutputShutdownException("Channel output shutdown", cause);
 684             Executor closeExecutor = prepareToClose();
 685             if (closeExecutor != null) {
 686                 closeExecutor.execute(new Runnable() {
 687                     @Override
 688                     public void run() {
 689                         try {
 690                             //关闭流程
 691                             doShutdownOutput();
 692                             promise.setSuccess();
 693                         } catch (Throwable err) {
 694                             promise.setFailure(err);
 695                         } finally {
 696                             // Dispatch to the EventLoop
 697                             eventLoop().execute(new Runnable() {
 698                                 @Override
 699                                 public void run() {
 700                                     closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
 701                                 }
 702                             });
 703                         }
 704                     }
 705                 });
 706             } else {
 707                 try {
 708                     // Execute the shutdown.
 709                     doShutdownOutput();
 710                     promise.setSuccess();
 711                 } catch (Throwable err) {
 712                     promise.setFailure(err);
 713                 } finally {
 714                     closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
 715                 }
 716             }
 717         }
 718
 719         private void closeOutboundBufferForShutdown(
 720                 ChannelPipeline pipeline, ChannelOutboundBuffer buffer, Throwable cause) {
 721             buffer.failFlushed(cause, false);
 722             buffer.close(cause, true);
 723             pipeline.fireUserEventTriggered(ChannelOutputShutdownEvent.INSTANCE);
 724         }
 725
 726         private void close(final ChannelPromise promise, final Throwable cause,
 727                            final ClosedChannelException closeCause, final boolean notify) {
 728             if (!promise.setUncancellable()) {
 729                 return;
 730             }
 731
 732             if (closeInitiated) {
 733                 if (closeFuture.isDone()) {
 734                     // Closed already.
 735                     safeSetSuccess(promise);
 736                 } else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
 737                     // This means close() was called before so we just register a listener and return
 738                     closeFuture.addListener(new ChannelFutureListener() {
 739                         @Override
 740                         public void operationComplete(ChannelFuture future) throws Exception {
 741                             promise.setSuccess();
 742                         }
 743                     });
 744                 }
 745                 return;
 746             }
 747
 748             closeInitiated = true;
 749
 750             final boolean wasActive = isActive();
 751             final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
 752             this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
 753             Executor closeExecutor = prepareToClose();
 754             if (closeExecutor != null) {
 755                 closeExecutor.execute(new Runnable() {
 756                     @Override
 757                     public void run() {
 758                         try {
 759                             // Execute the close.
 760                             doClose0(promise);
 761                         } finally {
 762                             // Call invokeLater so closeAndDeregister is executed in the EventLoop again!
 763                             invokeLater(new Runnable() {
 764                                 @Override
 765                                 public void run() {
 766                                     if (outboundBuffer != null) {
 767                                         // Fail all the queued messages
 768                                         outboundBuffer.failFlushed(cause, notify);
 769                                         outboundBuffer.close(closeCause);
 770                                     }
 771                                     fireChannelInactiveAndDeregister(wasActive);
 772                                 }
 773                             });
 774                         }
 775                     }
 776                 });
 777             } else {
 778                 try {
 779                     // Close the channel and fail the queued messages in all cases.
 780                     doClose0(promise);
 781                 } finally {
 782                     if (outboundBuffer != null) {
 783                         // Fail all the queued messages.
 784                         outboundBuffer.failFlushed(cause, notify);
 785                         outboundBuffer.close(closeCause);
 786                     }
 787                 }
 788                 if (inFlush0) {
 789                     invokeLater(new Runnable() {
 790                         @Override
 791                         public void run() {
 792                             fireChannelInactiveAndDeregister(wasActive);
 793                         }
 794                     });
 795                 } else {
 796                     fireChannelInactiveAndDeregister(wasActive);
 797                 }
 798             }
 799         }
 800
 801         private void doClose0(ChannelPromise promise) {
 802             try {
 803                 doClose();
 804                 closeFuture.setClosed();
 805                 safeSetSuccess(promise);
 806             } catch (Throwable t) {
 807                 closeFuture.setClosed();
 808                 safeSetFailure(promise, t);
 809             }
 810         }
 811
 812         private void fireChannelInactiveAndDeregister(final boolean wasActive) {
 813             deregister(voidPromise(), wasActive && !isActive());
 814         }
 815
 816         @Override
 817         public final void closeForcibly() {
 818             assertEventLoop();
 819
 820             try {
 821                 doClose();
 822             } catch (Exception e) {
 823                 logger.warn("Failed to close a channel.", e);
 824             }
 825         }
 826
 827         @Override
 828         public final void deregister(final ChannelPromise promise) {
 829             assertEventLoop();
 830
 831             deregister(promise, false);
 832         }
 833
 834         private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
 835             if (!promise.setUncancellable()) {
 836                 return;
 837             }
 838
 839             if (!registered) {
 840                 safeSetSuccess(promise);
 841                 return;
 842             }
 843
 844             // As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
 845             // we need to ensure we do the actual deregister operation later. This is needed as for example,
 846             // we may be in the ByteToMessageDecoder.callDecode(...) method and so still try to do processing in
 847             // the old EventLoop while the user already registered the Channel to a new EventLoop. Without delay,
 848             // the deregister operation this could lead to have a handler invoked by different EventLoop and so
 849             // threads.
 850             //
 851             // See:
 852             // https://github.com/netty/netty/issues/4435
 853             invokeLater(new Runnable() {
 854                 @Override
 855                 public void run() {
 856                     try {
 857                         doDeregister();
 858                     } catch (Throwable t) {
 859                         logger.warn("Unexpected exception occurred while deregistering a channel.", t);
 860                     } finally {
 861                         if (fireChannelInactive) {
 862                             pipeline.fireChannelInactive();
 863                         }
 864                         // Some transports like local and AIO does not allow the deregistration of
 865                         // an open channel.  Their doDeregister() calls close(). Consequently,
 866                         // close() calls deregister() again - no need to fire channelUnregistered, so check
 867                         // if it was registered.
 868                         if (registered) {
 869                             registered = false;
 870                             pipeline.fireChannelUnregistered();
 871                         }
 872                         safeSetSuccess(promise);
 873                     }
 874                 }
 875             });
 876         }
 877
 878         @Override
 879         public final void beginRead() {
 880             assertEventLoop();
 881
 882             if (!isActive()) {
 883                 return;
 884             }
 885
 886             try {
 887                 doBeginRead();
 888             } catch (final Exception e) {
 889                 invokeLater(new Runnable() {
 890                     @Override
 891                     public void run() {
 892                         pipeline.fireExceptionCaught(e);
 893                     }
 894                 });
 895                 close(voidPromise());
 896             }
 897         }
 898
 899         @Override
 900         public final void write(Object msg, ChannelPromise promise) {
 901             assertEventLoop();
 902
 903             ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
 904             if (outboundBuffer == null) {
 905                 // If the outboundBuffer is null we know the channel was closed and so
 906                 // need to fail the future right away. If it is not null the handling of the rest
 907                 // will be done in flush0()
 908                 // See https://github.com/netty/netty/issues/2362
 909                 safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
 910                 // release message now to prevent resource-leak
 911                 ReferenceCountUtil.release(msg);
 912                 return;
 913             }
 914
 915             int size;
 916             try {
 917                 msg = filterOutboundMessage(msg);
 918                 size = pipeline.estimatorHandle().size(msg);
 919                 if (size < 0) {
 920                     size = 0;
 921                 }
 922             } catch (Throwable t) {
 923                 safeSetFailure(promise, t);
 924                 ReferenceCountUtil.release(msg);
 925                 return;
 926             }
 927
 928             outboundBuffer.addMessage(msg, size, promise);
 929         }
 930
 931         @Override
 932         public final void flush() {
 933             assertEventLoop();
 934
 935             ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
 936             if (outboundBuffer == null) {
 937                 return;
 938             }
 939
 940             outboundBuffer.addFlush();
 941             flush0();
 942         }
 943
 944         @SuppressWarnings("deprecation")
 945         protected void flush0() {
 946             if (inFlush0) {
 947                 // Avoid re-entrance
 948                 return;
 949             }
 950
 951             final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
 952             if (outboundBuffer == null || outboundBuffer.isEmpty()) {
 953                 return;
 954             }
 955
 956             inFlush0 = true;
 957
 958             // Mark all pending write requests as failure if the channel is inactive.
 959             if (!isActive()) {
 960                 try {
 961                     if (isOpen()) {
 962                         outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
 963                     } else {
 964                         // Do not trigger channelWritabilityChanged because the channel is closed already.
 965                         outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
 966                     }
 967                 } finally {
 968                     inFlush0 = false;
 969                 }
 970                 return;
 971             }
 972
 973             try {
 974                 doWrite(outboundBuffer);
 975             } catch (Throwable t) {
 976                 if (t instanceof IOException && config().isAutoClose()) {
 977                     /**
 978                      * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
 979                      * failing all flushed messages and also ensure the actual close of the underlying transport
 980                      * will happen before the promises are notified.
 981                      *
 982                      * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
 983                      * may still return {@code true} even if the channel should be closed as result of the exception.
 984                      */
 985                     close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
 986                 } else {
 987                     try {
 988                         shutdownOutput(voidPromise(), t);
 989                     } catch (Throwable t2) {
 990                         close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
 991                     }
 992                 }
 993             } finally {
 994                 inFlush0 = false;
 995             }
 996         }
 997
 998         @Override
 999         public final ChannelPromise voidPromise() {
1000             assertEventLoop();
1001
1002             return unsafeVoidPromise;
1003         }
1004
1005         protected final boolean ensureOpen(ChannelPromise promise) {
1006             if (isOpen()) {
1007                 return true;
1008             }
1009
1010             safeSetFailure(promise, ENSURE_OPEN_CLOSED_CHANNEL_EXCEPTION);
1011             return false;
1012         }
1013
1014         /**
1015          * Marks the specified {@code promise} as success.  If the {@code promise} is done already, log a message.
1016          */
1017         protected final void safeSetSuccess(ChannelPromise promise) {
1018             if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
1019                 logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
1020             }
1021         }
1022
1023         /**
1024          * Marks the specified {@code promise} as failure.  If the {@code promise} is done already, log a message.
1025          */
1026         protected final void safeSetFailure(ChannelPromise promise, Throwable cause) {
1027             if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
1028                 logger.warn("Failed to mark a promise as failure because it‘s done already: {}", promise, cause);
1029             }
1030         }
1031
1032         protected final void closeIfClosed() {
1033             if (isOpen()) {
1034                 return;
1035             }
1036             close(voidPromise());
1037         }
1038
1039         //将任务提交给EventLoop执行
1040         private void invokeLater(Runnable task) {
1041             try {
1042                 // This method is used by outbound operation implementations to trigger an inbound event later.
1043                 // They do not trigger an inbound event immediately because an outbound operation might have been
1044                 // triggered by another inbound event handler method.  If fired immediately, the call stack
1045                 // will look like this for example:
1046                 //
1047                 //   handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.
1048                 //   -> handlerA.ctx.close()
1049                 //      -> channel.unsafe.close()
1050                 //         -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
1051                 //
1052                 // which means the execution of two inbound handler methods of the same handler overlap undesirably.
1053                 eventLoop().execute(task);
1054             } catch (RejectedExecutionException e) {
1055                 logger.warn("Can‘t invoke task later as EventLoop rejected it", e);
1056             }
1057         }
1058
1059         /**
1060          * Appends the remote address to the message of the exceptions caused by connection attempt failure.
1061          */
1062         protected final Throwable annotateConnectException(Throwable cause, SocketAddress remoteAddress) {
1063             if (cause instanceof ConnectException) {
1064                 return new AnnotatedConnectException((ConnectException) cause, remoteAddress);
1065             }
1066             if (cause instanceof NoRouteToHostException) {
1067                 return new AnnotatedNoRouteToHostException((NoRouteToHostException) cause, remoteAddress);
1068             }
1069             if (cause instanceof SocketException) {
1070                 return new AnnotatedSocketException((SocketException) cause, remoteAddress);
1071             }
1072
1073             return cause;
1074         }
1075
1076         /**
1077          * Prepares to close the {@link Channel}. If this method returns an {@link Executor}, the
1078          * caller must call the {@link Executor#execute(Runnable)} method with a task that calls
1079          * {@link #doClose()} on the returned {@link Executor}. If this method returns {@code null},
1080          * {@link #doClose()} must be called from the caller thread. (i.e. {@link EventLoop})
1081          */
1082         protected Executor prepareToClose() {
1083             return null;
1084         }
1085     }
1086
1087     /**
1088      * Return {@code true} if the given {@link EventLoop} is compatible with this instance.
1089      */
1090     protected abstract boolean isCompatible(EventLoop loop);
1091
1092     /**
1093      * Returns the {@link SocketAddress} which is bound locally.
1094      */
1095     protected abstract SocketAddress localAddress0();
1096
1097     /**
1098      * Return the {@link SocketAddress} which the {@link Channel} is connected to.
1099      */
1100     protected abstract SocketAddress remoteAddress0();
1101
1102     /**
1103      * Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register process.
1104      *
1105      * Sub-classes may override this method
1106      */
1107     protected void doRegister() throws Exception {
1108         // NOOP
1109     }
1110
1111     /**
1112      * Bind the {@link Channel} to the {@link SocketAddress}
1113      */
1114     protected abstract void doBind(SocketAddress localAddress) throws Exception;
1115
1116     /**
1117      * Disconnect this {@link Channel} from its remote peer
1118      */
1119     protected abstract void doDisconnect() throws Exception;
1120
1121     /**
1122      * Close the {@link Channel}
1123      */
1124     protected abstract void doClose() throws Exception;
1125
1126     /**
1127      * Called when conditions justify shutting down the output portion of the channel. This may happen if a write
1128      * operation throws an exception.
1129      */
1130     @UnstableApi
1131     protected void doShutdownOutput() throws Exception {
1132         doClose();
1133     }
1134
1135     /**
1136      * Deregister the {@link Channel} from its {@link EventLoop}.
1137      *
1138      * Sub-classes may override this method
1139      */
1140     protected void doDeregister() throws Exception {
1141         // NOOP
1142     }
1143
1144     /**
1145      * Schedule a read operation.
1146      */
1147     protected abstract void doBeginRead() throws Exception;
1148
1149     /**
1150      * Flush the content of the given buffer to the remote peer.
1151      */
1152     protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception;
1153
1154     /**
1155      * Invoked when a new message is added to a {@link ChannelOutboundBuffer} of this {@link AbstractChannel}, so that
1156      * the {@link Channel} implementation converts the message to another. (e.g. heap buffer -> direct buffer)
1157      */
1158     protected Object filterOutboundMessage(Object msg) throws Exception {
1159         return msg;
1160     }
1161
1162     static final class CloseFuture extends DefaultChannelPromise {
1163
1164         CloseFuture(AbstractChannel ch) {
1165             super(ch);
1166         }
1167
1168         @Override
1169         public ChannelPromise setSuccess() {
1170             throw new IllegalStateException();
1171         }
1172
1173         @Override
1174         public ChannelPromise setFailure(Throwable cause) {
1175             throw new IllegalStateException();
1176         }
1177
1178         @Override
1179         public boolean trySuccess() {
1180             throw new IllegalStateException();
1181         }
1182
1183         @Override
1184         public boolean tryFailure(Throwable cause) {
1185             throw new IllegalStateException();
1186         }
1187
1188         boolean setClosed() {
1189             return super.trySuccess();
1190         }
1191     }
1192
1193     private static final class AnnotatedConnectException extends ConnectException {
1194
1195         private static final long serialVersionUID = 3901958112696433556L;
1196
1197         AnnotatedConnectException(ConnectException exception, SocketAddress remoteAddress) {
1198             super(exception.getMessage() + ": " + remoteAddress);
1199             initCause(exception);
1200             setStackTrace(exception.getStackTrace());
1201         }
1202
1203         @Override
1204         public Throwable fillInStackTrace() {
1205             return this;
1206         }
1207     }
1208
1209     private static final class AnnotatedNoRouteToHostException extends NoRouteToHostException {
1210
1211         private static final long serialVersionUID = -6801433937592080623L;
1212
1213         AnnotatedNoRouteToHostException(NoRouteToHostException exception, SocketAddress remoteAddress) {
1214             super(exception.getMessage() + ": " + remoteAddress);
1215             initCause(exception);
1216             setStackTrace(exception.getStackTrace());
1217         }
1218
1219         @Override
1220         public Throwable fillInStackTrace() {
1221             return this;
1222         }
1223     }
1224
1225     private static final class AnnotatedSocketException extends SocketException {
1226
1227         private static final long serialVersionUID = 3896743275010454039L;
1228
1229         AnnotatedSocketException(SocketException exception, SocketAddress remoteAddress) {
1230             super(exception.getMessage() + ": " + remoteAddress);
1231             initCause(exception);
1232             setStackTrace(exception.getStackTrace());
1233         }
1234
1235         @Override
1236         public Throwable fillInStackTrace() {
1237             return this;
1238         }
1239     }
1240 }

我们可以看到ChannelOutboundInvoker接口的方法主要交给Pipeline代理。

并且AbstractChannel留下了很多类似doXXX的抽象方法。这应该是模板模式的应用,由不同的子类实现不同的逻辑。

还注意到一点,AbstractChannel中有个抽象的内部类,实现了Unsafe接口。

Unsafe接口有很多方法与ChannelOutboundInvoker中的方法类似。

看AbstractUnsafe中的实现,与网络链路相关的方法中,具体的业务实现仍然是交给外部接口的方法实现。

比如register方法;关键的业务逻辑还是交给AbstractChannel中的doRegister方法。

并且在doRegister注册完成后,调用pipeline的fireXXX方法,让事件在Pipeline中传递。从而实现Netty的事件驱动。

原文地址:https://www.cnblogs.com/insaneXs/p/9854440.html

时间: 2024-10-11 03:08:55

Netty(四):AbstractChannel源码解析的相关文章

四.jQuery源码解析之jQuery.fn.init()的参数解析

从return new jQuery.fn.init( selector, context, rootjQuery )中可以看出 参数selector和context是来自我们在调用jQuery方法时传过来的.那么selector和context都有哪些可能. 对于表格中的4~9行中的可能做具体分析. 如果selector是字符串,则首先检测是html代码还是#id. 126行的if语句:以"<"开头,以">"结尾,且长度>=3.则假设额这个是HT

Handler机制(四)---Handler源码解析

Handler的主要用途有两个:(1).在将来的某个时刻执行消息或一个runnable,(2)把消息发送到消息队列. 主要依靠post(Runnable).postAtTime(Runnable, long).postDelayed(Runnable, long).sendEmptyMessage(int).sendMessage(Message).sendMessageAtTime(Message).sendMessageDelayed(Message, long)这些方法来来完成消息调度.p

rocketmq源码解析之NamesrvController创建

说在前面 本次开始进行rocketmq源码解析,比较喜欢rocketmq的架构设计,rocketmq内嵌了namesrv注册中心保存了元数据,进行负载均衡.容错的一些处理,4.3以上支持消息事务,有管理控制台.命令行工具,底层namesrv与broker.client与server交互netty实现. 源码解析 创建NamesrvController,进入这个方法org.apache.rocketmq.namesrv.NamesrvStartup#main,再进入这个方法org.apache.r

netty服务端启动--ServerBootstrap源码解析

netty服务端启动--ServerBootstrap源码解析 前面的第一篇文章中,我以spark中的netty客户端的创建为切入点,分析了netty的客户端引导类Bootstrap的参数设置以及启动过程.显然,我们还有另一个重要的部分--服务端的初始化和启动过程没有探究,所以这一节,我们就来从源码层面详细分析一下netty的服务端引导类ServerBootstrap的启动过程. spark中netty服务端的创建 我们仍然以spark中对netty的使用为例,以此为源码分析的切入点,首先我们看

第十四章 Executors源码解析

前边两章介绍了基础线程池ThreadPoolExecutor的使用方式.工作机理.参数详细介绍以及核心源码解析. 具体的介绍请参照: 第十二章 ThreadPoolExecutor使用与工作机理 第十三章 ThreadPoolExecutor源码解析 1.Executors与ThreadPoolExecutor ThreadPoolExecutor 可以灵活的自定义的创建线程池,可定制性很高 想创建好一个合适的线程池比较难 使用稍微麻烦一些 实际中很少使用 Executors 可以创建4种线程池

Netty 4源码解析:请求处理

Netty 4源码解析:请求处理 通过之前<Netty 4源码解析:服务端启动>的分析,我们知道在最前端"扛压力"的是NioEventLoop.run()方法.我们指定创建出的NioServerSocketChannel就是注册到了NioEventLoop中的Selector上.所以我们继续顺藤摸瓜,看看服务端启动完成后,Netty是如何处理每个请求的. 1.MainReactor 1.1 事件轮询 之前我们曾分析过到NioEventLoop.run()方法,但因为之前只关

Spring 源码解析之ViewResolver源码解析(四)

Spring 源码解析之ViewResolver源码解析(四) 1 ViewResolver类功能解析 1.1 ViewResolver Interface to be implemented by objects that can resolve views by name. View state doesn't change during the running of the application, so implementations are free to cache views. I

netty源码解析

最近在看netty的源码,本来想写一些东西的,但是无意间看到了一个牛人写的一些有关netty的博客,感觉写得太好了,故对他的博客中有关netty的部分整理了一下放入了我的印象笔记中,现在把链接公开出来,希望对想学习netty的同学有所帮助: https://app.yinxiang.com/pub/topxiall/netty netty源码解析

mybatis源码-解析配置文件(四-1)之配置文件Mapper解析(cache)

相关文章推荐 mybatis 缓存的使用, 看这篇就够了 mybatis源码-解析配置文件(四)之配置文件Mapper解析 1. 简介 本文章主要讲解的是, xxxMapper.xml 文件中, cache 节点的源码. 2. 解析 XMLMapperBuilder.cacheElement() 方法主要负责解析 <cache> private void cacheElement(XNode context) throws Exception { if (context != null) {