4. Netty源码分析之Unsafe

Unsafe类实际上是Channel接口的辅助类,实际的IO操作都是由Unsafe接口完成的。

一、Unsafe继承关系图

二、AbstractUnsafe源码分析

1. register方法

  register方法主要用于将当前Unsafe对应的Channel注册到EventLoop的多路复用器上,然后调用DefaultChannelPipeline的fireChannelRegisted方法,如果Channel被激活,则调用fireChannelActive方法。

public final void register(final ChannelPromise promise) {
    // 当前线程是否为Channel对应的NioEventLoop线程
    if (eventLoop.inEventLoop()) {
        // 如果是,则不存在多线程并发操作,直接注册
        register0(promise);
    } else {
        // 如果不是,说明是其他线程或用户线程发起的注册,存在并发操作,将其放进NioEventLoop任务队列中执行
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            promise.setFailure(t);
        }
    }
}

private void register0(ChannelPromise promise) {
    try {
        // 判断Channel是否打开了
        if (!ensureOpen(promise)) {
            return;
        }
        // 调用AbstractNioChannel的doRegister方法。请见 Netty源码分析-Channel
        doRegister();
        registered = true;
        promise.setSuccess();
        // 注册成功
        pipeline.fireChannelRegistered();
        if (isActive()) {
            // Channel被激活
            pipeline.fireChannelActive();
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        if (!promise.tryFailure(t)) {
            logger.warn(
                    "Tried to fail the registration promise, but it is complete already. " +
                            "Swallowing the cause of the registration failure:", t);
        }
    }
}

// AbstractNioChannel.doRegister()
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().selector, 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}

2. bind方法

  bind方法主要用于绑定指定端口。对于服务端,用于绑定监听端口,并设置backlog参数;对于客户端,用于指定客户端Channel的本地绑定Socket地址。

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    if (!ensureOpen(promise)) {
        return;
    }

    // See: https://github.com/netty/netty/issues/576
    if (!PlatformDependent.isWindows() && !PlatformDependent.isRoot() &&
        Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
        localAddress instanceof InetSocketAddress &&
        !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress()) {
        // Warn a user about the fact that a non-root user can‘t receive a
        // broadcast packet on *nix if the socket is bound on non-wildcard address.
        logger.warn(
                "A non-root user can‘t receive a broadcast packet if the socket " +
                "is not bound to a wildcard address; binding to a non-wildcard " +
                "address (" + localAddress + ") anyway as requested.");
    }

    // 是否是激活状态
    boolean wasActive = isActive();
    try {
        doBind(localAddress);
    } catch (Throwable t) {
        promise.setFailure(t);
        closeIfClosed();
        return;
    }
    if (!wasActive && isActive()) {
        // 如果是在绑定阶段成为active状态,则将调用fireChannelActive方法放进NioEventLoop执行队列中
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }
    promise.setSuccess();
}

private void invokeLater(Runnable task) {
    eventLoop().execute(task);
}

NioSocketChannel 的 diBind 实现:

protected void doBind(SocketAddress localAddress) throws Exception {
    javaChannel().socket().bind(localAddress);
}

NioServerSocketChannel 的 doBind 实现:

protected void doBind(SocketAddress localAddress) throws Exception {
    javaChannel().socket().bind(localAddress, config.getBacklog());
}

3. disconnect方法

  该方法用于客户端或服务端主动关闭连接。

public final void disconnect(final ChannelPromise promise) {
    boolean wasActive = isActive();
    try {
        doDisconnect();
    } catch (Throwable t) {
        promise.setFailure(t);
        closeIfClosed();
        return;
    }
    if (wasActive && !isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelInactive();
            }
        });
    }
    promise.setSuccess();
    closeIfClosed(); // doDisconnect() might have closed the channel
}

NioServerSocketChannel.doDisconnect():服务端不支持主动关闭连接

protected void doDisconnect() throws Exception {
    throw new UnsupportedOperationException();
}

NioSocketChannel.doDisconnect():调用SocketChannel关闭连接

protected void doDisconnect() throws Exception {
    doClose();
}

protected void doClose() throws Exception {
    javaChannel().close();
}

4. close方法

