Netty源码分析之Pipeline创建

  不论是NioServerSocketChannel,还是NioSocketChannel,最终都会调用父类AbstractChannel的构造函数,pipeline也在channel被创建的时候被创建。

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    this.unsafe = this.newUnsafe();
    this.pipeline = new DefaultChannelPipeline(this);
}

  而这里是创建了一个DefaultChannelPipeline。构造函数会保存传入的channel,并且默认创建两个节点head和tail并将它们构成双向链表的结构。

public DefaultChannelPipeline(AbstractChannel channel) {
    if(channel == null) {
        throw new NullPointerException("channel");
    } else {
        this.channel = channel;
        this.tail = new DefaultChannelPipeline.TailContext(this);
        this.head = new DefaultChannelPipeline.HeadContext(this);
        this.head.next = this.tail;
        this.tail.prev = this.head;
    }
}

  而这里的head和tail都是实现了ChannelHeadContext接口,在pipeline中每一个节点都是ChannelHeadContext,ChannelHeadContext实现了AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker,其中AttributeMap代表ChannelHeadContext可以存储自定义的属性,而ChannelInboundInvoker, ChannelOutboundInvoker则是定义一些事件的触发方法。

public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
    // 返回对应的Channel对象
    Channel channel();
    // 返回对应的EventLoop对象
    EventExecutor executor();
    // 返回对应的ChannelHandler对象
    ChannelHandler handler();
    // 该Context对应的Handler是否从pipeline中移除
    boolean isRemoved();
    /*******************以下的fire方法都是触发对应事件在pipeline中传播*********************/
    ChannelHandlerContext fireChannelRegistered();

    ChannelHandlerContext fireChannelUnregistered();

    ChannelHandlerContext fireChannelActive();

    ChannelHandlerContext fireChannelInactive();

    ChannelHandlerContext fireExceptionCaught(Throwable cause);

    ChannelHandlerContext fireUserEventTriggered(Object evt);

    ChannelHandlerContext fireChannelRead(Object msg);

    ChannelHandlerContext fireChannelReadComplete();

    ChannelHandlerContext fireChannelWritabilityChanged();

    ChannelHandlerContext read();

    ChannelHandlerContext flush();
    // 返回对应的pipeline
    ChannelPipeline pipeline();
}

  pipeline中的head和tail分别新建是HeadContext和TailContext,仔细看这两个类的源码,发现它们不仅仅是实现ChannelHandlerContext,而且本身还是一个channelHandler,而让人比较意外的是,HeadContext实现的是ChannelOutboundHandler(处理输出字节),TailContext则是ChannelInboundHandler(处理输入字节)。

  从HeadContext的源码可以看出,它的很多操作还是依赖于unsafe的。

static final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler {
    private static final String HEAD_NAME = DefaultChannelPipeline.generateName0(DefaultChannelPipeline.HeadContext.class);
    protected final Channel.Unsafe unsafe;

    HeadContext(DefaultChannelPipeline pipeline) {
        super(pipeline, (EventExecutorGroup)null, HEAD_NAME, false, true);
        this.unsafe = pipeline.channel().unsafe();
    }

    public ChannelHandler handler() {
        return this;
    }

    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    }

    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    }

    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
        this.unsafe.bind(localAddress, promise);
    }

    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
        this.unsafe.connect(remoteAddress, localAddress, promise);
    }

    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        this.unsafe.disconnect(promise);
    }

    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        this.unsafe.close(promise);
    }

    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        this.unsafe.deregister(promise);
    }

    public void read(ChannelHandlerContext ctx) {
        this.unsafe.beginRead();
    }

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        this.unsafe.write(msg, promise);
    }

    public void flush(ChannelHandlerContext ctx) throws Exception {
        this.unsafe.flush();
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}

  TailContext主要就是对输入数据的一个收尾工作,它的很多方法都为空。

static final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
    private static final String TAIL_NAME = DefaultChannelPipeline.generateName0(DefaultChannelPipeline.TailContext.class);

    TailContext(DefaultChannelPipeline pipeline) {
        super(pipeline, (EventExecutorGroup)null, TAIL_NAME, true, false);
    }

    public ChannelHandler handler() {
        return this;
    }

    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    }

    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    }

    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
    }

    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    }

    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    }

    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        DefaultChannelPipeline.logger.warn("An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.", cause);
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            DefaultChannelPipeline.logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. Please check your pipeline configuration.", msg);
        } finally {
            ReferenceCountUtil.release(msg);
        }

    }

    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }
}

原文地址:https://www.cnblogs.com/xiaobaituyun/p/10803521.html

时间: 2024-08-10 22:14:42

Netty源码分析之Pipeline创建的相关文章

netty 5 alph1源码分析(服务端创建过程)

