[编织消息框架][netty源码分析]7 Unsafe 实现类NioSocketChannelUnsafe职责与实现

Unsafe 是channel的内部接口, 负责跟socket底层打交道。从书写跟命名上看是不公开给开发者使用的,直到最后实现NioSocketChannelUnsafe也没有公开出去

public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
    interface Unsafe {
        RecvByteBufAllocator.Handle recvBufAllocHandle();
        SocketAddress localAddress();
        SocketAddress remoteAddress();
        void register(EventLoop eventLoop, ChannelPromise promise);
        void bind(SocketAddress localAddress, ChannelPromise promise);
        void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
        void disconnect(ChannelPromise promise);
        void close(ChannelPromise promise);
        void closeForcibly();
        void deregister(ChannelPromise promise);
        void beginRead();
        void write(Object msg, ChannelPromise promise);
        void flush();
        ChannelPromise voidPromise();
        ChannelOutboundBuffer outboundBuffer();
    }
    public interface NioUnsafe extends Unsafe {
        SelectableChannel ch();
        void finishConnect();
        void read();
        void forceFlush();
    }
}

NioSocketChannelUnsafe 继承关系为: NioSocketChannelUnsafe -> NioByteUnsafe -> AbstractNioUnsafe -> AbstractUnsafe

AbstractUnsafe:负责socket 链路绑定、接受、关闭,数据fush操作

每个操作大概分四个阶段处理

        @Override
        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
            assertEventLoop();
            //执行前检查
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }

            boolean wasActive = isActive();
            //调用实现
            try {
                doBind(localAddress);
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                closeIfClosed();
                return;
            }

            //调用业务,通知pipeline
            if (!wasActive && isActive()) {
                invokeLater(()-> pipeline.fireChannelActive(););
            }
            //完成阶段处理
            safeSetSuccess(promise);
        }
 @Override
        public final void flush() {
            assertEventLoop();

            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                return;
            }

            outboundBuffer.addFlush();
            flush0();
        }

        @SuppressWarnings("deprecation")
        protected void flush0() {
            //刚完成Flush操作
            if (inFlush0) {
                 return;
            }

            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null || outboundBuffer.isEmpty()) {
                return;
            }

            inFlush0 = true;

            //发送数据前链路检查
            if (!isActive()) {
                try {
                    if (isOpen()) {
                        //true 通知 handler channelWritabilityChanged方法
                        outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
                    } else {
                        outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
                    }
                } finally {
                    inFlush0 = false;
                }
                return;
            }

            try {
                //调用channel实现
                doWrite(outboundBuffer);
            } catch (Throwable t) {
                if (t instanceof IOException && config().isAutoClose()) {
                    close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
                } else {
                    outboundBuffer.failFlushed(t, true);
                }
            } finally {
                inFlush0 = false;
            }
        }

AbstractNioUnsafe:是NioUnsafe接口模板类,简单的包装

NioByteUnsafe:主要对NioUnsafe接口 read操作实现

NioSocketChannelUnsafe:只是简单的包装,最终公开给内部使用

NioByteUnsafe read方法

      public final void read() {
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    byteBuf = allocHandle.allocate(allocator);
                    //填充byteBuf 调用channel实现
                    int size = doReadBytes(byteBuf);
                    //记录最后读取长度
                    allocHandle.lastBytesRead(size);
                    //链路关闭,释放byteBuf
                    if (allocHandle.lastBytesRead() <= 0) {
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        break;
                    }
                    //自增消息读取处理次数
                    allocHandle.incMessagesRead(1);
                    //已完成填充byteBuf 调用业务pipeline
                    readPending = false;
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                } while (allocHandle.continueReading());

                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (close) {
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                //如果不是主动read 要完成后要清理read op
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }

小结:可以看出没有任何的计算代码,Unsafe只实现边界检查、流程控制,具体实现交给上层处理

时间: 2024-08-11 03:30:14

[编织消息框架][netty源码分析]7 Unsafe 实现类NioSocketChannelUnsafe职责与实现的相关文章

[编织消息框架][netty源码分析]4 eventLoop 实现类NioEventLoop职责与实现