public final void close(final ChannelPromise promise) {
    // 1. 是否处于刷新状态,如果处于刷新状态说明还有消息没发出去,需要等到所有消息发完后再关闭
    // 放入队列中处理
    if (inFlush0) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                close(promise);
            }
        });
        return;
    }

    // 2. 判断关闭操作是否完成,如果已完成,则不需要重复关闭链路,设置promise成功即可
    if (closeFuture.isDone()) {
        // Closed already.
        promise.setSuccess();
        return;
    }

    // 3. 执行关闭操作,将消息发送缓冲数组置空,通知JVM回收
    boolean wasActive = isActive();
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.

    try {
        // 4. 关闭链路,本质是调用javaChannel的close方法
        doClose();
        closeFuture.setClosed();
        promise.setSuccess();
    } catch (Throwable t) {
        closeFuture.setClosed();
        promise.setFailure(t);
    }

    // 5. 调用ChannelOutboundBuffer.close()释放缓冲区消息,将链路关闭通知事件放进NioEventLoop执行队列中
    try {
        outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
        outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION);
    } finally {

        if (wasActive && !isActive()) {
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    pipeline.fireChannelInactive();
                }
            });
        }
        // 6. 将Channel从多路复用器上取消注册
        deregister();
    }
}

protected void doDeregister() throws Exception {
    eventLoop().cancel(selectionKey());
}

// 实际上就是将SelectionKey对应的Channel从多路复用器上去取消注册
void cancel(SelectionKey key) {
    key.cancel();
    cancelledKeys ++;
    if (cancelledKeys >= CLEANUP_INTERVAL) {
        cancelledKeys = 0;
        needsToSelectAgain = true;
    }
}

5. write方法

  write方法实际上是将消息添加到环形发送数组上,并不真正的写Channel(真正的写Channel是flush方法)。

public void write(Object msg, ChannelPromise promise) {
    if (!isActive()) {
        // 未激活,TCP链路还没建立成功,根据Channel打开情况设置不同的异常
        if (isOpen()) {
            promise.tryFailure(NOT_YET_CONNECTED_EXCEPTION);
        } else {
            promise.tryFailure(CLOSED_CHANNEL_EXCEPTION);
        }
        // 无法发送,释放msg对象
        ReferenceCountUtil.release(msg);
    } else {
        // 链路状态正常,将数据和promise放进发送缓冲区
        outboundBuffer.addMessage(msg, promise);
    }
}

6. flush方法

  前面提到,write方法负责将消息放进发送缓冲区,并没有真正的发送,而flush方法就负责将发送缓冲区中待发送的消息全部写进Channel中并发送。

public void flush() {
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }
    // 先将unflush指针修改为tail,标识本次发送的范围
    outboundBuffer.addFlush();
    flush0();
}

protected void flush0() {
    if (inFlush0) {
        // Avoid re-entrance
        return;
    }

    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null || outboundBuffer.isEmpty()) {
        return;
    }

    inFlush0 = true;

    // Mark all pending write requests as failure if the channel is inactive.
    if (!isActive()) {
        try {
            if (isOpen()) {
                outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION);
            } else {
                outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
            }
        } finally {
            inFlush0 = false;
        }
        return;
    }

    try {
        // 调用NioSocketChannel的write方法
        doWrite(outboundBuffer);
    } catch (Throwable t) {
        outboundBuffer.failFlushed(t);
    } finally {
        inFlush0 = false;
    }
}

三、AbstractNioUnsafe源码分析

1. connect方法

  前面说到,NioSocketChannel的连接操作有三种可能:

    1. 连接成功

    2. 连接失败,关闭客户端连接

    3. 连接暂未响应,监听OP_CONNECT

  在connect方法中,如果连接成功,进行激活操作;如果连接暂未响应,则对其做一个监听,监听的内容是:如果连接失败,则关闭链路。

public final void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    // 设置不可取消 && Channel是打开状态
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }

    try {
        if (connectPromise != null) {
            // 已经有一个连接正在处理,直接抛异常
            throw new ConnectionPendingException();
        }

        boolean wasActive = isActive();

        // doConnect方法具体看NioSocketChannel.doConnect()实现
        if (doConnect(remoteAddress, localAddress)) {
            // 连接成功,进行连接后操作
            fulfillConnectPromise(promise, wasActive);
        } else {
            // 连接失败,TCP无应答,结果暂未知晓
            connectPromise = promise;
            requestedRemoteAddress = remoteAddress;

            // Schedule connect timeout.
            int connectTimeoutMillis = config().getConnectTimeoutMillis();
            if (connectTimeoutMillis > 0) {
                connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                    @Override
                    public void run() {
                        ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                        ConnectTimeoutException cause =
                                new ConnectTimeoutException("connection timed out: " + remoteAddress);
                        if (connectPromise != null && connectPromise.tryFailure(cause)) {
                            close(voidPromise());
                        }
                    }
                }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
            }

            promise.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isCancelled()) {
                        if (connectTimeoutFuture != null) {
                            connectTimeoutFuture.cancel(false);
                        }
                        connectPromise = null;
                        close(voidPromise());
                    }
                }
            });
        }
    } catch (Throwable t) {
        promise.tryFailure(annotateConnectException(t, remoteAddress));
        closeIfClosed();
    }
}