参照<Netty系列之Netty 服务端创建>,研究了netty的服务端创建过程.至于netty的优势,可以参照网络其他文章.<Netty系列之Netty 服务端创建>是 李林锋撰写的netty源码分析的一篇好文,绝对是技术干货.但抛开技术来说,也存在一些瑕疵. 缺点如下 代码衔接不连贯,上下不连贯. 代码片段是截图,对阅读代理不便(可能和阅读习惯有关) 本篇主要内容,参照<Netty系列之Netty 服务端创建>,梳理出自己喜欢的阅读风格. 1.整体逻辑图 整体将服务

Netty源码分析第3章(客户端接入流程)----&gt;第3节: NioSocketChannel的创建

Netty源码分析第三章: 客户端接入流程 第三节: NioSocketChannel的创建 回到上一小结的read()方法: public void read() { //必须是NioEventLoop方法调用的, 不能通过外部线程调用 assert eventLoop().inEventLoop(); //服务端channel的config final ChannelConfig config = config(); //服务端channel的pipeline final ChannelPi

Netty源码分析第4章(pipeline)----&gt;第4节: 传播inbound事件

Netty源码分析第四章: pipeline 第四节: 传播inbound事件 有关于inbound事件, 在概述中做过简单的介绍, 就是以自己为基准, 流向自己的事件, 比如最常见的channelRead事件, 就是对方发来数据流的所触发的事件, 己方要对这些数据进行处理, 这一小节, 以激活channelRead为例讲解有关inbound事件的处理流程 在业务代码中, 我们自己的handler往往会通过重写channelRead方法来处理对方发来的数据, 那么对方发来的数据是如何走到chan

Netty源码分析第4章(pipeline)----&gt;第7节: 前章节内容回顾

Netty源码分析第四章: pipeline 第七节: 前章节内容回顾 我们在第一章和第三章中, 遗留了很多有关事件传输的相关逻辑, 这里带大家一一回顾 首先看两个问题: 1.在客户端接入的时候, NioMessageUnsafe的read方法中pipeline.fireChannelRead(readBuf.get(i))为什么会调用到ServerBootstrap的内部类ServerBootstrapAcceptor中的channelRead()方法 2.客户端handler是什么时候被添加

netty 源码分析二

以服务端启动,接收客户端连接整个过程为例分析, 简略分为 五个过程: 1.NioServerSocketChannel 管道生成, 2.NioServerSocketChannel 管道完成初始化, 3.NioServerSocketChannel注册至Selector选择器, 4.NioServerSocketChannel管道绑定到指定端口,启动服务 5.NioServerSocketChannel接受客户端的连接,进行相应IO操作 Ps:netty内部过程远比这复杂,简略记录下方便以后回忆

netty源码分析之揭开reactor线程的面纱(二)

如果你对netty的reactor线程不了解,建议先看下上一篇文章netty源码分析之揭开reactor线程的面纱(一),这里再把reactor中的三个步骤的图贴一下 reactor线程 我们已经了解到netty reactor线程的第一步是轮询出注册在selector上面的IO事件(select),那么接下来就要处理这些IO事件(process selected keys),本篇文章我们将一起来探讨netty处理IO事件的细节 我们进入到reactor线程的 run 方法,找到处理IO事件的代

Netty源码分析第3章(客户端接入流程)----&gt;第5节: 监听读事件

Netty源码分析第三章: 客户端接入流程 第五节: 监听读事件 我们回到AbstractUnsafe的register0()方法: private void register0(ChannelPromise promise) { try { //省略代码 //做实际的注册 doRegister(); neverRegistered = false; registered = true; //触发事件 pipeline.invokeHandlerAddedIfNeeded(); safeSetS

Netty源码分析第3章(客户端接入流程)----&gt;第4节: NioSocketChannel注册到selector

Netty源码分析第三章: 客户端接入流程 第四节: NioSocketChannel注册到selector 我们回到最初的NioMessageUnsafe的read()方法: public void read() { //必须是NioEventLoop方法调用的, 不能通过外部线程调用 assert eventLoop().inEventLoop(); //服务端channel的config final ChannelConfig config = config(); //服务端channel

Netty源码分析第6章(解码器)----&gt;第1节: ByteToMessageDecoder

Netty源码分析第六章: 解码器 概述: 在我们上一个章节遗留过一个问题, 就是如果Server在读取客户端的数据的时候, 如果一次读取不完整, 就触发channelRead事件, 那么Netty是如何处理这类问题的, 在这一章中, 会对此做详细剖析 之前的章节我们学习过pipeline, 事件在pipeline中传递, handler可以将事件截取并对其处理, 而之后剖析的编解码器, 其实就是一个handler, 截取byteBuf中的字节, 然后组建成业务需要的数据进行继续传播 编码器,