NioEventLoop 是jdk nio多路处理实现同修复jdk nio的bug 1.NioEventLoop继承SingleThreadEventLoop 重用单线程处理 2.NioEventLoop是组成 pool EventLoopGroup 基本单元 总之好多边界判断跟业务经验之类的代码,非常烦碎 重要属性 public final class NioEventLoop extends SingleThreadEventLoop { //绑定 selector Selector sel

[编织消息框架][netty源码分析]9 Promise 实现类DefaultPromise职责与实现

netty Future是基于jdk Future扩展,以监听完成任务触发执行Promise是对Future修改任务数据DefaultPromise是重要的模板类,其它不同类型实现基本是一层简单的包装,如DefaultChannelPromise主要是分析await是如何等侍结果的 public interface Future<V> extends java.util.concurrent.Future<V> { Future<V> addListener(Gener

[编织消息框架][netty源码分析]5 EventLoopGroup 实现类NioEventLoopGroup职责与实现

分析NioEventLoopGroup最主有两个疑问 1.next work如何分配NioEventLoop 2.boss group 与child group 是如何协作运行的 从EventLoopGroup接口约定通过register方法从channel或promise转换成ChannelFuture对象 next方法就是用来分配NioEventLoop public interface EventLoopGroup extends EventExecutorGroup { @Overrid

[编织消息框架][netty源码分析]11 UnpooledHeapByteBuf 与 ByteBufAllocator

每种ByteBuf都有相应的分配器ByteBufAllocator,类似工厂模式.我们先学习UnpooledHeapByteBuf与其对应的分配器UnpooledByteBufAllocator 如何知道alloc分配器那是个? 可以从官方下载的TimeServer 例子来学习,本项目已有源码可在 TestChannelHandler.class里断点追踪 从图可以看出netty 4.1.8默认的ByteBufAllocator是PooledByteBufAllocator,可以参过启动参数-D

[编织消息框架][netty源码分析]14 PoolChunk 的 PoolSubpage

final class PoolSubpage<T> implements PoolSubpageMetric { //该page分配的chunk final PoolChunk<T> chunk; //内存使用记录 private final long[] bitmap; //该page是否已释放 boolean doNotDestroy; //该page在chunk中的id,通过区段计算偏移 private final int memoryMapIdx; //该page在chu

4. Netty源码分析之Unsafe

Unsafe类实际上是Channel接口的辅助类,实际的IO操作都是由Unsafe接口完成的. 一.Unsafe继承关系图 二.AbstractUnsafe源码分析 1. register方法 register方法主要用于将当前Unsafe对应的Channel注册到EventLoop的多路复用器上,然后调用DefaultChannelPipeline的fireChannelRegisted方法,如果Channel被激活,则调用fireChannelActive方法. public final v

Netty源码分析第4章(pipeline)----&gt;第4节: 传播inbound事件

Netty源码分析第四章: pipeline 第四节: 传播inbound事件 有关于inbound事件, 在概述中做过简单的介绍, 就是以自己为基准, 流向自己的事件, 比如最常见的channelRead事件, 就是对方发来数据流的所触发的事件, 己方要对这些数据进行处理, 这一小节, 以激活channelRead为例讲解有关inbound事件的处理流程 在业务代码中, 我们自己的handler往往会通过重写channelRead方法来处理对方发来的数据, 那么对方发来的数据是如何走到chan

Netty源码分析第6章(解码器)----&gt;第1节: ByteToMessageDecoder

Netty源码分析第六章: 解码器 概述: 在我们上一个章节遗留过一个问题, 就是如果Server在读取客户端的数据的时候, 如果一次读取不完整, 就触发channelRead事件, 那么Netty是如何处理这类问题的, 在这一章中, 会对此做详细剖析 之前的章节我们学习过pipeline, 事件在pipeline中传递, handler可以将事件截取并对其处理, 而之后剖析的编解码器, 其实就是一个handler, 截取byteBuf中的字节, 然后组建成业务需要的数据进行继续传播 编码器,

Netty源码分析第7章(编码器和写数据)----&gt;第3节: 写buffer队列

Netty源码分析七章: 编码器和写数据 第三节: 写buffer队列 之前的小结我们介绍过, writeAndFlush方法其实最终会调用write和flush方法 write方法最终会传递到head节点, 调用HeadContext的write方法: public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, prom