private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
    if (promise == null) {
        // Closed via cancellation and the promise has been notified already.
        return;
    }

    // 判断当前激活状态
    boolean active = isActive();

    // 如果用户取消了连接,则返回false,需调用close方法关闭链路
    boolean promiseSet = promise.trySuccess();

    // 如果doConnect之前未激活,doConnect之后激活了,需要调用fireChannelActive(即使被取消了也应该调)
    if (!wasActive && active) {
        pipeline().fireChannelActive();
    }

    // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
    if (!promiseSet) {
        close(voidPromise());
    }
}

2. finishConnect方法

  该方法用于判断连接操作是否结束。

  首先判断当前线程是否就是EventLoop执行线程,不允许其他线程操作;

  缓存当前active状态,用以下面是否要执行fireChannelActive方法;

  调用javaChannel的finishConnect方法,该方法返回三种情况:

    1)连接成功,返回true

    2)连接失败,返回false

    3)发生链路被关闭、链路中断异常,连接失败

  根据javaChannel的返回值,如果返回false,直接抛出error,进入到catch模块

  然后就根据连接状态做不同的后续处理

public final void finishConnect() {
    // Note this method is invoked by the event loop only if the connection attempt was
    // neither cancelled nor timed out.

    assert eventLoop().inEventLoop();

    try {
        boolean wasActive = isActive();
        // 通过javaChannel的finishConnect方法判断连接结果(如果连接失败则抛出Error,会走到catch块里)
        doFinishConnect();
        // 连接成功方法:fulfillConnectPromise(ChannelPromise promise, boolean wasActive)
        fulfillConnectPromise(connectPromise, wasActive);
    } catch (Throwable t) {
        // 关闭链路方法:fulfillConnectPromise(ChannelPromise promise, Throwable cause)
        fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
    } finally {
        // 如果连接超时时仍然没有收到服务端应答,则由定时任务关闭客户端连接,将SocketChannel从多路复用器上删除
        if (connectTimeoutFuture != null) {
            connectTimeoutFuture.cancel(false);
        }
        connectPromise = null;
    }
}

四、NioByteUnsafe源码分析

  这里我们主要分析下它的 read方法。

public final void read() {
            final ChannelConfig config = config();
            if (shouldBreakReadReady(config)) {
                clearReadPending();
                return;
            }
            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            boolean close = false;...

public RecvByteBufAllocator.Handle recvBufAllocHandle() {
  if (recvHandle == null) {
    recvHandle = config().getRecvByteBufAllocator().newHandle();
  }
  return recvHandle;
}

首先,获取NioSocketChannel的SocketChannelConfig,用于设置客户端连接的TCP参数。

继续看allocHandle的初始化,则从SocketChannelConfig的RecvByteBufAllocator中创建一个新的handle。

RecvByteBufAllocator有两个实现,分别是FixedRecvByteBufAllocator 和 AdaptiveRecvByteBufAllocator。FixedRecvByteBufAllocator 比较简单,我们主要分析下AdaptiveRecvByteBufAllocator。

根据名称就可以判断,AdaptiveRecvByteBufAllocator是根据本地读取的字节数动态调整下次接收缓冲区容量。

我们先看下AdaptiveRecvByteBufAllocator的 成员变量:

static final int DEFAULT_MINIMUM = 64;//最小缓冲区长度
static final int DEFAULT_INITIAL = 1024;//初始容量
static final int DEFAULT_MAXIMUM = 65536;//最大容量

private static final int INDEX_INCREMENT = 4;//动态调整扩张步进索引
private static final int INDEX_DECREMENT = 1;//动态调整收缩步进索引

private static final int[] SIZE_TABLE;//长度向量表,数组的每个值对应一个Buffer容量

// 初始化长度向量表
// 当容量小于512时,由于缓冲区已经比较小,需要降低步进值,容量每次下调幅度降低
// 当容量大于512时,说明需要解码的消息码流比较大,需要采用调大步进幅度的方式降低动态扩张频率
static {
    List<Integer> sizeTable = new ArrayList<Integer>();
    for (int i = 16; i < 512; i += 16) {
        sizeTable.add(i);
    }

    for (int i = 512; i > 0; i <<= 1) {
        sizeTable.add(i);
    }

    SIZE_TABLE = new int[sizeTable.size()];
    for (int i = 0; i < SIZE_TABLE.length; i ++) {
        SIZE_TABLE[i] = sizeTable.get(i);
    }
}

然后再来看一下AdaptiveRecvByteBufAllocator.getSizeTableIndex(..)方法:根据容量size查找容器向量表对应的索引

private static int getSizeTableIndex(final int size) {
    for (int low = 0, high = SIZE_TABLE.length - 1;;) {
        if (high < low) {
            return low;
        }
        if (high == low) {
            return high;
        }

        int mid = low + high >>> 1;
        int a = SIZE_TABLE[mid];
        int b = SIZE_TABLE[mid + 1];
        if (size > b) {
            low = mid + 1;
        } else if (size < a) {
            high = mid - 1;
        } else if (size == a) {
            return mid;
        } else {
            return mid + 1;
        }
    }
}

然后我们再来看一下AdaptiveRecvByteBufAllocator的静态内部类HandlerImpl,该类有五个成员变量:

private final int minIndex;            //最小索引
private final int maxIndex;            //最大索引
private int index;                    //当前索引
private int nextReceiveBufferSize;    //下一次预分配的Buffer大小
private boolean decreaseNow;        //是否立即执行容量收缩操作

该类有一个比较重要的方法,record(int actualReadBytes),当NioSocketChannel执行完读操作后,会计算获得本轮轮询读取的总字节数,也就是record方法的入参actualReadBytes,该方法根据读取的字节数对ByteBuf进行动态伸缩和扩张。record操作步骤如下

  1)将当前容量缩减后的值与实际读取的值做比较,如果实际读取的值小于收缩后的容量,则将缓冲区容量降低

  2)如果实际读取的值大于当前Buffer容量,说明实际分配容量不足,需要动态扩张

private void record(int actualReadBytes) {
    if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT - 1)]) {
        if (decreaseNow) {
            index = max(index - INDEX_DECREMENT, minIndex);
            nextReceiveBufferSize = SIZE_TABLE[index];
            decreaseNow = false;
        } else {
            decreaseNow = true;
        }
    } else if (actualReadBytes >= nextReceiveBufferSize) {
        index = min(index + INDEX_INCREMENT, maxIndex);
        nextReceiveBufferSize = SIZE_TABLE[index];
        decreaseNow = false;
    }
}

AdaptiveRecvByteBufAllocator优点总结:

1. 性能更高。容量过大会导致内存占用开销增加,后续的Buffer处理性能会下降;容量过小时需要频繁的内存扩张来接收大的请求消息,同样会导致性能下降

2. 更节约内存。根据不同的场景动态的扩张或缩减内存,达到内存使用最优化。

然后我们接着来分析 read方法,这里循环读取缓冲区数据,并根据上次读取字节数动态调整ByteBuffer大小。每次读取都要触发一次read事件 fireChannelRead,注意,这里并不是说一次read就读完了全部消息,可能存在粘包拆包情况。

当上次读取了0个字节,说明已经读完了,跳出循环,触发读操作完成事件 fireChannelReadComplete。

public final void read() {
        final ChannelConfig config = config();
        if (shouldBreakReadReady(config)) {
            clearReadPending();
            return;
        }
        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        allocHandle.reset(config);

        ByteBuf byteBuf = null;
        boolean close = false;
        try {
            do {
                // 通过接收缓冲区分配器计算下次预分配的缓冲区容量并创建ByteBuffer
                byteBuf = allocHandle.allocate(allocator);
                // 这里分两步:1. doReadBytes(byteBuf):调用NioSocketChannel.doReadBytes(..),返回本次读取的字节数(返回0-无消息   返回小于0-发生了IO异常)
                // 2. 设置lastBytesRead,用以下面的处理
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                if (allocHandle.lastBytesRead() <= 0) {
                    // 走到这里说明上一步没有读取到数据,释放ByteBuffer
                    byteBuf.release();
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    if (close) {
                        // 发生了IO异常,需关闭连接
                        readPending = false;
                    }
                    break;
                }

                allocHandle.incMessagesRead(1);
                readPending = false;
                // 一次读操作,触发一次read事件
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;
            } while (allocHandle.continueReading());

            allocHandle.readComplete();
            // 触发读操作结束事件
            pipeline.fireChannelReadComplete();

            if (close) {
                closeOnRead(pipeline);
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close, allocHandle);
        } finally {
            // Check if there is a readPending which was not processed yet.
            // This could be for two reasons:
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
            //
            // See https://github.com/netty/netty/issues/2254
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
}

protected int doReadBytes(ByteBuf byteBuf) throws Exception {
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.attemptedBytesRead(byteBuf.writableBytes());
    return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}

public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
    ensureWritable(length);
    int writtenBytes = setBytes(writerIndex, in, length);
    if (writtenBytes > 0) {
        writerIndex += writtenBytes;
    }
    return writtenBytes;
}

原文地址:https://www.cnblogs.com/lovezmc/p/11556412.html

时间: 2024-10-14 14:57:37

4. Netty源码分析之Unsafe的相关文章

[编织消息框架][netty源码分析]7 Unsafe 实现类NioSocketChannelUnsafe职责与实现

Unsafe 是channel的内部接口, 负责跟socket底层打交道.从书写跟命名上看是不公开给开发者使用的,直到最后实现NioSocketChannelUnsafe也没有公开出去 public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> { interface Unsafe { RecvByteBufAllocator.Handle recvBufAllocHand

netty 源码分析二

以服务端启动,接收客户端连接整个过程为例分析, 简略分为 五个过程: 1.NioServerSocketChannel 管道生成, 2.NioServerSocketChannel 管道完成初始化, 3.NioServerSocketChannel注册至Selector选择器, 4.NioServerSocketChannel管道绑定到指定端口,启动服务 5.NioServerSocketChannel接受客户端的连接,进行相应IO操作 Ps:netty内部过程远比这复杂,简略记录下方便以后回忆

Netty源码分析第2章(NioEventLoop)----&gt;第7节: 处理IO事件

Netty源码分析第二章: NioEventLoop 第七节:处理IO事件 上一小节我们了解了执行select()操作的相关逻辑, 这一小节我们继续学习select()之后, 轮询到io事件的相关逻辑: 回到NioEventLoop的run()方法: protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case Sele

Netty源码分析第3章(客户端接入流程)----&gt;第4节: NioSocketChannel注册到selector

Netty源码分析第三章: 客户端接入流程 第四节: NioSocketChannel注册到selector 我们回到最初的NioMessageUnsafe的read()方法: public void read() { //必须是NioEventLoop方法调用的, 不能通过外部线程调用 assert eventLoop().inEventLoop(); //服务端channel的config final ChannelConfig config = config(); //服务端channel

Netty源码分析第3章(客户端接入流程)----&gt;第3节: NioSocketChannel的创建

Netty源码分析第三章: 客户端接入流程 第三节: NioSocketChannel的创建 回到上一小结的read()方法: public void read() { //必须是NioEventLoop方法调用的, 不能通过外部线程调用 assert eventLoop().inEventLoop(); //服务端channel的config final ChannelConfig config = config(); //服务端channel的pipeline final ChannelPi

Netty源码分析第4章(pipeline)----&gt;第7节: 前章节内容回顾

Netty源码分析第四章: pipeline 第七节: 前章节内容回顾 我们在第一章和第三章中, 遗留了很多有关事件传输的相关逻辑, 这里带大家一一回顾 首先看两个问题: 1.在客户端接入的时候, NioMessageUnsafe的read方法中pipeline.fireChannelRead(readBuf.get(i))为什么会调用到ServerBootstrap的内部类ServerBootstrapAcceptor中的channelRead()方法 2.客户端handler是什么时候被添加

Netty源码分析第5章(ByteBuf)----&gt;第3节: 内存分配器

Netty源码分析第五章: ByteBuf 第三节: 内存分配器 内存分配器, 顾明思议就是分配内存的工具, 在netty中, 内存分配器的顶级抽象是接口ByteBufAllocator, 里面定义了有关内存分配的相关api 抽象类AbstractByteBufAllocator实现了ByteBufAllocator接口, 并且实现了其大部分功能 和AbstractByteBuf一样, AbstractByteBufAllocator也实现了缓冲区分配的骨架逻辑, 剩余的交给其子类 以其中的分配

Netty源码分析第7章(编码器和写数据)----&gt;第3节: 写buffer队列

Netty源码分析七章: 编码器和写数据 第三节: 写buffer队列 之前的小结我们介绍过, writeAndFlush方法其实最终会调用write和flush方法 write方法最终会传递到head节点, 调用HeadContext的write方法: public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, prom

Netty源码分析第7章(编码器和写数据)----&gt;第4节: 刷新buffer队列

Netty源码分析第七章: 编码器和写数据 第四节: 刷新buffer队列 上一小节学习了writeAndFlush的write方法, 这一小节我们剖析flush方法 通过前面的学习我们知道, flush方法通过事件传递, 最终会传递到HeadContext的flush方法: public void flush(ChannelHandlerContext ctx) throws Exception { unsafe.flush(); } 这里最终会调用AbstractUnsafe的flush方法